MapReduce este principala tehnologie de procesare de date de volum mare a proiectului Apache Hadoop. A fost dezvoltată de către Google. În 2004, ei au publicat un articol care descria conceptul MapReduce.
În 2006, Dug Cutting a reușit să implementeze acest concept și să îl includă într-un proiect Apache, mai exact în Apache Hadoop. Prima lansare a avut loc în 14 Septembrie 2007.
Acesta a fost începutul Marilor Date (Big Data) pentru toată lumea, începând de la persoane pur și simplu curioase, până la toate tipurile de companii. În scurt timp, Apache Hadoop a ajuns la dimensiunea unei comunități foarte puternice, atrăgând de asemenea jucători mari precum Yahoo, Facebook, Ebay, IBM, Linkedin și alții.
Pentru o adaptare mai ușoară a tuturor, alte tehnologii au fost dezvoltate peste MapReduce, care sunt mult mai ușor de învățat și de utilizat. Un exemplu este Apache Hive, care a fost dezvoltat la Facebook. Deoarece aproape toți cei din domeniul computer science au cunoștințe SQL, Facebook a dezvoltat Hive, care le-a permis să-și interogheze și să-și analizeze seturile de date prin simpla utilizare a limbajului HiveQL, care este foarte asemănător cu SQL. Astfel, oricine din echipa Facebook, care are cunoștințe SQL, avea capacitatea de a utiliza puterea tehnologiei MapReduce.
MapReduce este o tehnologie distribuită, care funcționează pe produse hardware obișnuite și se folosește pentru prelucrarea datelor. Are două faze principale, faza Map și faza Reduce și o altă fază, Shuffle, care nu este atât de bine cunoscută, dar în unele cazuri de utilizare, poate să vă încetinească sau să vă stimuleze întreaga execuție.
Pentru majoritatea cazurilor de utilizare a prelucrării de date folosind MapReduce, faza Map străbate toate seturile de date și aplică mai multe filtre, iar faza Reduce este locul unde ne aplicăm efectiv algoritmii.
Pentru a înțelege mai bine cum funcționează MapReduce, vă recomand să citiți mai multe despre MapReduce HelloWorld, mai precis exemplul Wordcount. Acesta ne ajută să găsim frecvența fiecărui cuvânt dintr-un set de date. Frumusețea MapReduce constă în faptul că același cod care funcționează pentru un set de date de câțiva MBs poate funcționa pe seturi mult mai mari, de TBs, PBs sau chiar mai mult de atât, fără vreo modificare de cod în programul nostru. Aceasta se datorează naturii execuției distribuite a MapReduce, care are grijă în mod automat de distribuția muncii și de eșecul proceselor.
Mai jos, puteți observa reprezentarea pseudocodului exemplului Wordcount.
mapper (filename, file-contents):
for each word in file-contents:
emit (word,1)
reducer (word, values):
sum=0
for each value in values:
sum=sum + value
emit (word, sum)
În imaginea următoare, puteți vedea procesul general al MapReduce pentru execuția Wordcount. Fiecare fază Map îsi primește setul de date și pregătește chei intermediare ca perechi de (cheie,valoare), unde "cheie" (key) este cuvântul real, iar "valoare" (value) este frecvența actuală a cuvântului, mai exact 1. Faza de shuffling garantează faptul că toate perechile cu aceeași cheie vor servi drept intrare pentru un singur reducer, astfel că în faza de reduce putem calcula foarte ușor frecvența fiecărui cuvânt.
În primul rând, următoarele proprietăți și pași de configurare implicați în tuning-ul MapReduce se referă la MapReduce V1. Există o nouă versiune MapReduce V2, care poate să aibă foarte puține modificări. Sunt necesare cunoștințe MapReduce mai mult decât elementare pentru a înțelege următoarele secțiuni.
După cum am menționat mai sus, în cadrul unei executări MapReduce complete, există două faze principale, map și reduce și o altă fază, shuffle între ele.
Fiecare fază Map primește ca intrare un block (input split - divizare de intrare) dintr-un fișier stocat în HDFS. Valoarea implicită pentru un fișier block este de 64 MB. Dacă dimensiunea totală a fișierului este mai mică de 64 MB, atunci faza Map va primi ca intrare întregul fișier.
Când faza Map începe să producă rezultate, acestea nu sunt scrise direct pe disc. Procesul este mai amplu și profită de memoria RAM prin alocarea unui tampon (buffer) acolo unde rezultatele intermediare sunt stocate. În mod implicit, dimensiunea acestui tampon este de 100 MB, însă poate fi reglată prin modificarea proprietății io.sort.mb. Când se atinge peste 80% din dimensiunea tamponului, un proces de fundal va vărsa conținutul pe disc. Pragul de 80% poate fi de asemenea modificat folosind proprietatea io.sort.spill.percent.
Înainte ca datele să fie vărsate pe disc, acesta este partiționat pe baza numărului de procese de reduce. Pentru fiecare partiție, se execută o sortare în memorie după cheie și dacă este disponibilă și o funcție de combinare, aceasta este rulată pe ieșirea procesului de sortare. Având o funcție de combinare, ne ajută să compactăm rezultatul functiei de map și astfel vom avea mai puține date de scris pe disc și de transferat prin rețea. De fiecare dată când pragul tamponului este atins, un nou fișier de vărsare este creat, astfel încât, în majoritatea executărilor de map, la final, putem să avem fișiere de vărsare multiple în cadrul unei executări a unui map.
După ce faza de map este încheiată, toate fișierele de vărsare sunt unite într-un unic fișier partiționat și sortat. Este recomandat, de asemenea, să comprimați rezultatul map-ului pe măsură ce este scris pe disc pentru a grăbi scrierea pe disc, pentru a economisi spațiul pe disc, dar și pentru a reduce cantitatea de date transferată reducerilor. Opțiunea de comprimare este dezactivată în mod implicit, însă poate fi modificată foarte ușor prin setarea proprietății mapred.compress.map.output pe "true". Algoritmii de comprimare susținuți sunt DEFLATE, gzip, bzip2, LZO, LZ4 și Snappy.
Faza Reduce își primește setul de date printr-o metodă de cerere de date folosind protocolul HTTP. Să vedem ce se întâmplă în partea de reduce.
După ce execuția map-ului este încheiată, este informat coordonatorul execuției (jobtracker), care știe la care procese de reduce să trimită fiecare partiție. Mai departe, reduce-ul are nevoie de rezultatele a mai multor faze de map care încep să fie copiate odată ce acestea sunt finalizate.
Rezultatele funcțiilor de map sunt copiate direct în memoria JVM a funcției de reduce, dacă sunt suficient de mici. În caz contrar, acestea sunt copiate pe disc. Când tamponul în memorie (in-memory) atinge o anumită dimensiune (controlată de mapred.job.shuffle.merge.percent) sau atinge un număr maxim de rezultate de map (mapred.inmem.merge.threshold), este unit și vărsat pe disc. Dacă este specificat un combinator, acesta va fi rulat în timpul unirii pentru a reduce cantitatea de date scrise pe disc. Dacă în final vom avea multiple fișiere de vărsare pe disc, acestea sunt de asemenea unite în fișiere mai mari, sortate, pentru a economisi timp pentru mai târziu.
Când toate funcțiile de map sunt încheiate și rezultatele lor sunt copiate pentru fazele de reduce, intrăm în faza de unire, care unifică toate rezultatele funcțiilor de map, păstrându-le ordinea de sortare după cheie. Rezultatul acestei uniri servește drept intrare pentru faza de reduce. În timpul fazei de reduce, funcția de reduce este invocată pentru fiecare cheie din ieșirea sortată. Ieșirea acestei faze este scrisă direct pe sistemul de fișiere, de obicei HDFS.
Faza de shuffle înseamnă toate procesele din punctul în care functia map produce rezultate până unde funcția de reducerea le consumă. Cu alte cuvinte, faza de shuffle implică sortare, unire și copierea datelor între map și reduce.
După ce am văzut pașii interni ai MapReduce și îi înțelegem mai bine, putem începe acum să îmbunătățim execuția generală a MapReduce.
Acum vă voi oferi câteva sfaturi generale despre cum să vă reglați execuția MapReduce.
În general, este mai bine să acordați fazei de shuffle cât mai multă memorie posibilă, astfel încât datele vor fi prelucrate în memoria RAM în loc de disc. Deoarece faza de shuffle folosește memoria RAM din memoria atribuită fazelor de map și reduce, trebuie să fim atenți în a lăsa suficientă memorie pentru ele. De aceea este mai bine să scrieți funcția de map și reduce pentru a utiliza cât mai puțină memorie posibil (evitarea acumulării valorilor într-o colecție, spre exemplu).
Cantitatea de memorie alocată fiecărei funcții de map și reduce este atribuită de proprietatea mapred.child.java.opts. Ar trebui să le alocăm cât mai multă memorie posibil, dar să nu depășească totuși cantitatea de memorie RAM a serverului.
Pe partea de map, cea mai bună performanță poate fi obținută prin evitarea vărsărilor multiple pe disc, una singură este vărsarea optimă. Pentru aceasta, ar trebui să detectăm dimensiunea rezultatelor funcției de map și să modificăm proprietățile corespunzătoare (de ex. io.sort.mb), pentru minimizarea numărului de fișiere de vărsare pe disc.
Pe partea de reduce, cea mai bună performanță se obține când datele intermediare pot să stea în totalitate în memorie. În mod implicit, aceasta nu se întâmplă, deoarece pentru cazul general, toată memoria este rezervată funcției de reduce. Însă, dacă funcția Dvs. de reduce are cerințe de memorie puține, atunci setarea proprietăților corecte poate să vă sporească performanța. Pentru aceasta, aruncați o privire la proprietățile mapred.inmem.merge.threshold și mapred.job.reduce.input.buffer.percent.
Dacă doriți să faceți doar o încercare a tehnologiei MapReduce, nu vă recomand să vă deranjați cu reglarea (tuning-ul) de mai sus, deoarece configurația implicită funcționează destul de bine. Însă, dacă într-adevăr lucrați cu seturi mari de date și doriți să așteptați doar trei zile în loc de cinci zile pentru rezultatele analizelor Dvs., vă recomand cu încredere să luați în considerare reglarea (tuning-ul). Aici, în Cluj-Napoca, avem o comunitate BigData puternică, unde am început să avem subiecte și ateliere relevante despre BigData. Alăturați-vă nouă dacă doriți să descoperiți mai multe! Următoarea întâlnire va fi cea mai mare de până acum, avem vorbitori excepționali din București și Timișoara, care vor vorbi despre Elasticsearch, Spark, Tachyon și Machine Learning.