ABONAMENTE VIDEO REDACȚIA
RO
EN
NOU
Numărul 148
Numărul 147 Numărul 146 Numărul 145 Numărul 144 Numărul 143 Numărul 142 Numărul 141 Numărul 140 Numărul 139 Numărul 138 Numărul 137 Numărul 136 Numărul 135 Numărul 134 Numărul 133 Numărul 132 Numărul 131 Numărul 130 Numărul 129 Numărul 128 Numărul 127 Numărul 126 Numărul 125 Numărul 124 Numărul 123 Numărul 122 Numărul 121 Numărul 120 Numărul 119 Numărul 118 Numărul 117 Numărul 116 Numărul 115 Numărul 114 Numărul 113 Numărul 112 Numărul 111 Numărul 110 Numărul 109 Numărul 108 Numărul 107 Numărul 106 Numărul 105 Numărul 104 Numărul 103 Numărul 102 Numărul 101 Numărul 100 Numărul 99 Numărul 98 Numărul 97 Numărul 96 Numărul 95 Numărul 94 Numărul 93 Numărul 92 Numărul 91 Numărul 90 Numărul 89 Numărul 88 Numărul 87 Numărul 86 Numărul 85 Numărul 84 Numărul 83 Numărul 82 Numărul 81 Numărul 80 Numărul 79 Numărul 78 Numărul 77 Numărul 76 Numărul 75 Numărul 74 Numărul 73 Numărul 72 Numărul 71 Numărul 70 Numărul 69 Numărul 68 Numărul 67 Numărul 66 Numărul 65 Numărul 64 Numărul 63 Numărul 62 Numărul 61 Numărul 60 Numărul 59 Numărul 58 Numărul 57 Numărul 56 Numărul 55 Numărul 54 Numărul 53 Numărul 52 Numărul 51 Numărul 50 Numărul 49 Numărul 48 Numărul 47 Numărul 46 Numărul 45 Numărul 44 Numărul 43 Numărul 42 Numărul 41 Numărul 40 Numărul 39 Numărul 38 Numărul 37 Numărul 36 Numărul 35 Numărul 34 Numărul 33 Numărul 32 Numărul 31 Numărul 30 Numărul 29 Numărul 28 Numărul 27 Numărul 26 Numărul 25 Numărul 24 Numărul 23 Numărul 22 Numărul 21 Numărul 20 Numărul 19 Numărul 18 Numărul 17 Numărul 16 Numărul 15 Numărul 14 Numărul 13 Numărul 12 Numărul 11 Numărul 10 Numărul 9 Numărul 8 Numărul 7 Numărul 6 Numărul 5 Numărul 4 Numărul 3 Numărul 2 Numărul 1
×
▼ LISTĂ EDIȚII ▼
Numărul 147
Abonament PDF

Simplificarea procesării batchurilor cu Spring Batch

Bianca Moga
Software Developer



PROGRAMARE


Procesarea de tip batch există de foarte mult timp și este folosită pentru a agrega și procesa mai multe sarcini în același lot. Sarcinile agregate pot fi diferite, cum ar fi: extragerea de date dintr-o bază de date, filtrarea lor și, mai apoi, procesarea acestora. Această procesare era o sarcină destul de complexă, care necesita mult efort manual pentru a implementa și gestiona corect toate etapele unui proces batch.

Spring Batch, o soluție de procesare în loturi pentru platforma Spring, ajută la dezvoltarea aplicațiilor de procesare în loturi într-un mod mai modular și previzibil, oferind componente reutilizabile pentru citirea, scrierea și procesarea datelor, gestionarea integrată a tranzacțiilor și mecanisme încorporate pentru a relua procesarea după un eșec.

Cum funcționează

Spring Batch se folosește de un JobRepository, care este responsabil pentru toate informațiile fiecărui job și ale componentelor conexe (JobInstances, JobExecution și StepExecution). Fiecare job este compus din unul sau mai mulți pași (Steps), unul după altul. Cu Spring Batch, un pas poate urma condiționat un alt pas, permițând crearea unor fluxuri de lucru simple. De asemenea, acești pași pot fi executați și în același timp. La rularea unui job, îi este asociat și un JobParameters pentru a parametriza și a adapta jobul la nevoile proiectului. Un exemplu ar fi un job ce primește ca parametru un String pentru a determina tabelul din care va extrage datele. Aceasta legătură se numește JobInstance.

De fiecare dată când rulăm un Job Instance se numește un JobExecution. Ideal, pentru oricare JobInstance ar trebui să existe un singur JobExecution. Fiecare pas rulat din JobExecution se numește StepExecution.

Secvența operațională într-o aplicație ce folosește Spring Batch este următoarea:

Fig. 1. Secvența operațională

Avem un JobScheduler care rulează o dată la ceva timp și apelează JobLauncher. Rolul JobLauncherului este să creeze o nouă înregistrare în baza de date prin JobRepository, pentru a marca faptul că jobul este executat.

