În acest articol vă prezentăm interesanta lume a procesării de streamuri de date. Problematicile abordate se referă la:
Principalul obiectiv al sistemelor de streaming este procesarea volumelor mari (potențial infinite) de date (Big Data sau chiar Global Scale Data) și extragerea de informații utile din ele, înainte de a le stoca undeva în scop de arhivare sau procesare ulterioară.
Abordarea tradițională pentru a procesa astfel de volume de date este batchingul (procesarea în loturi). De exemplu, o bancă își salvează toate tranzacțiile pe un suport de date în timpul zilei, iar după ora închiderii le procesează offline, ca un singur lot imens. Dacă apare o eroare sau se întâmplă ceva neașteptat, întregul lot poate fi pur și simplu reprocesat. Procesul e simplu și robust, dar introduce un delay imens între colectarea datelor și momentul în care pot fi luate decizii bazate pe ele.
Scopul procesării de streamuri este exact eliminarea acestei întârzieri. Nu numai că se procesează datele raw într-o manieră real-time, dar facilitează și procesarea incrementală și oferă scalabilitate și toleranță la erori.
Procesatul de streamuri ar trebui folosit în acele sisteme cu volum mare de date unde contează rapiditatea obținerii rezultatelor. Cu alte cuvinte, acolo unde valoarea informațiilor din fluxul de date descrește rapid pe măsură ce trece timpul.
Exemple:
Procesarea în loturi ne forțează să împărțim datele în bucăți izolate. Un exemplu ar fi un utilizator care navighează o pagină web. Să presupunem că toate analizele făcute pe traficul site-ului se întâmplă în batchuri zilnice. Ce se întâmplă cu utilizatorii care navighează în jur de miez de noapte? Activitatea lor se împarte în două, fiecare batch prinde numai o parte și corelările se pierd.
Pe de altă parte, prin definiție, streamurile sunt nemărginite.
Pe lângă celelalte avantaje, procesatul datelor în momentul în care apar, duce și la distribuția uniformă a volumului de muncă.
Un stream constă dintr-o secvență de înregistrări. Fiecare înregistrare conține informații despre un eveniment, cum ar fi accesul unui utilizator la o pagină de web, un update de temperatură de la un senzor IoT sau o tranzacție financiară de pe o bursă. Înregistrările reprezintă evenimente din trecut, fiind astfel fixe. Fluxul lor, ceea ce numim noi stream, este potențial infinit, iar evenimentele continuă să se întâmple fără un final previzibil.
Aplicația de procesare de streamuri oferă o vedere de ansamblu asupra fluxului, aducând informații referitoare la: numărul curent de utilizatori care accesează o pagină web, temperatura maximă înregistrată de un senzor în ultima oră și așa mai departe. Această funcționalitate se realizează efectiv prin aplicarea unor transformări asupra fluxului de date. Multiple modificări pot fi compuse pentru a determina altele mai complexe.
Dacă transformările reprezintă nucleul unui sistem de procesare de streamuri, interfața acestuia este formată din surse și sinkuri de date. Conectoare de surse sunt folosite pentru a lega aplicația la sisteme care o alimentează cu date, cum ar fi de exemplu, Apache Kafka, JMS broker sau orice sistem legacy capabil de asta. Pe de altă parte, conectoarele de sinkuri au rolul de a pasa mai departe datele transformate pentru analize ulterioare sau stocare. Practic vorbim despre intrările și ieșirile sistemului. O aplicație de procesat streamuri poate citi date din multiple surse și poate salva date derivate în multiple sinkuri.
Aceste aplicații rulează pe engine-uri și platforme specializate, permițând utilizatorului să se concentreze asupra business logicului, respectiv a transformărilor, în loc de tot felul de detalii tehnice ale sistemelor distribuite.
Pentru a construi o astfel de aplicație este nevoie de următorii pași:
Haideți să vedem un exemplu concret prin care putem ilustra modul de gândire caracteristic domeniului. Word Count este o aplicație care are ca scop număratul de cuvinte dintr-un șir de linii de text. Pornim de la un caz simplu unde liniile de text sunt stocate într-o listă finită, cum ar fi de exemplu un ArrayList. Cine e familiar cu Stream API-ul introdus in Java 8 știe că problema poate fi rezolvată prin următorul cod:
Map counts =
lines.stream()
.flatMap(line -> Arrays.stream(line.toLowercase().split("\\W+")))
.filter(word -> !word.isEmpty())
.collect(Collectors.groupingBy(word -> word, Collectors.counting()));
Dar acest cod are limitări severe: în primul rând nu poate beneficia de nici un fel de paralelism, nici măcar de execuție pe thread-uri multiple, nemaivorbind de mașini multiple. Tot ceea ce face Java cu acest cod, este să-l transforme și să ruleze pe un singur thread ceva de genul:
List lines = someExistingList();
Map counts = new HashMap<>();
for (String line : lines) {
for (String word : line.toLowercase().split("\\W+")) {
if (!word.isEmpty()) {
counts.merge (word,1L,(count, one)->count+one);
}
}
}
Să vedem cum ar arăta soluția în Hazelcast Jet, unul dintre sistemele de stream processing existente în momentul de față:
Pattern delimiter = Pattern.compile("\\W+");
Pipeline p = Pipeline.create();
p.drawFrom(Sources.map("book-lines"))
.flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowercase())))
.filter(word -> !word.isEmpty())
.groupingKey(wholeitem())
.aggregate(AggregateOperations.counting())
.drainTo(Sinks.map("counts"));
Arată similar, cu un efort al programatorului comparabil, dar cu capabilități radical diferite. Jet transformă acest cod într-un graf aciclic direcționat (DAG - Direct Acyclic Graph) cu un grad de paralelism ridicat. Haideți să vedem cum, pornind de la un model serial.
Etapele de procesare necesare pot fi descrise într-un mod simplu, așa:
Doar prin această modelare simplă, putem deja vedea cum codul se sparge în multipli pași izolați cu interfețe de date clare între ele. Putem folosi câte unthread separat pentru fiecare pas:
// Source thread
for (String line readLines()) {
emit(line);
}
// Tokenizer thread
for (String line : receive()) {
for (String word:line.tolowercase().split("\\W+")){
if (!word.isEmpty()) {
emit(word);
}
}
}
// Accumulator thread
Map counts = new HashMap<>();
for (String word : receive()) {
counts.merge(word, 1L, (count, one) -> count + one);
}
// finally, when done receiving:
for (Entry
wordAndcount counts.entryset()) {
emit(wordAndCount);
}
Diferitele etape de procesare comunică între ele cu ajutorul unor cozi paralele (concurrent queues) ceea ce rezultă într-o arhitectură pipeline:
Această abordare ne permite acum, să folosim mai multe nuclee de procesor, însă în continuare suntem limitați de numărul de noduri din DAG. Vom putea folosi doar 2-3 nuclee, indiferent câte sunt disponibile. Pentru a rezolva această problemă, va trebui să putem paraleliza și munca dintr-un singur nucleu.
Să atacăm prima dată munca tokenizatorului. Acest lucru e simplu, dat fiind că avem de a face cu o așa numită problemă "jenant paralelizabilă", deoarece procesarea fiecărei linii este total independentă de procesarea oricărei alte linii. Tot ceea ce trebuie să facem e să începem să diferențiem un "nod" de DAG de un "procesor de nod" din DAG. Într-un DAG pot fi multipli procesori, care realizează munca unui singur nod. Să adăugăm un al doilea procesor pentru nodul tokenizator.
Acum sursa poate utiliza procesorii de tokenizare ca un pool și poate preda linii la oricare din ele, în funcție de care are coada de intrare mai scurtă.
Următorul pas este paralelizarea acumulatorului, dar acest lucru e un pic mai dificil. Acumulatorii numără cuvinte și dacă le utilizăm ca un pool de procesoare, atunci fiecare va vedea instanțe ale aproape fiecărui cuvânt, numărătoarea va fi parțială și rezultatele vor trebui combinate. O strategie comună pentru a rezolva această problemă e să ne asigurăm că toate instanțele unui anumit cuvânt vor ajunge la același procesor acumulator. Procedeul se numește partiționare de date, realizându-se în Hazelcast Jet prin folosirea unor muchii speciale de partajare între nodurile tokenizator și acumulator. În momentul în care un cuvânt este emis de un tokenizator, el trece printr-un switchboard și ajunge rutat la acumulatorul potrivit. De exemplu: cu doi acumulatori putem calcula hashcode-ul cuvântului și putem folosi cel mai nesemnificativ bit pentru a alege acumulatorul 1 sau 0.
În acest moment, avem un plan pentru un procedeu de calcul complet paralelizat, care poate utiliza toate procesoarele disponibile pe o mașină, presupunând că inițiem suficiente noduri de tokenizare si acumulare. Următoarea provocare este să putem utiliza mașini multiple (să nu uităm că scopul este de a putea procesa un volum foarte mare de date).
În primul rând, sursa nu va mai putea fi o simplă listă aflată în memorie pentru că acesta ar însemna că fiecare mașină procesează aceleași date (nemaivorbind de faptul că ne limităm la seturi de date care încap în memoria unei singure mașini). Pentru a exploata un cluster de mașini ca o singură resursă computațională, fiecare membru al lui trebuie să poată să observe doar o parte (partiție) a datelor de intrare. În Hazelcast Jet, acest lucru se realizează prin folosirea unor structuri de date distribuite, preluate din Hazelcast IMDG, dar și alte sisteme de _stream procesing_oferă soluții. Dar în acest articol, nu vom intra în prea multe detalii.
Problema pe care vrem s-o detaliem este situația muchiilor de partiționare într-un astfel de setup de mașini multiple. Fiecare acumulator ar trebui să primească subsetul lui propriu de cuvinte. Aceasta înseamnă că pe măsură ce folosim din ce în ce mai multe mașini, va trebui ca din ce în ce mai multe cuvinte să treacă prin rețea ca să ajungă la acumulatorul potrivit. Sistemul nu s-ar putea scala corespunzător. Soluția este introducerea unui nou tip de nod, numit combinator. Ideea este că fiecare mașină își calculează numerele de apariții de cuvinte locale, iar aceste rezultate parțiale vor fi combinate la rândul lor de nou tip de nod. Diferența de performanță provine din faptul că pe rețea nu va circula un mare număr de cuvinte, ci un număr mic de rezultate parțiale.
În Hazelcast Jet muchiile dintre noduri pot fi nu numai partiționate sau nu, ci și locale sau distribuite. Între tokenizatoare și acumulatoare avem muchii partiționate locale, iar între acumulatoare și combinatoare vom avea muchii partiționate distribuite. Codul de procesare al unui combinator arată cam așa:
// Combining vertex
Map combined = new HashMap<>();
for (Entry wordAndCount : receive()) {
combined.merge(wordAndCount.getKey(),
wordAndCount.getValue(),
(accCount, newCount) -> accCount + newCount);
}
// finally, when dane receiving:
for (Entry
wordAndCount combined.entrySet()) {
emit(wordAndCount);
}
Prin exemplul numărării de cuvinte, am vrut să ilustrăm genul de probleme cu care se confruntă sistemele de stream processing. În cele ce urmează, vrem să prezentăm modul în care business logicul acestor aplicații poate fi definit într-o manieră care să permită ca aceste sisteme să-și poată exercita magia.
Din cauza naturii distribuite a procesării făcute de către aceste sisteme, nu putem să folosim simplu cod
imperativ pentru a specifica logica de aplicat asupra datelor. Trebuie s-o descriem declarativ. Acesta este motivul pentru care aplicațiile de _stream processing_folosesc multe din principiile programării funcționale și pot părea destul de asemănătoare cu API-ul de Java Streams introdus de Java 8. Similitudinile sunt însă, de multe ori doar superficiale.
Să luăm ca exemplu, Pipeline API-ul oferit de Hazelcast Jet. Acesta permite definirea unui pipeline de procesare prin compunerea unor etape individuale, fiecare efectuând o singură transformare bine definită. Pipeline API-ul este un mod high-level de a defini logica de calcul. De fapt, în spate, acesta va construi un graf de procesare low-level, așa cum am văzut în cazul numărătorii de cuvinte. Dar utilizatorul nu trebuie să-și bată capul cu aceste detalii.
Am văzut deja, cum arată codul pentru numărătoarea de cuvinte în Pipeline API:
Pattern delimiter = Pattern.compile("\\W+");
Pipeline p = Pipeline.create();
p.drawFrom(Sources.map("book-lines"))
.flatMap(e -> traverseArray(delimiter.split(
e.getValue().toLowercase())))
.filter(word -> !word.isEmpty())
.groupingKey(wholeitem())
.aggregate(AggregateOperations.counting())
.drainTo(Sinks.map("counts"));
În el putem vedea atât transformări simple, care procesează elementele fluxului de date independent una de alta (map, filter, flatMap), cât și transformări mai complexe, care trebuie să țină evidența unor stări care depind de multiple înregistrări. De exemplu, când numărăm cuvinte, trebuie să ținem minte contorul curent.
Dintre transformările cu stare exemplificăm:
În cazul general, stările transformărilor complexe sunt afectate de toate elementele observate în fluxul de date, toate elementele ajungând să fie implicate în calcule. De obicei, însă, informațiile de care avem nevoie să fie extrase sunt de genul "statistica ultimelor X secunde", nu "statistica de când a fost pornită aplicația". (Să nu uităm că fluxul de date se presupune a fi infinit). Acesta este momentul când conceptul "ferestrelor de date" intră în imagine. Ele definesc domeniul agregărilor într-o manieră utilă.
Ferestrele de date ne oferă o perspectivă finită și îngrădită a unui flux infinit de date. Fereastra definește cum pot fi selectate doar anumite elemente din flux și cum pot fi grupate împreună într-o manieră utilă. Transformările sunt executate doar asupra elementelor din fereastră.
O variantă foarte simplă de fereastră este cea de tip "tumbling", adică una care "se rostogolește". Împarte streamul continuu în părți complet separate, care nu se suprapun și sunt lipite una de alta (a se vedea figura de mai jos). Fereastra este de obicei, specificată printr-o perioadă de timp sau număr de elemente. Exemple: "numărul acceselor unui sistem pe minut", "scorul maxim într-un joc per 1000 de rezultate".
Un alt tip de fereastră, de dimensiune fixă, este cea de tip "sliding". Diferența este însă, că la acest model, ferestrele consecutive se pot suprapune (a se vedea al doilea desen din figura următoare). Fereastra se definește prin dimensiune și pas. Exemplu: "numărul acceselor unui sistem în ultimul minut cu update-uri la fiecare zece secunde".
Al treilea model de fereastră e cea de tip "sesiune". O sesiune reprezintă o perioadă de activitate urmată de una de inactivitate. O fereastră de acest fel colectează toată activitatea care aparține unei sesiuni. Diferența față de tipurile de fereastră discutate până acum, este că aceasta din urmă nu are o lungime fixă. Lungimea este determinată de date.
Această figură ilustrează diferitele tipuri de fereastră discutate:
Trebuie menționat că ferestrele pot fi atât globale, cât și bazate pe chei, acestea din urmă conținând doar acele elemente din fluxul de date care au o anumită cheie.
De cele mai multe ori, un element al fluxului de date reprezintă un eveniment petrecut în lumea reală cu propriul timestamp și acest "timp de eveniment" poate foarte ușor să difere radical de momentul de timp la care evenimentul ajunge la procesare. Acest lucru se poate întâmpla din varii motive:
În unele sisteme, e suficient să se folosească doar timpul de procesare. De exemplu: când accentul este pus pe minimizarea latenței și nu se așteaptă după elemente întârziate. Sau când elementele au un timestamp propriu, dar sursa lor nu este de încredere și atunci se preferă să fie ignorate în favoarea momentului când apar în sistem.
Dar în multe cazuri, timestampurile evenimentului nu pot fi ignorate, corectitudinea rezultatului depinzând de ele. Aceste situații sunt dificil de tratat. Specificația pipeline-ului de procesare trebuie să includă extragerea timestampului din evenimente și de asemenea, fixarea unei durate de așteptare a elementelor. Dacă se așteaptă mai mult, descrește probabilitatea ca elementele întârziate să fie ignorate. Dacă se așteaptă mai puțin, atunci sistemul va oferi rezultate mai rapide (descrește latența) și inclusiv consumul de resurse e mai mic. ( Adică mai puține date, trebuie stocate în buffere temporare.)
Într-un sistem distribuit, nu există o limită superioară a întârzierii evenimentelor. Să ne imaginăm un caz extrem, unde elementele fluxului de date sunt scoruri dintr-un joc de pe telefoane mobile. Jucătorul zboară cu avionul, dispozitivul este pus în _flight mode și scorurile nu pot fi trimise la procesare. Ce se întâmplă dacă dispozitivul rămâne setat pe flight mode timp de o săptămână? Totuși, evenimentul s-a întâmplat și e ușor de remarcat că acuratețea perfectă ar necesita ca fereastra din care face parte să rămână în așteptare o perioadă potențial infinită.
Este clar că în practică nu se poate face așa ceva. Frameworkurile de procesare de streamuri folosesc algoritmi euristici pentru a oferi estimări ale completitudinii ferestrelor de procesare. Ele se mai numesc și watermarkuri. În acest articol, nu intrăm în detalii cu privire la acest subiect.
O provocare majoră a procesării fluxurilor de date infinite o reprezintă protejarea stărilor interne de inevitabile defecțiuni ale sistemului distribuit. Acele date din flux care au intrat deja în sistem, dar încă nu au fost emise informațiile derivate din ele, riscă să dispară permanent dacă sistemul de procesare eșuează. Pentru a preveni astfel de pierderi, elementele originale ale intrării pot fi stocate până când rezultatele procesării lor ajung să fie emise. În acest caz, după ce sistemul e restabilit, datele de intrare stocate pot fi re-procesate. Stările interne ale sistemului de procesare pot fi și ele persistate.
Însă, în ambele cazuri, provocarea e mult mai mare dacă cerințele pentru corectitudine sunt stricte. Aceasta înseamnă că absolut fiecare element (al fluxului sau al stărilor interne) trebuie ținut în evidență și în fiecare stadiu de procesare, trebuie să știm exact dacă respectivul element a fost sau nu procesat. Această restricție se numește "garanția procesării exact o singură dată" (_exactly-once processing). O variantă mai relaxată și mai practică este "procesarea măcar o singură dată" (at-least-once processing), când e permis ca sistemul să reproceseze elemente de intrare care poate au mai fost procesate anterior.
Garanția "exact o singură dată" poate fi oferită doar cu un cost semnificativ de latență, throughput și spațiu de stocare. Dacă în schimb se poate găsi o variantă idempotentă a funcției de procesare, garanția mult mai ieftină de "măcar o dată" va fi suficientă, deoarece procesarea multiplă a unor elemente va produce același rezultat ca și procesarea lor unică. (De fapt, aceasta înseamnă funcție idempotentă).
Majoritatea sistemelor curente de stream processing suportă garanția exactly-once, dar merită să ne gândim un pic, dacă situația noastră de utilizare nu poate tolera un pic de duplicare în caz de erori. Diferența de cost a celor două variante poate fi enormă.
De exemplu: Într-un sistem care procesează loguri de acces și detectează tipare de fraudă verificând milioane de evenimente pe secundă, duplicările minore nu prea deranjează. Aici duplicările pot provoca doar niște detecții pozitive false. Pe de altă parte, într-un sistem financiar consistența perfectă este absolut obligatorie.
O aplicație de procesare de streamuri accesează surse și sinkuri de date prin conectori. Acestea sunt nodurile computaționale care au contact direct cu lumea exterioară.
Cu toate că acești conectori fac tot posibilul pentru a uniformiza multe tipuri de resurse sub umbrela aceleiași paradigme de flux de date, există totuși multe aspecte care necesită atenția noastră și care pot limita ceea ce putem face cu sistemul nostru de procesare.
Când vrem să construim un job computațional, prima decizie pe care trebuie s-o luăm este dacă vom avea de-a face cu un flux de date finit sau nu.
Seturi de date finite sunt tratate prin joburi de tip batch, existând mult mai puține aspecte de luat în considerare. Nu trebuie să ne gândim la ferestre de date și la evenimente întârziate. Exemple pentru astfel de date finite ar fi fișierele de pe disk, HDFS sau rezultatele unor interogări de baze de date.
Seturile de date infinite permit procesări continue, dar costul este că trebuie să definim ferestre. Unele operații nu pot fie efectuate pe un număr infinit de elemente (sume, medii, sortări, etc.). Pentru streamuri infinite de obicei, sursa preferată este Apache Kafka. Unele baze de date pot fi și ele transformate în surse infinite de date prin expunerea jurnalului, adică a fluxului tuturor schimbărilor efectuate pe baza de date în timp.
Hazelcast Jet de exemplu poate foarte ușor crea o sursă din jurnalul unei mape distribuite oferite de Hazelcast IMDG:
Pipeline p = Pipeline.create();
StreamSourceStage>
fromMap = p.drawFrom(
Sources.mapJournal("inputMap",
START_FROM_CURRENT));
Pentru toleranță la erori, avem nevoie în general ca sursa de date să suporte reluarea (replay-ul) datelor. Dacă apare orice defecțiune, datele pot fi reluate din sursă (parțial sau total) și calculul poate fi repetat.
Sursele finite suportă de obicei, reluarea (fișiere, HDFS). Reluarea datelor infinite este limitată de dimensiunea spațiului de stocare disponibil. De exemplu, Apache Kafka oferă replay, dar cu anumite limitări. Pe de altă parte, unele surse pot fi citite doar o singură dată, de exemplu socketuri TCP și cozi JMS.
Un alt aspect al replay-ului este că în multe situații nu ar fi prea practic să se poată face doar de la începutul fluxului de date. Din acest motiv, este nevoie de așa numitul checkpointing, adică capabilitatea de a putea relua datele doar de la un anumit punct (offset) specificat de noi. Atât Kafka cât și Hazelcast Event Journal suportă această funcționalitate.
Un sistem computațional distribuit preferă să lucreze cu surse de date distribuite pentru a maximiza performanța. Dacă resursa nu-i distribuită, atunci va trebui ca toți membrii clusterului de procesare să se lupte pentru acces shared. În acest fel, scalabilitatea va fi practic non-existentă. Kafka, HDFS și Hazelcast IMap sunt toate distribuite. Cu toate acestea, un fișier nu e distribuit, dar poate să aibă copii identice în mai multe locuri, iar diferiți membri ai clusterului pot citi diferite părți din el.
După ce am luat la cunoștință despre ceea ce înseamnă un sistem de procesare de streamuri , remarcăm că sistemele tradiționale de procesare în loturi nu reprezintă altceva decât o implementare primitivă al unui caz special de streaming. Aceea cu sursă finită și cu o singură fereastră globală.
Ușurința de utilizare, toleranța la erori și performanța ridicată a sistemelor de streaming nu pot fi negate nici de cei mai înrăiți fani ai batchingului .