TSM - OpenStreetMap în epoca Spark

Adrian Bona - adrian.bona@telenav.com

OpenStreetMap este o hartă considerată o Wikipedia a hărților. Există foarte multă informație online despre acest proiect, chiar și cărți pe această temă, dar nu vom discuta despre istoria OSM. Ne propunem să vă oferim o descriere generală a setului impresionant de date care stă la baza OSM și a modului în care o astfel de hartă mare poate fi analizată prin intermediul unor tehnologii moderne precum Apache Spark.

O scurtă anatomie a OpenStreetMap

Unul din motivele pentru care OSM a ajuns până în acest punct este faptul că este un model simplu pentru reprezentarea datelor. Există doar trei tipuri de entități:

Fiecare entitate poate avea mai multe etichete (tags) (de exemplu, o cale marcată cu highway=residential ne arată că e vorba de un drum rezidențial).

De asemenea, se stochează informație adițională despre utilizatorul care a adăugat fiecare entitate, cât și momentul adăugării acesteia.

Cât de mare este OpenStreetMap?

Datele sunt disponibile, în două formate:

Proiectul OSMstats arată evoluția zilnică a hărții.

Este OpenStreetMap accesibilă pentru lumea Big Data?

Poate fi dificil să începeți să lucrați cu OpenStreetMap la scară mare. Acum câțiva ani eram uimiți să vedem colegii așteptând ore sau zile pentru a încărca părți din OSM în PostgreSQL pe mașini imense. Era totuși normal ... pentru că nu era Big Data.

Între timp, am început să lucrăm cu diverse analize geo-spațiale în cadrul unor proiecte experimentale folosind tehnologii Big Data din lumea Hadoop. Dar din nou, felul în care sunt organizate datele OSM ne-a constrâns să folosim o unealtă numită osmosis rulată pe un fișier uriaș PBF pentru a extrage doar anumite părți de interes și salvarea acestora în fișiere CSV, ca ulterior acestea să fie folosite în cadrul unor joburi MapReduce. Chiar dacă acest lucru funcționează, nu are cel mai bun randament. 

Ulterior, când am început să utilizăm la scară mare Apache Spark, aveam nevoie de o soluție generală pentru a reprezenta toate datele OSM. Soluția pe care am ales-o, a fost să folosim un format în coloană numit Apache Parquet

Convertorul este disponibil la adresa: github.com/adrianulbona/osm-parquetizer

Cât de rapidă este conversia?

Mai puțin de un minut pentru romania-latest.osm.pbf și ~3 ore (pe un laptop cu SSD) pentru planet-latest.osm.pbf.

Care este rolul tehnologiei Spark în această poveste?

Convertorul menționat mai sus are ca intrare un singur fișier și nu doar convertește datele, ci le și împarte în trei fișiere, unul pentru fiecare tip de entitate OSM - fiecare fișier reprezentând în fond o colecție de date structurate (un tabel). Schemele tabelelor sunt următoarele:

node
 |-- id: long
 |-- version: integer
 |-- timestamp: long
 |-- changeset: long
 |-- uid: integer
 |-- user_sid: string
 |-- tags: array
 |    |-- element: struct
 |    |    |-- key: string
 |    |    |-- value: string
 |-- latitude: double
 |-- longitude: double

way
 |-- id: long
 |-- version: integer
 |-- timestamp: long
 |-- changeset: long
 |-- uid: integer
 |-- user_sid: string
 |-- tags: array
 |    |-- element: struct
 |    |    |-- key: string
 |    |    |-- value: string
 |-- nodes: array
 |    |-- element: struct
 |    |    |-- index: integer
 |    |    |-- nodeId: long

relation
 |-- id: long
 |-- version: integer
 |-- timestamp: long
 |-- changeset: long
 |-- uid: integer
 |-- user_sid: string
 |-- tags: array
 |    |-- element: struct
 |    |    |-- key: string
 |    |    |-- value: string
 |-- members: array
 |    |-- element: struct
 |    |    |-- id: long
 |    |    |-- role: string
 |    |    |-- type: string

Încărcarea datelor în Apache Spark devine acum extrem de simplă:

val nodeDF = sqlContext.read.parquet(
  "romania.osm.pbf.node.parquet")

nodeDF.createOrReplaceTempView("nodes")

val wayDF = sqlContext.read.parquet(
  "romania.osm.pbf.way.parquet")

wayDF.createOrReplaceTempView("ways")

val relationDF = sqlContext.read.parquet(
  "romania.osm.pbf.relation.parquet")

relationDF.createOrReplaceTempView("relations")

Să considerăm următoarea cerință:

Pentru cei mai activi contributori OSM, să se prezinte distribuția muncii acestora în timp.

În cazul utilizării Spark DataFrames API, o posibilă soluție este următoarea:

val nodeDF = nodeDF
  .withColumn("created_at", ($"timestamp" / 1000)
  .cast(TimestampType))
  .createOrReplaceTempView("nodes")

val top10Users = nodeDF.groupBy("user_sid")
  .agg(count($"id").as("node_count"))
  .orderBy($"node_count".desc)
  .limit(10)
  .collect
  .map({ case Row(user_sid: String, _) => user_sid })

nodeDF.filter($"user_sid".in(top10Users: _*))
  .groupBy($"user_sid", year($"created_at").as("year"))
  .agg(count("id").as("node_count"))
  .orderBy($"year")
  .registerTempTable("top10UsersOverTime")

Dacă se folosește Spark SQL:

select 
  user_sid, 
  year(created_at)) as year,
  count(*) as node_count
from 
  nodes
where 
  user_sid in (
      select user_sid from (
        select 
          user_sid, 
          count(*) as c 
        from 
          nodes 
        group by 
          user_sid 
        order by 
          c desc 
        limit 10
      )
  )
group by 
  user_sid, 
  year(created_at)
order by 
  year

Ambele soluții sunt echivalente și returnează rezultatele din figura de mai jos. Chiar dacă exemplele au fost ilustrate doar în cadrul unei zone restrânse, de interes pentru comunitatea locală de editori OSM, nimic nu ne oprește acum să executam exact aceleași interogări în mod scalabil pentru toată planeta.