TSM - Scala și Big Data – Spark Framework

Andrei Muja - Scala Engineer @ Zenitech

Bine ați revenit! Continuăm să ne îmbogățim cunoștințele în Scala, de data asta, cu o serie despre Big Data. Acest articol va fi o introducere în frameworkul Spark, un engine de procesare a datelor. În cele ce urmează, voi prezenta ceea ce este Spark folosind Scala pentru crearea aplicațiilor distribuite. De asemenea, vom face niște exerciții prin intermediul cărora vom analiza un set de date folosind doar Spark Low-Level API - RDDs. Următoarele articole vor avea mai multe exemple practice, dar cu Spark High-Level API.

Vă sugerez să citiți articolele mele anterioare despre particularitățile limbajului, avantajele și dezavantajele programării funcționale folosind Scala (ScalaFP), precum și cele mai utilizate metode aplicate pe colecții de date immutable (ScalaHOF). Sunt necesare pentru a înțelege conținutul din acest articol. Opțional, puteți vedea punerea în practică a metodelor HOF rezolvând probleme în stil pur funcțional (nu este obligatoriu, dar e bine să-l străbateți - ScalaProbleme).

Vă mai recomand un articol în engleză: Scala Introduction, precursor al celor din revistă, unde explic în detaliu fiecare element basic al limbajului.

Notă: voi folosi Scala 2.13, cu versiunea Spark 3.3.2, ultimul releases stabil la data publicării articolului. Versiunea Java utilizată este 11.

Big Data

Big Data reprezintă acele colecții de date cu un volum imens și creștere continuă, dar și dimensiuni de ordinul sutelor de GB sau mai mult. Un singur computer nu ar putea să le stocheze și proceseze eficient. Datele pot fi structurate (tabele), nestructurate (combinație de imagini, text, audio) sau semi structurate (XML). Cum procesoarele cresc incremental în putere, posibilitățile lor de creare au atins noi limite fizice. În același timp, stocarea și prelucrarea datelor este din ce în ce mai mare, facilă și ieftină. Mai mult, ele trebuie distribuite și procesate în paralel, iar computerele cu un singur procesor nu pot scala la nesfârșit. Așa a apărut Spark.

Ce este Spark?

Inițial un proiect la UC Berkeley în 2009, Apache Spark este un framework multi-language de procesare a datelor care poate executa multiple operațiuni asupra unor seturi foarte mari de date, precum încărcarea și analiza lor, data science, machine learning, interogări SQL și streaming. De asemenea, poate distribui procesele între unul sau mai multe computere numite noduri, dar și în clustere diferite, de unul singur sau împreună cu alte tooluri de distributed computing (Hadoop). În termeni tehnici, se mai numește și Unified Computing Engine, în sensul că are API-uri consistente în mai multe limbaje, dar asemănătoare. Sunt ușor de utilizat și au un nivel de abstractizare ridicat. Amintim aici Java, Scala, Python, R, chiar și SQL.

Spark se ocupă de optimizarea tuturor librăriilor. Să presupunem că suntem în echipa de Data Science și avem de executat niște operațiuni de tip machine learning asupra unor seturi de date obținute prin SparkSQL (două din componentele sale majore). Spark va optimiza codul în nod, cluster, peste librăriile de machine learning și peste SQL. Este independent de sursa datelor, nu contează de unde vin acestea și unde sunt stocate.

Utilizat, printre altele, de Amazon, Ebay, NASA Deep Space Network, TripAdvisor, Baidu, Shopify, Yahoo.

Arhitectură

Spark urmează arhitectura "master-slave". Clusterul are un singur "master", dar mai mulți "slave" și se bazează pe două concepte abstracte majore: RDD (Resilient Distributed Dataset) și DAG (Directed Acyclic Graph). RDDs sunt un grup de date care pot fi stocate in-memory în nodurile de lucru Spark. Despre RDD voi vorbi mai târziu în detaliu, fiindcă este subiectul principal al articolului, plus exemple de cod.

