TSM - Simplificarea procesării batchurilor cu Spring Batch

Bianca Moga - Software Developer


Procesarea de tip batch există de foarte mult timp și este folosită pentru a agrega și procesa mai multe sarcini în același lot. Sarcinile agregate pot fi diferite, cum ar fi: extragerea de date dintr-o bază de date, filtrarea lor și, mai apoi, procesarea acestora. Această procesare era o sarcină destul de complexă, care necesita mult efort manual pentru a implementa și gestiona corect toate etapele unui proces batch.

Spring Batch, o soluție de procesare în loturi pentru platforma Spring, ajută la dezvoltarea aplicațiilor de procesare în loturi într-un mod mai modular și previzibil, oferind componente reutilizabile pentru citirea, scrierea și procesarea datelor, gestionarea integrată a tranzacțiilor și mecanisme încorporate pentru a relua procesarea după un eșec.

Cum funcționează

Spring Batch se folosește de un JobRepository, care este responsabil pentru toate informațiile fiecărui job și ale componentelor conexe (JobInstances, JobExecution și StepExecution). Fiecare job este compus din unul sau mai mulți pași (Steps), unul după altul. Cu Spring Batch, un pas poate urma condiționat un alt pas, permițând crearea unor fluxuri de lucru simple. De asemenea, acești pași pot fi executați și în același timp. La rularea unui job, îi este asociat și un JobParameters pentru a parametriza și a adapta jobul la nevoile proiectului. Un exemplu ar fi un job ce primește ca parametru un String pentru a determina tabelul din care va extrage datele. Aceasta legătură se numește JobInstance.

De fiecare dată când rulăm un Job Instance se numește un JobExecution. Ideal, pentru oricare JobInstance ar trebui să existe un singur JobExecution. Fiecare pas rulat din JobExecution se numește StepExecution.

Secvența operațională într-o aplicație ce folosește Spring Batch este următoarea:

Fig. 1. Secvența operațională

Avem un JobScheduler care rulează o dată la ceva timp și apelează JobLauncher. Rolul JobLauncherului este să creeze o nouă înregistrare în baza de date prin JobRepository, pentru a marca faptul că jobul este executat.

Jobul este executat pe pași (Step). Pașii pot fi o citire (ItemReader) sau scriere (ItemWriter) într-o bază de date sau o filtrare și procesare de informații (ItemProcessor). La final, JobLauncherul înregistrează faptul că execuția jobului a fost completată în baza de date prin JobRepository.

Cum integrăm Spring Batch cu Spring Boot?

Pentru exemplu vom folosi o bază de date de tip in-memory pentru JobRepository și vom avea nevoie de următoarele dependințe:

<dependency>
 <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

pentru baza de date și:

<dependency> 
  <groupId>
   org.springframework.boot
 </groupId>
 <artifactId>
   spring-boot-starter-batch
 </artifactId>
</dependency>

pentru procesarea de tip batch, unde Spring Boot ne configurează automat beanurile necesare pentru acest tip de procesare.

Baza de date default va fi folosită de către Spring Batch pentru a-și inițializa schema, dar în cazul în care avem mai multe baze de date asociate, cea pentru Spring Batch va trebui să fie definită explicit prin @BatchDataSource. Putem schimba modul de inițializare a bazei de date cu ajutorul proprietății:

spring.batch.jdbc.initialize-schema=always

Spring Boot rulează în mod automat joburile de tip batch când aplicația pornește. Pentru a dezactiva rularea automată a joburilor putem folosi proprietatea:

spring.batch.job.enabled=false

Exemplu

În exemplu am folosit \@SpringBatch pentru a citi date în loturi dintr-o bază de date remote și pentru a le scrie în baza de date a aplicației pentru a scurta timpul de citire. Baza de date a aplicației funcționa ca un cache local pentru cele două tabele (TableA & TableB).

@Configuration
public class BatchConfig {

    private final SimpleJobRepository jobRepository;
    private final JpaTransactionManager 
      transactionManager;