Jobul este executat pe pași (Step). Pașii pot fi o citire (ItemReader) sau scriere (ItemWriter) într-o bază de date sau o filtrare și procesare de informații (ItemProcessor). La final, JobLauncherul înregistrează faptul că execuția jobului a fost completată în baza de date prin JobRepository.

Cum integrăm Spring Batch cu Spring Boot?

Pentru exemplu vom folosi o bază de date de tip in-memory pentru JobRepository și vom avea nevoie de următoarele dependințe:

<dependency>
 <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

pentru baza de date și:

<dependency> 
  <groupId>
   org.springframework.boot
 </groupId>
 <artifactId>
   spring-boot-starter-batch
 </artifactId>
</dependency>

pentru procesarea de tip batch, unde Spring Boot ne configurează automat beanurile necesare pentru acest tip de procesare.

Baza de date default va fi folosită de către Spring Batch pentru a-și inițializa schema, dar în cazul în care avem mai multe baze de date asociate, cea pentru Spring Batch va trebui să fie definită explicit prin @BatchDataSource. Putem schimba modul de inițializare a bazei de date cu ajutorul proprietății:

spring.batch.jdbc.initialize-schema=always

Spring Boot rulează în mod automat joburile de tip batch când aplicația pornește. Pentru a dezactiva rularea automată a joburilor putem folosi proprietatea:

spring.batch.job.enabled=false

Exemplu

În exemplu am folosit \@SpringBatch pentru a citi date în loturi dintr-o bază de date remote și pentru a le scrie în baza de date a aplicației pentru a scurta timpul de citire. Baza de date a aplicației funcționa ca un cache local pentru cele două tabele (TableA & TableB).

@Configuration
public class BatchConfig {

    private final SimpleJobRepository jobRepository;
    private final JpaTransactionManager 
      transactionManager;

    @Autowired
    private TableARepository tableARepo;

    @Autowired
    private TableBRepository tableBRepo;

    @Value("${spring.remote-datasource.url}")
    private String url;

    @Value("${spring.remote-datasource.password}")
    private String password;

    @Value("${spring.remote-datasource.username}")
    private String username;

    private EntityManagerFactory 
      entityManagerFactory;

   public BatchConfig(
     SimpleJobRepository jobRepository, 
     JpaTransactionManager transactionManager, 
     EntityManagerFactory entityManagerFactory) {
      this.jobRepository = jobRepository;
      this.transactionManager = transactionManager;
      this.entityManagerFactory=entityManagerFactory;
    }

   public DataSource getRemoteDataSource() {
      DataSource dataSource = new DataSource();
      dataSource.setReadOnly(true);
      dataSource.setUser(username);
      dataSource.setPassword(password);
      dataSource.setURL(url);
      return dataSource;
    }