DAG este un graf direcționat finit care execută o secvență de operațiuni. Fiecare nod este o partiție RDD, iar muchiile reprezintă o transformare aplicată datelor. Graful reprezintă navigarea, iar aciclitatea și direcția sunt legate de secvența operațiilor, de la un stagiu la altul, succesiv.

Driver Program este un proces care rulează în main() și creează obiectul SparkContext. El are rolul de a coordona aplicațiile Spark, acționând ca un set independent de procese în cluster.

Pentru a rula în cluster, SparkContext se conectează la diferiți cluster manageri unde:

  1. Preia executors aflați în noduri de lucru, aducându-i în cluster;

  2. Trimite codul aplicației la acei executors. Codul poate fi definit ca arhiva JAR sau fișiere Python care sunt pasate către SparkContext;

  3. La final, SparkContext trimite taskurile la executors ca să ruleze.

Cluster managerul alocă resurse aplicației. Spark este capabil să managerieze multe clustere. Ele pot fi de mai multe tipuri: Hadoop YARN, Apache Mesos - ultimul depreciat. Mai există un tip, Standalone Scheduler, un Spark cluster manager care permite instalarea Spark pe un set nou de computere.

Worker Node rulează codul aplicației în cluster, iar Executor este un proces lansat ca aplicația să ruleze într-un worker node. Rulează taskuri și salvează data în memorie proprie sau disk storage. De asemenea, citește și scrie data către/din surse externe. Fiecare aplicație are propriul său executor. Mai avem Task, care este o unitate de lucru ce este trimisă doar unui singur executor.

Componente

Există două straturi/layere majore, arătate în diagrama de mai sus. Avem low-level API: Spark Core, care se ocupă de funcțiile de input/output, task scheduling, monitorizare, fault recovery. Tot el gestionează memoria și interacționează cu sistemul de stocare a datelor. Este format din RDDs (o structură specială, vom vedea mai jos ce este) și variabile distribuite.

Apoi, avem high-level APIs, format din Datasets, DataFrame și:

  1. Streaming pentru flux de date, data analysis și batch pentru date istorice, acceptă conexiuni de la Amazon Kinesis, Azure, GCP, Kafka, Flink etc.

  2. MLLib cu diverși algoritmi genetici și suport pentru machine learning, codul poate fi scalat și folosit pe zeci de mașini;

  3. GraphX pentru manipularea grafurilor de date directe cu proprietăți în noduri și muchii care realizează analiza datelor de dimensiuni chiar și de petabytes fără probleme;

  4. SparkSQL: suport pt SQL, dar și HQL (Hive Query Language), plus fișiere de tip JSON, Parquet, CSV, conectare ODBC și JDBC. Execută query-uri mai rapid de multe ori decât un data warehouse. Ele vor fi subiectul articolelor următoare.

Hadoop MapReduce

MapReduce este un pattern de programare al frameworkului Hadoop pentru accesarea datelor cu volum mare în Hadoop File System (HDFS). Procesarea este făcută prin separarea volumului de date în bucăți mici, în paralel, prin multiplele servere Hadoop. La final, datele sunt agregate pentru a returna un singur output. La bază, sunt două funcții: Map și Reduce. Acestea se execută secvențial, una după alta. Map preia datele de intrate ca perechi , procesează și creează un set intermediar . Reduce ia tot un input și produce un alt . Mai sunt doi pași intermediari: Combine și Partition.

Primul rulează individual pe fiecare server mapat. Micșorează data și mai mult până la o formă simplificată. Cel de-al doilea transforma perechile rezultate într-un nou set care intră în reducer. El decide cum arată data și alege către care reducer să meargă.

Spark vs MapReduce

Spark poate fi de până la de 100x mai performant decât MapReduce! Cel din urmă era ineficient pentru aplicațiile de mari dimensiuni și inteligență artificială. Fiecare pas necesita de multe ori crearea unei noi aplicații și deploy pe cluster. Spark permite crearea unui API funcțional simplu, optimizarea aplicațiilor multi-step, alături de partajarea datelor între noduri, operație numită shuffling.

