Creșterea rapidă a platformelor de tip Cloud Data Lakehouse din ultimii ani a permis dezvoltatorilor să reducă timpul până la lansare prin oferirea unui mediu unificat pentru multe dintre sarcinile comune, ce se întind de la ETL (extract-transform-load) de bază, prin Business Intelligence până la Machine Learning și Inteligență Artificială.
Acest articol analizează provocările pe care le întâlnim atunci când încercăm să aplicăm una dintre cele mai bune practici din dezvoltarea software, testarea continuă în Data Science, propunând o soluție potențială.
Databricks este o platformă de tip Lakehouse în cloud care permite Data Engineering la scară largă, Data Science colaborativ, soluții complete pentru Machine Learning și Business Analytics.
Având suport pentru mai multe limbaje de programare ca Python, R, Scala și Java, Databricks se integrează cu IDE-uri (Integrated Development Environment) precum Visual Studio Code, PyCharm, IntelliJ IDEA și Eclipse.
În decursul acestui articol vom folosi limbajul de programare Python pentru a ilustra conceptele.
Unul dintre cele mai comune tipuri de testare, Unit Testing, își propune să ofere cât mai mult code coverage cu putință. În Data Science codul e compus în cea mai mare parte din SQL queries care ar putea fi acoperite 100% dintr-o singură execuție. Din acest motiv, line și branch coverage nu ne spun aproape nimic despre calitatea codului.
Deoarece aggregation queries din notebookuri sunt intenționate pentru surse de date reale, crearea unui mecanism de "stubbing" în scopul testării poate deveni problematic, în special când codul nu ar trebui să știe că este testat.
Databricks folosește execuția și distribuția de live-code, o abordare bazată pe Jupyter notebookuri, ce sunt foarte utile în sarcinile din Data Science precum Exploratory Data Analysis (EDA), curățare și transformare de date, vizualizare de date, modelare statistică, Machine Learning și Deep Learning.
Dezvoltatorii sunt încurajați să folosească Databricks Connect și dbx by Databricks Labs (pentru a reduce timpul de dezvoltare).
Prin comparație cu dezvoltarea software tradițională, aici nu întâlnim foarte des testare continuă prin CI (Continuous Integration) pipelines externe și dedicate, în special datorită următoarelor aspecte:
Dezvoltatorii depind de clusterul Databricks. Aceștia nu au capacitatea de a fabrica medii de test locale temporare ce au toate aptitudinile, bibliotecile și contextul clusterului ce facilitează dezvoltarea. Instrumentele similare cu JUnit/pytest și în stare să decupleze codul de mediul în care rulează lipsesc de pe piață. Cele mai multe dintre ele sunt proiectate să ruleze în interiorul Databricks în loc să fie executabile de sine stătătoare.
Testarea de Integrare Continuă pentru notebookuri Databricks este o necesitate pentru proiecte de nișă, de genul celor care au un țel pentru calitate superioară și consideră joburile și notebookurile Databricks ca fiind continously delivered software și nu soluții livrate o singură dată, ce rezolvă doar probleme specifice de Data Science.
Cerințele (R) pentru Integration Testing provin din dezvoltarea software tradițională și le vom descrie în acest capitol.
R1: Integration Tests trebuie să demonstreze că modulele software, în acest caz funcțiile UDF [15] (user defined functions), bibliotecile instalate [16] și Spark SQL [17] queries funcționează cum ne așteptăm atunci când sunt legate împreună prin Notebook Command.
R2: Odată dezvoltate, testele sunt executate automat, ca parte din pipeline-urile de Continuous Integration.
R3: La sfârșitul execuției se generează și se stochează un raport detaliat. Raportul include lista testelor ce au fost executate, rezultatele evaluării și alte detalii relevante.
R4: Toate datele și resursele asociate cu testele trebuie curățate de îndată ce nu mai este nevoie de ele.
Diagrama de mai jos ilustrează componentele principale ce sunt implicate în soluția propusă:
Test Data sunt fixe, disponibile întotdeauna la aceeași locație pe durata dezvoltării Testelor de Integrare precum și pe durata execuției lor.
Databricks oferă Development Workspace pentru implementarea Testelor de Integrare și execuția lor automată ulterioară, într-un Temporary Workspace al cărui ciclu de viață este gestionat în întregime prin intermediul Databricks Workspace API.
Un Git Repository urmărește codul sursă al Notebook1.py și cel al TEST-Notebook1.py asociat.
CI Pipeline descarcă fișierele codului sursă, invocă Execution Script, iar ulterior publică Test Report.
Să aruncăm o privire mai îndeaproape asupra câtorva componente și să analizăm relațiile lor. Următoarele două capitole se concentrează pe aspectele cheie din implementarea soluției propuse.
În acest articol considerăm că Notebook1 este codul țintă, iar TEST-Notebook1 își propune să îl testeze. Următoarea diagramă prezintă activitățile din cadrul celor două nootebookuri.
În Pasul (1) și Pasul (2), aducem și inițializăm bibliotecile de care avem nevoie în pregătirea datelor de test și efectuarea Testelor de Integrare, spre exemplu:
%pip install pytest ipytest
import ipytest
...
ipytest.clean_tests(...) ipytest.autoconfig()
Pasul (3) generează identificatori aleatorii unici pentru tabelele de date, rapoarte de test și altele, în așa fel încât fiecare execuție își poate asocia acești identificatori unde consideră că e important, ca și în următorul exemplu:
IT_EXEC_ID = str(uuid.uuid4().hex)
INPUT_DATA_TABLE = 'input_' + IT_EXEC_ID
OUTPUT_DATA_TABLE = 'output_' + IT_EXEC_ID
TEST_REPORT_FILENAME = '/dbfs/' + IT_EXEC_ID + '.xml'
Pasul (4) pregătește datele de intrare în INPUT_DATA_TABLE, Pasul (5) invocă Notebook1 folosind parametrii personalizați, iar Pasul (10) verifică rezultatele, spre exemplu:
def test1():
dbutils.notebook.run( 'Notebook1',0,
{
'input_data_table': INPUT_DATA_TABLE,
'output_data_table': OUTPUT_DATA_TABLE
}
)
result_count = spark.sql(
'select count(*) as total from ' +
OUTPUT_DATA_TABLE) assert(
result_count.collect()[0].total > 0)
Pe durata execuției testelor, Notebook1 își parcurge proprii pași. Pentru a putea fi testabil, Pasul (6) și Pasul (7), trebuie să se asigure că Notebookul este parametrizabil. Parametrii pot fi obținuți folosind următoarele instrucțiuni:
Pasul (8) efectuează Business Logicul principal, dar între limitele impuse anterior (se bazează pe variabilele INPUT_TABLE și OUTPUT_TABLE).
Notebook1 are obligația să prevină alte invocări de notebookuri (ale lui însuși sau altele) în timp ce rulează sub testele de integrare, pentru ca scopul acestor teste să rămână în continuare previzibile. Această discriminare poate fi realizată spre exemplu, prin compararea valorilor oferite pentru INPUT_TABLE
și OUTPUT_TABLE
cu valorile implicite cunoscute (ce sunt folosite în execuțiile reale).
Execuția testelor este inițiată de fapt de către pasul (11), ce are și scopul de a colecta Test Report generat la sfârșit (în format JUnit), folosind următoarea comandă:
ipytest.run('-p', 'no:cacheprovider', '-v', '-rA',
'--junitxml=' + TEST_REPORT_FILENAME)
După ce am efectuat curățenia de la Pasul (12), execuția testelor se finalizează cu Pasul (13) - predarea locației unde se află Test Report:
Databricks oferă câteva API-uri REST pentru a facilita interacțiunea din exterior, iar soluția descrisă în acest articol folosește următoarele trei:
Workspace API, care ne permite să afișăm lista, importăm, exportăm și să ștergem notebookuri și directoare.
Jobs API, care ne permite să creăm, modificăm, rulăm și să ștergem joburi.
Următoarea diagramă prezintă interacțiunile inițiate de către Execution Script.
Prin folosirea Workspace API, scriptul creează un spațiu de lucru temporar cu Pasul (1) și încarcă toate notebookurile din codul lor sursă (inclusiv fișierele TEST-) cu Pasul (2).
În Databricks nu se pot rula notebookuri din exterior, decât atunci când acestea sunt împachetate în cadrul unor joburi. Jobs API facilitează crearea unui Job temporar cu Pasul (3), execuția lui cu Pasul (4) și inspectarea lui cu Pasul (5).
La sfârșitul execuției raportul poate fi descărcat folosind DBFS API cu Pasul (6), și publicat de către pipeline la locația configurată.
Realizăm curățenia în ordine inversă a apelărilor - prima dată DBFS cu Pasul (7), apoi Job cu Pasul (8), iar în final Workspace cu Pasul (9).
Există și câteva limitări tehnice, spre exemplu limita maximă a unui request spre Workspace API este de 10MB, sau conținutul descărcat din DBFS este oferit în reprezentare Base64 în loc de fișiere concrete, dar acestea pot fi ușor rezolvate prin intermediul unor soluții alternative.
Cea mai mare dificultate o întâmpinăm din cauză că instanța de Databricks trebuie să fie întotdeauna pregătită pentru folosire. În alte cuvinte, soluția descrisă în acest articol presupune că toate dependințele sunt deja la locul lor (biblioteci, configurări specifice de cluster, variabile de mediu și așa mai departe).
În acest articol am demonstrat cum testele de integrare, în adevăratul lor sens sunt posibile pentru notebookurile Databricks. În teorie, soluția propusă poate fi materializată în așa fel încât să detecteze Testele de Integrare existente în proiect, execuția lor și publicarea rapoartelor, totul într-un mod automat și continuu.