   @Bean
   public Job importJob(Step step1, Step step2) {       
      return new JobBuilder("updateFromRemoteDB",     
                jobRepository)
                .incrementer(new RunIdIncrementer()
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }
    @Bean
    @JobScope
    @Transactional
    public Step step1() {
      try {
        return new StepBuilder("step1", 
   jobRepository)
      .chunk(100, transactionManager)
      .reader(taleARemoteReader())
      .writer(tableAWriter())
      .build();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
  }
    public JdbcCursorItemReader 
       tableARemoteReader() {
      JdbcCursorItemReader cursorItemReader = 
        new JdbcCursorItemReader<>();

      cursorItemReader
       .setDataSource(getRemoteDataSource());

      cursorItemReader
       .setSql("select * from tableARemote where …");

      cursorItemReader.setRowMapper(
        new TableARowMapper());
        return cursorItemReader;
    }

    @Bean
    public JpaItemWriter tableAWriter() {
      JpaItemWriter writer = 
       new JpaItemWriter<>();

      writer.setEntityManagerFactory(
       Objects.requireNonNull(entityManagerFactory));
        return writer;
    }

    @Bean
    @JobScope
    @Transactional
    public Step step2() {
     try {
      return new StepBuilder("step2", jobRepository)
      .chunk(100, transactionManager)
      .reader(tableBRemoteReader())
      .writer(tableBWriter())
      .build();
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }

    public JdbcCursorItemReader 
      tableBRemoteReader() {
    JdbcCursorItemReader cursorItemReader = 
       new JdbcCursorItemReader<>();

       cursorItemReader
         .setDataSource(getRemoteDataSource());

      cursorItemReader
       .setSql("select * from tableBRemote where …");

      cursorItemReader.setRowMapper(
        new TableBRowMapper());
        return cursorItemReader;
    }

    @Bean
    public JpaItemWriter tableBWriter() {
    JpaItemWriter writer = 
      new JpaItemWriter<>();
      writer.setEntityManagerFactory(
       Objects.requireNonNull(entityManagerFactory));

      return writer;
    }
}

Se realizează conexiunea la baza de date remote, după care se execută partea de citire, unde se aplică interogarea bazei de date și ulterior se înfăptuiește scrierea în baza de date a aplicației. Citirea este făcută in chunks de câte 100 de linii. Desigur că acest număr se poate ajusta în funcție de nevoile aplicațiilor voastre.

Rularea joburilor eșuate

Procesarea în loturi funcționează în principal pe baza unor "chunks", care pot eșua din diverse motive. De exemplu, dacă încercăm să facem o actualizare a bazei de date putem să dăm de excepții precum: DeadlockLoserDataAccessException, CannotAcquireLockException etc.În aceste cazuri putem recurge la un retry. Acest lucru îl putem face prin a menționa excepția pentru care vom dori un retry și de câte ori vom dori acest lucru.

Pentru acest retry vom adăuga câteva linii în plus în pasul de execuție:

   @Bean
   @JobScope
   @Transactional
   public Step step() {
    try {
       return new StepBuilder("step", jobRepository)
           .chunk(100, transactionManager)
        .faultTolerant()
        .retryLimit(3)
        .retry(DeadlockLoserDataAccessException
      .class)
        .reader(tableRemoteReader())
        .writer(tableWriter())
        .build();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
    }

În cazul în care procesarea unui chunk eșuează de trei ori consecutiv, acel chunk va fi considerat nereușit și va trece la următorul (dacă există), iar excepția va fi propagată, ceea ce poate duce la eșecul întregului pas sau job, în funcție de configurația aleasă.

Pentru a împiedica propagarea excepției în cazul în care un chunk eșuează de trei ori, se pot utiliza mai multe mecanisme oferite de Spring Batch. Cea mai comună abordare este mecanismul de skip și skipLimit pentru a ignora anumite excepții după ce numărul de încercări de retry a fost depășit. Skip va ignora excepția, după ce retry-urile eșuează, iar pentru skipLimit, va trebui să specificăm un număr maxim de excepții care pot fi ignorate înainte ca pasul să fie marcat ca eșuat. Exemplu:

    @Bean
    @JobScope
    @Transactional
    public Step step() {
      try {
      return new StepBuilder("step", jobRepository)
       .chunk(100, transactionManager)
       .faultTolerant()
       .retryLimit(3)
       .retry(DeadlockLoserDataAccessException.class)
       .skipLimit(5)
       .skip(DeadlockLoserDataAccessException.class)                 
       .reader(tableRemoteReader())
       .writer(tableWriter())
       .build();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
  }

Rollback

Când apare o excepție într-un chunk (sau într-o altă parte a unui Step), Spring Batch va face un rollback al tranzacției curente, ceea ce înseamnă că toate modificările făcute în cadrul acelei tranzacții vor fi anulate.

Dacă dorim să controlăm cum sunt gestionate tranzacțiile și să evităm rollbackul la apariția unei anumite excepții, putem folosi noRollback pentru acea excepție. În Spring Batch, un chunk este procesat într-o singură tranzacție. Dacă se folosește noRollback, tranzacția curentă nu va fi anulată pentru excepția specificată, dar tot ce s-a întâmplat în acea tranzacție va fi păstrat. Nu putem controla cât din tranzacție să fie făcut rollback și cât să fie păstrat. Acest scenariu poate duce la situații în care datele sunt parțial procesate, ceea ce poate compromite integritatea datelor din sistem.

Acest mecanism se folosește, de obicei, în cazul în care erorile apărute nu afectează integritatea datelor și dorim să continuăm procesarea fără a anula tranzacția curentă, de exemplu trimiterea unui email sau apelarea unui API extern.

    @Bean
    @JobScope
    @Transactional
    public Step step() {
     try {
      return new StepBuilder("step", jobRepository)
       .chunk(100, transactionManager)
       .faultTolerant()
       .retryLimit(3)
       .retry(DeadlockLoserDataAccessException.class)
       .noRollback(DeadlockLoserDataAccessException
           .class)
       .skipLimit(5)                     
       .skip(DeadlockLoserDataAccessException.class)                     
       .reader(tableRemoteReader())
       .writer(tableWriter())
       .build();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
    }

Concluzie

Integrarea Spring Batch cu Spring Boot simplifică și mai mult implementarea, automatizând configurările. Exemplele oferite demonstrează cum Spring Batch poate fi adaptat la nevoile specifice ale aplicațiilor.

În concluzie, Spring Batch reprezintă o soluție eficientă pentru procesarea în loturi, oferind instrumentele necesare pentru gestionarea eficientă a sarcinilor de prelucrare a datelor, adaptabilitate și suport pentru gestionarea erorilor, ceea ce îl face un instrument valoros.

Referințe:

  1. https://docs.spring.io/spring-batch/reference/index.html

  2. https://www.baeldung.com/introduction-to-spring-batch

  3. https://www.toptal.com/spring/spring-batch-tutorial

  4. https://shradhayewale.medium.com/spring-batch-processing-overview-371f1d9ee536

Conferință TSM

NUMĂRUL 147 - Automotive

Sponsori

  • Accenture
  • BT Code Crafters
  • Accesa
  • Bosch
  • Betfair
  • MHP
  • BoatyardX
  • .msg systems
  • P3 group
  • Ing Hubs
  • Cognizant Softvision
  • Colors in projects