Spark este de multe ori o alternativă mai bună la Hadoop și poate rula peste un cluster Hadoop, avantajele fiind sistemul de fișiere distribuite și un YARN cluster manager. Spark are propriul său cluster manager, poate fi utilizat independent, dar nu este un înlocuitor.

De ce Scala pentru Spark?

Spark este scris în Scala. Prin urmare, ne putem folosi de același limbaj care oferă o eleganță și rapiditate în execuție -> codul în Spark este apropiat de cel din Python, ca sintaxă. Încă este mai rapid codul Spark în Scala decât Pyhton, deși în ultimii ani s-a apropiat destul de mult. Compilează în bytecode, aproape de lower level și are puțin boilerplate decât Java. Cel mai mare avantaj, totuși, îl constituie folosirea programării funcționale.

Instalare Spark

Mai jos, aveți un exemplu de configurare al fișierului build.sbt, cu librăriile necesare pentru a rula Spark. Se aplică tuturor articolelor din seria "Spark folosind Scala". IDE este IntelliJ IDEA Community.

version := "0.1"

scalaVersion := "2.13.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2",
  "org.apache.spark" %% "spark-sql" % "3.3.2",
  "org.apache.spark" %% "spark-mllib" % "3.3.2",
  "org.apache.spark" %% "spark-streaming" % "3.3.2"
)

Observație: Lăsați sbt-ul (Scala Building Tool) să se ocupe de instalarea Spark. Este mult mai ușor fără bătăi de cap privind versiunile și compatibilitățile. Exemplele din articol sunt rulate în Windows. Pentru ca totul să fie ok și dacă doriți să replicați codul, accesați HadoopUtils și descărcați hadoop.dll și winutils.exe. Salvați-le într-un folder bin și setați "environment variables" adăugând o variabilă nouă "HADOOP_HOME" care să pointeze în calea salvată și modificați "Path": %HADOOP_HOME%bin. Totul ar trebui să funcționeze acum.

Pentru Mac, folosiți sbt, plus tutorialul SparkForMacOS.

RDDs

Nume complet: Resilient Distributed Dataset. Resilient restaurează informația în caz de eroare, Distributed distribuie dat între diferite noduri, iar Dataset este însuși grupul de date. Este low-level API, nivelul de abstractizare din care derivă derivă high-level API- urile pe care le vom vedea în articolele viitoare. Format din structurile de date ale limbajului folosit. În cazul nostru, orice structură/obiect/HOF din Scala, immutable. Are două tipuri de operații: transformări și acțiuni. Este îmbunătățirea adusă de Spark conceptului MapReduce, prin RDDs reducându-se latența algoritmilor.

Transformare: preia un RDD ca input și produce unul sau mai multe RDDs.

*Acțiune: preia un RDD ca input și produce o operație/funcționalitate asupra outputului*.

Să trecem la practică!

Avem un proiect Scala nou în IntelliJ. Orice cod Spark începe astfel: creăm o sesiune SparkSession, îi dăm un nume, o configurare key-value, precum și contextul, entrypoint în low-level API. Metoda getOrCreate() verifică dacă există o sesiune deja, fie globală sau locală. Dacă da, o returnează, aplicând opțiunile de configurare config(). Dacă nu, creează sesiunea și o setează ca globally default.

Mai jos vedem cum:

val spark = SparkSession.builder()
  .appName("Intro RDD - Distributed Datasets")
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext

Setul de date utilizat este simulat: Mockaroo. Are legătură cu geografia politică: orașe, țări și populația într-un an. Sunt 1000 înregistrări, destul de mic pentru un proiect Big Data real, dar bun ca exemplu practic. Datele nu reflectă realitatea privind populația, dar asocierea oraș-țară este corectă. Fișierul este atașat aici: Demographics. El se va afla și în folderul de resurse ale proiectului.

Veți vedea cât de ușor putem crea și folosi RDD-uri cu ajutorul metodelor HOF din API-ul Scala.

Pentru a lucra cu aceste date, vom crea un case class Demographics, având câmpurile numele coloanelor din tabel. Datele sunt by default immutable. Apoi, citim fișierul. Voi arăta două moduri.

