Accesul la platforme care oferă resurse de calcul nelimitate a adus o mulțime de soluții, dar și un nou set de probleme care necesită noi soluții inovatoare. Algoritmii care se ocupă de procesare big data doresc să integreze în aceste noi platforme vechi soluții care nu sunt perfect compatibile.
Acest articol dorește să prezinte analiza unor posibile soluții menite să depășească limitările și provocările ridicate de procesarea secvențială a datelor cronologice, dependente de starea lor curentă.
Datele constituie un mijloc care ajută la obținerea rezultatelor dorite, fiind colectate din mai multe surse la o frecvență înaltă și la scară largă, creând nevoia de tehnologii capabile să achiziționeze, să proceseze și să analizeze date mari.
Databricks este o platformă de analiză unificată, peste Apache Spark, care accelerează inovația prin unificarea domeniilor precum știința datelor, inginerie, afaceri și un motor foarte rapid și ușor de utilizat în contextul de big data și machine learning [1].
Scopul acestui articol este de a evidenția un set de soluții posibile atunci când prelucrarea unor date foarte mari este necesară și dependentă de informații anterioare. Propunerile noastre se vor baza pe capacitățile puse la dispoziție de către Azure Databricks, care lucrează în principal cu Databricks Utilities și funcții Spark în cadrul unui notebook orchestrate într-un job.
Una dintre cele mai mari provocări atunci când avem de-a face cu foarte multe date este chiar dimensiunea datelor. Prin urmare, diferite instrumente și strategii sunt necesare pentru a putea procesa datele într-un mod determinist având totodată resurse limitate.
Mai mult decât atât, atunci când execuția pe date istorice intră în discuție, acest lucru adaugă un efort suplimentar în conturarea celei mai bune soluții de prelucrare a datelor. Seriile de date cronologice necesită de obicei păstrarea ordinii în execuție, permițând urmărirea modificărilor în timp. Ca o consecință a acestui fapt, proiectarea unui sistem stateful trebuie să fie luată în considerare pentru a îndeplini această cerință. În secțiunile următoare, această provocare va fi descrisă ca stare, adică o anumită informație din seriile de date procesate anterior.
Deoarece python este limbajul cel mai comun în lumea unui analist de date, spark a adăugat un Python API din versiunea 0.7 care oferă suport în a integra funcții definite de utilizator (user defined functions - UDFs). Aceste Pandas UDFs folosesc Apache Arrow , rolul lui principal fiind de a translata dintr-un spark data frame într-un pandas data frame. Limita de memorie este setată la valoarea maximă a unui int32, pentru versiunile mai mici de 0.16 și int64, pentru versiunile mai mari, ceea ce înseamnă că dimensiunea necomprimată a unei partiții trebuie să se încadreze în aceste limite.
Memoria de care dispune un spark driver este limitată, colectarea tuturor datelor pe driver nefiind considerată o bună practică. Partiționarea orizontală a seriilor de date cronologice este necesară în acest sens, iar utilizarea funcționalității spark lazy evaluation este obligatorie.
Prima provocare pe care a trebuit să o rezolvăm a fost limitarea cantității de date care trece prin setul de algoritmi de procesare, păstrând totodată cronologia datelor. În cazul nostru, o partiție este un set de rânduri identificate unic prin momentul achiziției datelor. Din cauză că nu toate partițiile acoperă același interval de timp, a apărut nevoia implementării unui mecanism care să aleagă dinamic următoarea secvență de date neprocesate. Dimensiunea acestui subset este de o zi, adică totalitatea datelor ce au fost achiziționate la aceeași dată calendaristică, între ora 00:00:00.000 și ora 23:59:59.999. Algoritmii de procesare aveau deja implementată noțiunea de stare, dar pentru a obține cele menționate anterior mai este nevoie de câteva funcționalități:
să păstrăm informația cu privire la momentul de timp până când datele au fost deja procesate; selectarea subsetului de date aferent zilei următoare, după ultima zi procesată;
Figura 1
În Figura 1 se poate observa distribuția partițiilor referitoare la timp și ceea ce am dorit să obținem: toate pătratele marcate cu Ziua 1 din fiecare partiție trebuie să fie procesate concomitent, înaintea oricărei alte zile din fiecare partiție. Toate pătratele din Ziua 2 trebuie să le urmeze imediat și așa mai departe.
Pentru a putea păstra informația cu privire la ultima zi de date procesată și pentru a putea asigura continuitatea procesării, tabelului deja existent, ce ținea starea internă a algoritmilor, i s-a adăugat o coloană ce păstrează ultima zi procesată (Figura 2).
Figura 2
În pasul 1 din figura precedentă, se face un join între datele din date de intrare și date de stare, iar datele care au dată mai mică decât dată stare sunt excluse din fiecare partiție în pasul 2.
În acest moment datele sunt gata a fi procesate (pasul 3). La finalul procesării, tabelele ce conțin date de ieșire și cele care conțin starea sunt actualizate, așa cum este menționat în pasul 4.
Ultima parte a soluției este trecerea la următoarele zile de date sau retriggering. În continuare vom prezenta mai multe modalități prin care acest lucru poate fi obținut.
Prima implementare se folosește de funcții din dbutils (Databricks Utilities) . Ideea era să lăsăm notebookul să se auto-apeleze până când nu mai sunt date de procesat, și apoi să trecem la următorul. Mai jos este semnătura metodei run:
run( path: String,
timeoutSeconds: int,
arguments: Map)
Al doilea argument al funcției run() exprimă cât timp ar trebui notebookul ce face apelul să aștepte după notebookul apelat să termine. Acest parametru a fost setat la 0, ceea ce înseamnă așteptare infinită, care a creat un lanț de notebookuri ce așteptau unul după altul la nesfârșit. Un dezavantaj al acestei soluții este lipsa de trasabilitate a sistemului și îngreunarea proceselor.
Cea de-a doua implementare este foarte asemănătoare la nivel conceptual, diferența fiind că la finalul unui job se va lansa o altă execuție a aceluiași job, însă cu parametrii diferiți, primul încheindu-și execuția.
Pentru a obține asta, toți algoritmii de procesare au devenit parte a aceluiași job în calitate de task, fiecare trebuind să-și gestioneze propria stare internă, iar ultimul task având responsabilitatea de a porni următorul job pe baza informațiilor primite de la taskurile anterioare. Pentru a putea transmite informația între taskuri am folosit: taskValues , care face parte din databricks-utils. Oricare dintre taskurile aflate în același job pot să trimită informații către celelalte.
dbutils.jobs.taskValues.set(
key = "finished",
value = True)
Toate task-urile următoare pot evita execuția folosindu-se de aceste informații:
isFinished = dbutils.jobs.taskValues
.get(taskKey = "task_that_wrote_this",
key = "finished",
default = False)
Ultimul task va decide dacă este necesar să pornească următorul job, evaluând valoarea acestei variabile isFinished. Ulterior, să trimită comanda de pornire folosind un HTTP POST request către jobs/run-now prin Azure Databricks API [8]. Pentru a putea obține repornirea este nevoie de job id, care poate fi obținut astfel:
context_str = dbutils.notebook.entry_point
.getDbutils()
.notebook()
.getContext()
.toJson() context = json.loads(context_str)
job_id = context.get('tags', {})
.get('jobId', None)
Figura 3
În Figura 3 este prezentat acest mecanism. Răspunsul primit în urma requestului HTTP conține un run_id care poate fi folosit pentru a îmbunătăți trasabilitatea sistemului.
Note:
{"run_id": 71407712, "number_in_job": 71407712, "job_id": "923771151139276"}
- pentru a putea mări trasabilitatea sistemului răspunsului i s-a adăugat un job_id
;
un avantaj al acestei implementări este că datorită stărilor interne ale fiecărui algoritm și task, sistemul are mereu un punct din care să continue în cazul în care ceva eșuează;
sistemul este mai ușor de urmărit datorită returnării variabilei run_id
;
deoarece următoarea porțiune de date ce trebuie procesată este calculată dinamic, adăugarea în timpul execuției a unor date noi va permite procesarea acestora în ordine cronologică, fără a se pierde;
Principala problemă la utilizarea metodei run(), în implementarea anterioară, a fost lanțul de notebookuri obținut. O altă soluție ar fi utilizarea acestei metode într-un mod în care apelarea recursivă să fie evitată, folosindu-se două notebookuri.
Această abordare se bazează pe următorii pași:
primul notebook, numit notebook orchestrator obține intervalul în care datele trebuie să fie procesate, ca fiind intervalul dintre cea mai veche și cea mai nouă dată din tabelul de intrare;
intervalul selectat este împărățit în zile, similar cu abordarea din soluția anterioară, iar apoi cel de-al doilea notebook este apelat pentru fiecare zi obținută;
Figura 4 arată modalitatea în care datele vor fi procesate, folosindu-se de această abordare.
Figura 4
Această abordare nu a fost folosită pe termen lung deoarece:
partițiile nu au aceeași lungime și nu sunt sincronizate; această abordare va face ca cele mai scurte partiții să fie procesate în același ritm cu cea mai lungă dintre ele, în timp ce abordarea anterioară le va termina pe cele mai scurte mai repede, datorită dinamicității;
în cazul în care o partiție este adăugată după ce execuția a început deja, aceasta conținând date mai vechi decât data la care a ajuns execuția, acele date nu vor fi niciodată procesate;
Databricks oferă suport pentru table streaming mechanism, procesarea datelor fiind posibilă la nivel de 'micro-batches'. Dimensiunea unui batch poate fi controlată prin setarea variabilelor maxBytesPerTrigger sau maxFilesPerTrigger.
În momentul în care procesarea datelor în ordine cronologică este necesară, această metodă are o limitare. Ordinea în care librăria procesează datele este impusă de timpul când documentele care conțin datele au fost actualizate/salvate în tabel. Se poate forța un anumit tip de ordonare, setând withEventTimeOrder, dar nu poate fi impusă pe baza niciunei coloane din tabel, acest lucru aplicându-se și în cazul dimensiunii unui batch.
Cea mai comună soluție care a fost investigată a fost utilizarea unui mecanism de iterare, care ar lua rând pe rând datele din fiecare partiție, împărțite pe zile, ar executa algoritmii de procesare pe ele, ar salva stările și ar continua mai departe.
Folosirea unei astfel de metode ar încălca oarecum scopul pentru care spark a fost conceput, introducând un timp de execuție adițional pentru fiecare dintre algoritmii de procesare.
Am reușit să depășim provocările și limitările descrise prin furnizarea unui set de soluții care pot fi aplicate cu ușurință în funcție de anumite cerințe, folosind funcționalitățile care ne sunt puse la dispoziție de mediul în care lucrăm.
https://www.databricks.com/spark/getting-started-with-apache-spark
https://learn.microsoft.com/en-us/azure/databricks/dev-tools/databricks-utils
https://learn.microsoft.com/en-us/azure/databricks/workflows/jobs/jobs
https://www.databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
https://www.databricks.com/glossary/what-are-spark-applications
https://learn.microsoft.com/en-us/azure/databricks/dev-tools/databricks-utils#dbutils-jobs- taskvalues
https://learn.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/jobs