    @Autowired
    private TableARepository tableARepo;

    @Autowired
    private TableBRepository tableBRepo;

    @Value("${spring.remote-datasource.url}")
    private String url;

    @Value("${spring.remote-datasource.password}")
    private String password;

    @Value("${spring.remote-datasource.username}")
    private String username;

    private EntityManagerFactory 
      entityManagerFactory;

   public BatchConfig(
     SimpleJobRepository jobRepository, 
     JpaTransactionManager transactionManager, 
     EntityManagerFactory entityManagerFactory) {
      this.jobRepository = jobRepository;
      this.transactionManager = transactionManager;
      this.entityManagerFactory=entityManagerFactory;
    }

   public DataSource getRemoteDataSource() {
      DataSource dataSource = new DataSource();
      dataSource.setReadOnly(true);
      dataSource.setUser(username);
      dataSource.setPassword(password);
      dataSource.setURL(url);
      return dataSource;
    }

   @Bean
   public Job importJob(Step step1, Step step2) {       
      return new JobBuilder("updateFromRemoteDB",     
                jobRepository)
                .incrementer(new RunIdIncrementer()
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }
    @Bean
    @JobScope
    @Transactional
    public Step step1() {
      try {
        return new StepBuilder("step1", 
   jobRepository)
      .chunk(100, transactionManager)
      .reader(taleARemoteReader())
      .writer(tableAWriter())
      .build();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
  }
    public JdbcCursorItemReader 
       tableARemoteReader() {
      JdbcCursorItemReader cursorItemReader = 
        new JdbcCursorItemReader<>();

      cursorItemReader
       .setDataSource(getRemoteDataSource());

      cursorItemReader
       .setSql("select * from tableARemote where …");

      cursorItemReader.setRowMapper(
        new TableARowMapper());
        return cursorItemReader;
    }

    @Bean
    public JpaItemWriter tableAWriter() {
      JpaItemWriter writer = 
       new JpaItemWriter<>();

      writer.setEntityManagerFactory(
       Objects.requireNonNull(entityManagerFactory));
        return writer;
    }

    @Bean
    @JobScope
    @Transactional
    public Step step2() {
     try {
      return new StepBuilder("step2", jobRepository)
      .chunk(100, transactionManager)
      .reader(tableBRemoteReader())
      .writer(tableBWriter())
      .build();
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }

    public JdbcCursorItemReader 
      tableBRemoteReader() {
    JdbcCursorItemReader cursorItemReader = 
       new JdbcCursorItemReader<>();

       cursorItemReader
         .setDataSource(getRemoteDataSource());

      cursorItemReader
       .setSql("select * from tableBRemote where …");

      cursorItemReader.setRowMapper(
        new TableBRowMapper());
        return cursorItemReader;
    }

    @Bean
    public JpaItemWriter tableBWriter() {
    JpaItemWriter writer = 
      new JpaItemWriter<>();
      writer.setEntityManagerFactory(
       Objects.requireNonNull(entityManagerFactory));

      return writer;
    }
}

Se realizează conexiunea la baza de date remote, după care se execută partea de citire, unde se aplică interogarea bazei de date și ulterior se înfăptuiește scrierea în baza de date a aplicației. Citirea este făcută in chunks de câte 100 de linii. Desigur că acest număr se poate ajusta în funcție de nevoile aplicațiilor voastre.

Rularea joburilor eșuate

Procesarea în loturi funcționează în principal pe baza unor "chunks", care pot eșua din diverse motive. De exemplu, dacă încercăm să facem o actualizare a bazei de date putem să dăm de excepții precum: DeadlockLoserDataAccessException, CannotAcquireLockException etc.În aceste cazuri putem recurge la un retry. Acest lucru îl putem face prin a menționa excepția pentru care vom dori un retry și de câte ori vom dori acest lucru.

Pentru acest retry vom adăuga câteva linii în plus în pasul de execuție:

   @Bean
   @JobScope
   @Transactional
   public Step step() {
    try {
       return new StepBuilder("step", jobRepository)
           .chunk(100, transactionManager)
        .faultTolerant()
        .retryLimit(3)
        .retry(DeadlockLoserDataAccessException
      .class)
        .reader(tableRemoteReader())
        .writer(tableWriter())
        .build();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
    }

În cazul în care procesarea unui chunk eșuează de trei ori consecutiv, acel chunk va fi considerat nereușit și va trece la următorul (dacă există), iar excepția va fi propagată, ceea ce poate duce la eșecul întregului pas sau job, în funcție de configurația aleasă.

Pentru a împiedica propagarea excepției în cazul în care un chunk eșuează de trei ori, se pot utiliza mai multe mecanisme oferite de Spring Batch. Cea mai comună abordare este mecanismul de skip și skipLimit pentru a ignora anumite excepții după ce numărul de încercări de retry a fost depășit. Skip va ignora excepția, după ce retry-urile eșuează, iar pentru skipLimit, va trebui să specificăm un număr maxim de excepții care pot fi ignorate înainte ca pasul să fie marcat ca eșuat. Exemplu:

    @Bean
    @JobScope
    @Transactional
    public Step step() {
      try {
      return new StepBuilder("step", jobRepository)
       .chunk(100, transactionManager)
       .faultTolerant()
       .retryLimit(3)
       .retry(DeadlockLoserDataAccessException.class)
       .skipLimit(5)
       .skip(DeadlockLoserDataAccessException.class)                 
       .reader(tableRemoteReader())
       .writer(tableWriter())
       .build();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
  }

Rollback

Când apare o excepție într-un chunk (sau într-o altă parte a unui Step), Spring Batch va face un rollback al tranzacției curente, ceea ce înseamnă că toate modificările făcute în cadrul acelei tranzacții vor fi anulate.

Dacă dorim să controlăm cum sunt gestionate tranzacțiile și să evităm rollbackul la apariția unei anumite excepții, putem folosi noRollback pentru acea excepție. În Spring Batch, un chunk este procesat într-o singură tranzacție. Dacă se folosește noRollback, tranzacția curentă nu va fi anulată pentru excepția specificată, dar tot ce s-a întâmplat în acea tranzacție va fi păstrat. Nu putem controla cât din tranzacție să fie făcut rollback și cât să fie păstrat. Acest scenariu poate duce la situații în care datele sunt parțial procesate, ceea ce poate compromite integritatea datelor din sistem.

Acest mecanism se folosește, de obicei, în cazul în care erorile apărute nu afectează integritatea datelor și dorim să continuăm procesarea fără a anula tranzacția curentă, de exemplu trimiterea unui email sau apelarea unui API extern.

    @Bean
    @JobScope
    @Transactional
    public Step step() {
     try {
      return new StepBuilder("step", jobRepository)
       .chunk(100, transactionManager)
       .faultTolerant()
       .retryLimit(3)
       .retry(DeadlockLoserDataAccessException.class)
       .noRollback(DeadlockLoserDataAccessException
           .class)
       .skipLimit(5)                     
       .skip(DeadlockLoserDataAccessException.class)                     
       .reader(tableRemoteReader())
       .writer(tableWriter())
       .build();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
    }

Concluzie

Integrarea Spring Batch cu Spring Boot simplifică și mai mult implementarea, automatizând configurările. Exemplele oferite demonstrează cum Spring Batch poate fi adaptat la nevoile specifice ale aplicațiilor.

În concluzie, Spring Batch reprezintă o soluție eficientă pentru procesarea în loturi, oferind instrumentele necesare pentru gestionarea eficientă a sarcinilor de prelucrare a datelor, adaptabilitate și suport pentru gestionarea erorilor, ceea ce îl face un instrument valoros.

Referințe:

  1. https://docs.spring.io/spring-batch/reference/index.html

  2. https://www.baeldung.com/introduction-to-spring-batch

  3. https://www.toptal.com/spring/spring-batch-tutorial

  4. https://shradhayewale.medium.com/spring-batch-processing-overview-371f1d9ee536