Primul citește fișierul linie cu linie, elimină primul rând (pentru că reprezintă numele coloanelor, prin drop), separă valorile în tokenuri prin virgulă, mapează fiecare token cu un obiect Demographics și returnează o listă. Pentru a converti o colecție Scala într-un RDD, apelăm metoda parallelize.

def readDemographics(fileName: String): List[Demographics] =
  Source.fromFile(fileName)
    .getLines()
    .drop(1)
    .map(line => line.split(","))
    .map(demo => Demographics(demo(0), demo(1), 
    demo(2).toInt, demo(3).toDouble))
    .toList

val citiesRDD = sc.parallelize(readDemographics(
  "src/main/resources/data/demographics.csv"))

Al doilea mod apelează metoda textFile() care citește fișierul (acesta ar putea fi stocat și într-un Hadoop File System sau pasăm un URI), delimitează informația prin virgulă, construiește obiectele și returnează un RDD[Demographics]. Aici, Filter are rolul de a ignora coloanele, nu putem face drop pentru că nu știm ce entitate ar fi eliminată din RDD.

val readDemographicsRdd: RDD[Demographics] = sc.textFile("src/main/resources/data/demographics.csv")
  .map(line => line.split(","))
  .filter(tokens => tokens(0).toUpperCase() == 
     tokens(0))
  .map(demo => Demographics(demo(0), demo(1), 
     demo(2).toInt, demo(3).toDouble))

Trecem la transformări și acțiuni. Putem filtra informațiile despre orașe, de exemplu, după țară. Mai mult, vrem să știm numărul acestora. Pentru Franța, avem:

val franceCities = citiesRDD.filter(
  _.country == "France") // transformare lazy

val countFranceCities = franceCities.count() 
// acțiune eager

Transformarea este lazy, adică Spark nu va executa acest proces până când nu avem neapărat nevoie de el. Eager este atunci când executăm codul imediat ce a fost definit în pipeline.

Dar dacă vrem un RDD conținând elementele distincte pe baza anului pentru orașe? Simplu:

val distinctYearsRDD = citiesRDD.map(_.year)
   .distinct() // tot transformare lazy

Noțiunea de Scala implicit apare aici. Pentru a determina orașul cu cel mai mic sau mare număr de locuitori, este nevoie să definim un Ordering[Demographics], un trait ce reprezintă o strategie de sortare a unor tipuri de instanțe. Implicit pentru că mecanismul acesta ajută compilatorul să ajusteze codul. Practic, o extensie. El se va ocupa de sortarea datelor, abia ulterior putem apela metodele min() sau max():

implicit val citiesOrdering: Ordering[Demographics] =
  Ordering.fromLessThan[Demographics]
  ((da: Demographics, db: Demographics) => 
  da.population < db.population)

val minPopulation = franceCities.min() 
// acțiune, reduce la un singur element

Putem, de asemenea, grupa setul de date pe baza unui predicat. Cel mai simplu exemplu este prin groupBy pe baza unei chei, să zicem tot anul recensământului. Gruparea poate să fie complexă dacă includem numărul de partiții dorit sau un Partitioner, obiect ce determină modul de partiționare al datelor. Operația aceasta este costisitoare, datele fiind mutate între noduri.

// groupBy, cazul simplu
val groupedByYearDemo = citiesRDD.groupBy(_.year)

Partition și Coalesce

Spark separă datele în partiții și le procesează în paralel. Este foarte important ca un Big Data developer să știe cum este partajată informația și să o ajusteze manual, la nevoie, pentru a optimiza codul permanent. Este recomandat ca repartiționarea să fie făcută cât mai devreme într-un proiect cu milioane sau miliarde de volume de date, deoarece consumă multă memorie și presupune la rândul ei Shuffling. O dimensiune optimă a unei partiții este între 10-100MB.

Luăm RDD-ul creat mai devreme și îl repartiționăm în câte bucăți vrem, să fie 15. Apoi, facem conversia din RDD în DF sau DataFrame, salvăm valorile în fișiere externe cu modul de operare Overwrite (dacă tabela există, suprascrie cu ultimul conținut), format Parquet.

