ABONAMENTE VIDEO REDACȚIA
RO
EN
NOU
Numărul 140
Numărul 139 Numărul 138 Numărul 137 Numărul 136 Numărul 135 Numărul 134 Numărul 133 Numărul 132 Numărul 131 Numărul 130 Numărul 129 Numărul 128 Numărul 127 Numărul 126 Numărul 125 Numărul 124 Numărul 123 Numărul 122 Numărul 121 Numărul 120 Numărul 119 Numărul 118 Numărul 117 Numărul 116 Numărul 115 Numărul 114 Numărul 113 Numărul 112 Numărul 111 Numărul 110 Numărul 109 Numărul 108 Numărul 107 Numărul 106 Numărul 105 Numărul 104 Numărul 103 Numărul 102 Numărul 101 Numărul 100 Numărul 99 Numărul 98 Numărul 97 Numărul 96 Numărul 95 Numărul 94 Numărul 93 Numărul 92 Numărul 91 Numărul 90 Numărul 89 Numărul 88 Numărul 87 Numărul 86 Numărul 85 Numărul 84 Numărul 83 Numărul 82 Numărul 81 Numărul 80 Numărul 79 Numărul 78 Numărul 77 Numărul 76 Numărul 75 Numărul 74 Numărul 73 Numărul 72 Numărul 71 Numărul 70 Numărul 69 Numărul 68 Numărul 67 Numărul 66 Numărul 65 Numărul 64 Numărul 63 Numărul 62 Numărul 61 Numărul 60 Numărul 59 Numărul 58 Numărul 57 Numărul 56 Numărul 55 Numărul 54 Numărul 53 Numărul 52 Numărul 51 Numărul 50 Numărul 49 Numărul 48 Numărul 47 Numărul 46 Numărul 45 Numărul 44 Numărul 43 Numărul 42 Numărul 41 Numărul 40 Numărul 39 Numărul 38 Numărul 37 Numărul 36 Numărul 35 Numărul 34 Numărul 33 Numărul 32 Numărul 31 Numărul 30 Numărul 29 Numărul 28 Numărul 27 Numărul 26 Numărul 25 Numărul 24 Numărul 23 Numărul 22 Numărul 21 Numărul 20 Numărul 19 Numărul 18 Numărul 17 Numărul 16 Numărul 15 Numărul 14 Numărul 13 Numărul 12 Numărul 11 Numărul 10 Numărul 9 Numărul 8 Numărul 7 Numărul 6 Numărul 5 Numărul 4 Numărul 3 Numărul 2 Numărul 1
×
▼ LISTĂ EDIȚII ▼
Numărul 125
Abonament PDF

Provocările procesării Big Data în ordine cronologica

Sorina Bîrla
Technical Lead @ Centrul de inginerie Bosch Cluj



Sebastian Urda
Software Engineer @ Centrul de inginerie Bosch Cluj



PROGRAMARE

Accesul la platforme care oferă resurse de calcul nelimitate a adus o mulțime de soluții, dar și un nou set de probleme care necesită noi soluții inovatoare. Algoritmii care se ocupă de procesare big data doresc să integreze în aceste noi platforme vechi soluții care nu sunt perfect compatibile.

Acest articol dorește să prezinte analiza unor posibile soluții menite să depășească limitările și provocările ridicate de procesarea secvențială a datelor cronologice, dependente de starea lor curentă.

Introducere

Datele constituie un mijloc care ajută la obținerea rezultatelor dorite, fiind colectate din mai multe surse la o frecvență înaltă și la scară largă, creând nevoia de tehnologii capabile să achiziționeze, să proceseze și să analizeze date mari.

Databricks este o platformă de analiză unificată, peste Apache Spark, care accelerează inovația prin unificarea domeniilor precum știința datelor, inginerie, afaceri și un motor foarte rapid și ușor de utilizat în contextul de big data și machine learning [1].

Scopul acestui articol este de a evidenția un set de soluții posibile atunci când prelucrarea unor date foarte mari este necesară și dependentă de informații anterioare. Propunerile noastre se vor baza pe capacitățile puse la dispoziție de către Azure Databricks, care lucrează în principal cu Databricks Utilities și funcții Spark în cadrul unui notebook orchestrate într-un job.

Provocări

Una dintre cele mai mari provocări atunci când avem de-a face cu foarte multe date este chiar dimensiunea datelor. Prin urmare, diferite instrumente și strategii sunt necesare pentru a putea procesa datele într-un mod determinist având totodată resurse limitate.

