TSM - Java 8 – Procesarea colecțiilor

Ovidiu Simionica - Team Lead

Java 8 e frumoasă. Da, îi atribui oarecum genul feminin încă dinainte de a ajunge la magica cifra 8. În acest articol mă aventurez în a analiza în ce măsură această frumusețe este formă versus fond. Admit că de-a lungul competiției limbajelor de programare, Java a tot rămas datoare în a-și satisface comunitatea de entuziaști.

Lambda expressions. Da, C# avea Lambda expressions de ceva vreme, mai precis din versiunea 3.0 din anul 2007. Java a mai avut nevoie de încă șapte ani. Functional programming atât de târziu.

Generics. O amăgire în forma către template metaprogramming eșuând prin type erasure sau doar eu speram la altceva?

Mă simt ca un iubitor de Nokia ce visează la high resolution și la Android. Ca tot javrar-ul, vă imaginați cum așteptam forțând butonul de refresh de pe site-ul Oracle pentru a da o descărcare la proaspăta versiune în ziua release-ului.

Aflându-se, la data articolului de față, de ceva vreme pe "masa de operații", iată că am ajuns acum să împărtășesc impresii cu privire la o chestiune ce m-a tot frământat de-a lungul carierei: prelucrarea colecțiilor de date.

Ce este prelucrarea colecțiilor de date (aka filtrarea)?

Pentru cei familiarizați cu SQL, filtrarea este o operație de bază asupra unui set de date de tipul:

SELECT * FROM Cars WHERE 
Cars.manufacturer = "VW";

Dar ce ne facem dacă colecția este deja în memoria programului și vrem să o prelucrăm conform condițiilor de mai sus?

Până la Java 8, programatorul scria:

ArrayList filteredCars = 
  new ArrayList();
 
for(Car c : allCars) {
  if ("VW".equals(
    c.getManufacturer())) {
   
     filteredCars.add(c); 
  }
}

Alții, arhitecți, mai scriau:

interface Predicate {
  boolean apply(T t);  
}

class Filter { 

public static Collection 
 do (final Collection items, 
 Predicate pred) {
 
   Collection result = 
      new ArrayList<>();
  
  for(T item : items) {
      if (pred.apply(item)) {
        result.add(c); 
      }
    }
   
   return result;
  }
}

În Java 8 aplicăm:

new Filter().do( allCars, new Predicate() {
  @Override
  
public boolean apply(Car c) {
    return "VW".equals(c.getManufacturer());
  }
);

Și iată cum Java 8 ne oferă "high resolution".

Dar trăim într-o eră a big data și aplicațiile "real-world" se cer a fi scalabile și performante, ca atare nu mai e suficient să procesăm secvențial milioane de înregistrări (dacă cumva a fost vreodată suficient).

Parallel processing și utilizarea optimă a core-urilor de procesor este "everyday business".

Java 8 ne vine în ajutor și acum putem scrie:

Collection filtered = 
  allCars.stream().filter( c -> ("VW".equals(
    c.getManufacturer())) ).collect(
      Collectors.toList()); 

Doar prin acest simplu apel către metoda "parallel()" mă asigur că librăria de stream va face magie și va împărți stream-ul în bucățele mici ce vor fi procesate în paralel.

Job done. Sau nu?

Îmi place să citesc javadoc-uri, consider că e un obicei ce trebuie cultivat pentru că ne poate salva de la multe dezastre. Și am mai cultivat în timp un fel de "simț al pericolului", în special când văd cuvinte mari de genul "parallel", urmate apoi de magia rezultatului.

Ce fire de execuție folosește metoda aceasta? Și folosește destule? Cum determină câte să folosească și ce se întâmplă dacă metoda "parallel()" este apelată paralel la rândul ei?

Documentația spune: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()."

Prin urmare, implicit folosim un singur pool indiferent de câte thread-uri apelează metoda "parallel()" prin întreaga aplicație, deoarece librăria de stream folosește librăria de ForkJoin.

Și mai mult, thread-ul care trimite job-ul de procesare paralelă este el însuși utilizat ca worker. Se mixează astfel thread-urile unui pool cu un thread ce are alt scop. Dacă cumva chiar acel thread prinde o porțiune de procesare ce durează neașteptat de mult, avem riscul unei blocări a procesărilor din cadrul pool-ului tocmai din cauza design-ului conceptului de Fork/Join (thread-ul apelant lucrează ca worker și celelalte thread-uri nu pot adăuga rezultate câtă vreme se așteaptă după finalizarea execuției thread-ului părinte). Avem o problemă!

Avem de a face aici cu fenomenul de Paraquential "[a portmanteau word derived by combining parallel with sequential] The illusion of parallelization. Processing starts out in parallel mode by engaging multiple threads but quickly reverts to sequential computing by restricting further thread commitment." (Edward Hardned, 2014).

Soluția propusă de Oracle este utilizarea explicită a unui ForkJoinPool controlat de către dezvoltator. De tipul:

Collection filtered = 
  allCars. parallel().filter( c -> 
    ("VW".equals(c.getManufacturer())) ).
     collect(Collectors.toList());

Astfel toate task-urile generate de procesarea paralelă rămân în pool-ul specificat.

Un efect pozitiv este acela că putem folosi un timeout pe metoda get(); situație dorită de obicei într-o aplicație real-world.

Dar aceasta ne întoarce la problema de bază: managementul pool-urilor din nou în responsabilitatea dezvoltatorului! Situația devine mai dificilă în combinație cu situațiile complexe generate de mediul multi-thread și cu reglarea atentă a configurărilor în funcție de arhitectura hardware (e.g. procesor). Și iată cum din nou Java ne dă jumătăți de măsură pe când speram la un thread container self-managed (sau măcar easy-managed).

Chiar dacă pentru mulți dintre noi comportamentul implicit al framework-ului de streaming este și va fi mai mult decât suficient, în ceea ce mă privește, metoda parallel va rămâne în lista mea de "dangerous code" în activitățile de programare și code review.

Am atins doar vârful iceberg-ului prin această scurtă analiză.Pentru cei dintre voi curioși să vadă ce alte capcane se ascund chiar în inima librăriei de ForkJoin, vă invit să urmăriți cu atenție un dude cu peste 30 de ani de experiență, ce detaliază foarte bine aceste lucruri în postul său A Java Parallel calamity.