Parquet este un format de fișiere arhivate eficient în citirea și stocarea datelor de dimensiuni gigantice:

//Partitioning
import spark.implicits._

val repartitionedDemographicsRDD = 
  citiesRDD.repartition(15)

repartitionedDemographicsRDD.toDF.write
  .mode(SaveMode.Overwrite)
  .parquet(
   "src/main/resources/data/parquet/demographics15")

Tranziția la DataFrame se face importând implicits din SparkSession care extinde clasa abstractă SqlImplicits. Aici sunt colecții de metode utile în conversia obiectelor native Scala sau RDD în DataFrames/Datasets.

Coalesce va repartiza RDD în și mai puține partiții, formând un nou RDD. Nu necesită însă shuffling, informația nu este "plimbată" în tot clusterul. Iată cum:

// coalesce

val coalescedRDD = repartitionedDemographicsRDD
.coalesce(10) // nu necesită shuffling

coalescedRDD.toDF.write
  .mode(SaveMode.Overwrite)
  .parquet("src/main/resources/data/demographics10")

Notați că rulând acest program, nu vom avea rezultate printate pentru orice operație, ci doar informații despre Spark. Dacă totul a decurs bine, mesaje de informație și răspuns pozitiv ca acelea de mai jos ar trebui să fie afișate în consolă. Cum știm dacă ceea ce am rulat este corect? Urmăriți folderul de resurse unde au fost create două noi locații cu fișiere Parquet partiționate și repartiționate:

//Partitioning 
import spork.implicits._ 
val reportitionedDemographicsRDD = citiesRDD
 .repartition( nun1Partit1ons = 15) 

reportitionedDemographicsRDD.toDF.write 
  .mode(SaveMode.Overwrite) 
  .parquet( path = "src/main/resources/data"+
   " /parquet/demographicsl5") 

// coalesce 
val coolescedROO = reportitionedDemogrophicsRDD
  .coalesce( nun1Partit1ons = 10) 
//nu necesită shuffling 

coalescedRDD.toDF.write 
  .mode(SaveMode.Overwrite) 
  .parquet( path = "src/main/resources/"+
   "data/demographicslO")

Avantaje și dezavantaje RDDs

Acum că am văzut ce sunt, să trecem în revistă când este bine să folosim un RDD și când nu.

  1. Avantaje:

  2. Dezavantaje:

Concluzii

Big Data este un domeniu vast, iar ceea ce am prezentat aici este doar o introducere. Lucrurile pot fi, și sunt, de regulă, mai complicate de atât, în lumea reală.

Am văzut ce este Spark, cum funcționează și de ce e a fost nevoie existența lui. Am trecut, de asemenea, prin Spark Core, low-level API pentru a profita de avantajele structurilor de date immutable asupra volumelor de date imense. Programarea funcțională este cea mai bună opțiune pentru realizarea serviciilor distribuite cu multe requesturi.

Am amintit și dezavantajele. Ele constituie punctul de start pentru care un RDD nu este suficient pentru orice tip de informație. În articolele viitoare, vom vedea cum facem conversia dintre un RDD și DataFrame/Dataset, dar și în sens invers și ce probleme rezolvă acestea, plus exemple asupra altor seturi de date.

Ca resurse, las aceleași linkuri, la care mai adaug documentația oficială Spark și a API-ului în Scala:

  1. Baeldung -> o mulțime de tutoriale despre Java, Spring Framework, Spring Security, dar și Scala;
  2. RockTheJvm -> canal Youtube cu o multitudine de Scala tips and tricks - creatorul de conținut este român, are și cursuri despre Spark de la nivel de începător până la avansat;

  3. ScalaInTheCity -> canalul oficial al unora dintre cele mai mari conferințe despre Scala, cu ultimele tendințe în piață, dar și la ce proiecte lucrează anumite firme;

  4. SparkDoc, ScalaSparkAPI -> fiți la curent mereu cu ultimele versiuni, pentru a evita orice vulnerabilitate și probleme legate de compatibilități.