Mai mult decât atât, atunci când execuția pe date istorice intră în discuție, acest lucru adaugă un efort suplimentar în conturarea celei mai bune soluții de prelucrare a datelor. Seriile de date cronologice necesită de obicei păstrarea ordinii în execuție, permițând urmărirea modificărilor în timp. Ca o consecință a acestui fapt, proiectarea unui sistem stateful trebuie să fie luată în considerare pentru a îndeplini această cerință. În secțiunile următoare, această provocare va fi descrisă ca stare, adică o anumită informație din seriile de date procesate anterior.

Limitări

Deoarece python este limbajul cel mai comun în lumea unui analist de date, spark a adăugat un Python API din versiunea 0.7 care oferă suport în a integra funcții definite de utilizator (user defined functions - UDFs). Aceste Pandas UDFs folosesc Apache Arrow , rolul lui principal fiind de a translata dintr-un spark data frame într-un pandas data frame. Limita de memorie este setată la valoarea maximă a unui int32, pentru versiunile mai mici de 0.16 și int64, pentru versiunile mai mari, ceea ce înseamnă că dimensiunea necomprimată a unei partiții trebuie să se încadreze în aceste limite.

Memoria de care dispune un spark driver este limitată, colectarea tuturor datelor pe driver nefiind considerată o bună practică. Partiționarea orizontală a seriilor de date cronologice este necesară în acest sens, iar utilizarea funcționalității spark lazy evaluation este obligatorie.

Soluții

Declanșare bazată pe job

Prima provocare pe care a trebuit să o rezolvăm a fost limitarea cantității de date care trece prin setul de algoritmi de procesare, păstrând totodată cronologia datelor. În cazul nostru, o partiție este un set de rânduri identificate unic prin momentul achiziției datelor. Din cauză că nu toate partițiile acoperă același interval de timp, a apărut nevoia implementării unui mecanism care să aleagă dinamic următoarea secvență de date neprocesate. Dimensiunea acestui subset este de o zi, adică totalitatea datelor ce au fost achiziționate la aceeași dată calendaristică, între ora 00:00:00.000 și ora 23:59:59.999. Algoritmii de procesare aveau deja implementată noțiunea de stare, dar pentru a obține cele menționate anterior mai este nevoie de câteva funcționalități:

Figura 1

În Figura 1 se poate observa distribuția partițiilor referitoare la timp și ceea ce am dorit să obținem: toate pătratele marcate cu Ziua 1 din fiecare partiție trebuie să fie procesate concomitent, înaintea oricărei alte zile din fiecare partiție. Toate pătratele din Ziua 2 trebuie să le urmeze imediat și așa mai departe.

Pentru a putea păstra informația cu privire la ultima zi de date procesată și pentru a putea asigura continuitatea procesării, tabelului deja existent, ce ținea starea internă a algoritmilor, i s-a adăugat o coloană ce păstrează ultima zi procesată (Figura 2).

Figura 2

În pasul 1 din figura precedentă, se face un join între datele din date de intrare și date de stare, iar datele care au dată mai mică decât dată stare sunt excluse din fiecare partiție în pasul 2.

În acest moment datele sunt gata a fi procesate (pasul 3). La finalul procesării, tabelele ce conțin date de ieșire și cele care conțin starea sunt actualizate, așa cum este menționat în pasul 4.

Ultima parte a soluției este trecerea la următoarele zile de date sau retriggering. În continuare vom prezenta mai multe modalități prin care acest lucru poate fi obținut.

Prima implementare se folosește de funcții din dbutils (Databricks Utilities) . Ideea era să lăsăm notebookul să se auto-apeleze până când nu mai sunt date de procesat, și apoi să trecem la următorul. Mai jos este semnătura metodei run:

run( path: String,
timeoutSeconds: int,
arguments: Map)

Al doilea argument al funcției run() exprimă cât timp ar trebui notebookul ce face apelul să aștepte după notebookul apelat să termine. Acest parametru a fost setat la 0, ceea ce înseamnă așteptare infinită, care a creat un lanț de notebookuri ce așteptau unul după altul la nesfârșit. Un dezavantaj al acestei soluții este lipsa de trasabilitate a sistemului și îngreunarea proceselor.

Cea de-a doua implementare este foarte asemănătoare la nivel conceptual, diferența fiind că la finalul unui job se va lansa o altă execuție a aceluiași job, însă cu parametrii diferiți, primul încheindu-și execuția.

Pentru a obține asta, toți algoritmii de procesare au devenit parte a aceluiași job în calitate de task, fiecare trebuind să-și gestioneze propria stare internă, iar ultimul task având responsabilitatea de a porni următorul job pe baza informațiilor primite de la taskurile anterioare. Pentru a putea transmite informația între taskuri am folosit: taskValues , care face parte din databricks-utils. Oricare dintre taskurile aflate în același job pot să trimită informații către celelalte.

dbutils.jobs.taskValues.set(
  key = "finished", 
  value = True)

Toate task-urile următoare pot evita execuția folosindu-se de aceste informații:

isFinished = dbutils.jobs.taskValues
    .get(taskKey = "task_that_wrote_this", 
         key = "finished",
         default = False)

Ultimul task va decide dacă este necesar să pornească următorul job, evaluând valoarea acestei variabile isFinished. Ulterior, să trimită comanda de pornire folosind un HTTP POST request către jobs/run-now prin Azure Databricks API [8]. Pentru a putea obține repornirea este nevoie de job id, care poate fi obținut astfel:

context_str = dbutils.notebook.entry_point
  .getDbutils()
  .notebook()
  .getContext()
  .toJson() context = json.loads(context_str) 

 job_id = context.get('tags', {})
                  .get('jobId', None)

Figura 3

În Figura 3 este prezentat acest mecanism. Răspunsul primit în urma requestului HTTP conține un run_id care poate fi folosit pentru a îmbunătăți trasabilitatea sistemului.

Note:

Rezoluție

Mecanism bazat pe un notebook de orchestrare

Principala problemă la utilizarea metodei run(), în implementarea anterioară, a fost lanțul de notebookuri obținut. O altă soluție ar fi utilizarea acestei metode într-un mod în care apelarea recursivă să fie evitată, folosindu-se două notebookuri.

Această abordare se bazează pe următorii pași:

Figura 4 arată modalitatea în care datele vor fi procesate, folosindu-se de această abordare.

Figura 4

Rezoluție

Această abordare nu a fost folosită pe termen lung deoarece:

Table streaming

Databricks oferă suport pentru table streaming mechanism, procesarea datelor fiind posibilă la nivel de 'micro-batches'. Dimensiunea unui batch poate fi controlată prin setarea variabilelor maxBytesPerTrigger sau maxFilesPerTrigger.

Rezoluție

În momentul în care procesarea datelor în ordine cronologică este necesară, această metodă are o limitare. Ordinea în care librăria procesează datele este impusă de timpul când documentele care conțin datele au fost actualizate/salvate în tabel. Se poate forța un anumit tip de ordonare, setând withEventTimeOrder, dar nu poate fi impusă pe baza niciunei coloane din tabel, acest lucru aplicându-se și în cazul dimensiunii unui batch.

Iterarea datelor dintr-o partiție

Cea mai comună soluție care a fost investigată a fost utilizarea unui mecanism de iterare, care ar lua rând pe rând datele din fiecare partiție, împărțite pe zile, ar executa algoritmii de procesare pe ele, ar salva stările și ar continua mai departe.

Rezoluție

Folosirea unei astfel de metode ar încălca oarecum scopul pentru care spark a fost conceput, introducând un timp de execuție adițional pentru fiecare dintre algoritmii de procesare.

Concluzii

Am reușit să depășim provocările și limitările descrise prin furnizarea unui set de soluții care pot fi aplicate cu ușurință în funcție de anumite cerințe, folosind funcționalitățile care ne sunt puse la dispoziție de mediul în care lucrăm.

Referințe

  1. https://www.databricks.com/spark/getting-started-with-apache-spark

  2. https://learn.microsoft.com/en-us/azure/databricks/dev-tools/databricks-utils

  3. https://learn.microsoft.com/en-us/azure/databricks/workflows/jobs/jobs

  4. https://www.databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

  5. https://arrow.apache.org

  6. https://www.databricks.com/glossary/what-are-spark-applications

  7. https://learn.microsoft.com/en-us/azure/databricks/dev-tools/databricks-utils#dbutils-jobs- taskvalues

  8. https://learn.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/jobs

  9. https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/delta-lake

NUMĂRUL 138 - Maps & AI

Sponsori

  • Accenture
  • BT Code Crafters
  • Accesa
  • Bosch
  • Betfair
  • MHP
  • Connatix
  • BoatyardX
  • .msg systems
  • Yardi
  • Colors in projects

INTERVIURI VIDEO