OpenStreetMap este o hartă considerată o Wikipedia a hărților. Există foarte multă informație online despre acest proiect, chiar și cărți pe această temă, dar nu vom discuta despre istoria OSM. Ne propunem să vă oferim o descriere generală a setului impresionant de date care stă la baza OSM și a modului în care o astfel de hartă mare poate fi analizată prin intermediul unor tehnologii moderne precum Apache Spark.
Unul din motivele pentru care OSM a ajuns până în acest punct este faptul că este un model simplu pentru reprezentarea datelor. Există doar trei tipuri de entități:
Fiecare entitate poate avea mai multe etichete (tags) (de exemplu, o cale marcată cu highway=residential ne arată că e vorba de un drum rezidențial).
De asemenea, se stochează informație adițională despre utilizatorul care a adăugat fiecare entitate, cât și momentul adăugării acesteia.
Datele sunt disponibile, în două formate:
Proiectul OSMstats arată evoluția zilnică a hărții.
Poate fi dificil să începeți să lucrați cu OpenStreetMap la scară mare. Acum câțiva ani eram uimiți să vedem colegii așteptând ore sau zile pentru a încărca părți din OSM în PostgreSQL pe mașini imense. Era totuși normal ... pentru că nu era Big Data.
Între timp, am început să lucrăm cu diverse analize geo-spațiale în cadrul unor proiecte experimentale folosind tehnologii Big Data din lumea Hadoop. Dar din nou, felul în care sunt organizate datele OSM ne-a constrâns să folosim o unealtă numită osmosis rulată pe un fișier uriaș PBF pentru a extrage doar anumite părți de interes și salvarea acestora în fișiere CSV, ca ulterior acestea să fie folosite în cadrul unor joburi MapReduce. Chiar dacă acest lucru funcționează, nu are cel mai bun randament.
Ulterior, când am început să utilizăm la scară mare Apache Spark, aveam nevoie de o soluție generală pentru a reprezenta toate datele OSM. Soluția pe care am ales-o, a fost să folosim un format în coloană numit Apache Parquet.
Convertorul este disponibil la adresa: github.com/adrianulbona/osm-parquetizer
Mai puțin de un minut pentru romania-latest.osm.pbf și ~3 ore (pe un laptop cu SSD) pentru planet-latest.osm.pbf.
Convertorul menționat mai sus are ca intrare un singur fișier și nu doar convertește datele, ci le și împarte în trei fișiere, unul pentru fiecare tip de entitate OSM - fiecare fișier reprezentând în fond o colecție de date structurate (un tabel). Schemele tabelelor sunt următoarele:
node
|-- id: long
|-- version: integer
|-- timestamp: long
|-- changeset: long
|-- uid: integer
|-- user_sid: string
|-- tags: array
| |-- element: struct
| | |-- key: string
| | |-- value: string
|-- latitude: double
|-- longitude: double
way
|-- id: long
|-- version: integer
|-- timestamp: long
|-- changeset: long
|-- uid: integer
|-- user_sid: string
|-- tags: array
| |-- element: struct
| | |-- key: string
| | |-- value: string
|-- nodes: array
| |-- element: struct
| | |-- index: integer
| | |-- nodeId: long
relation
|-- id: long
|-- version: integer
|-- timestamp: long
|-- changeset: long
|-- uid: integer
|-- user_sid: string
|-- tags: array
| |-- element: struct
| | |-- key: string
| | |-- value: string
|-- members: array
| |-- element: struct
| | |-- id: long
| | |-- role: string
| | |-- type: string
Încărcarea datelor în Apache Spark devine acum extrem de simplă:
val nodeDF = sqlContext.read.parquet(
"romania.osm.pbf.node.parquet")
nodeDF.createOrReplaceTempView("nodes")
val wayDF = sqlContext.read.parquet(
"romania.osm.pbf.way.parquet")
wayDF.createOrReplaceTempView("ways")
val relationDF = sqlContext.read.parquet(
"romania.osm.pbf.relation.parquet")
relationDF.createOrReplaceTempView("relations")
Să considerăm următoarea cerință:
Pentru cei mai activi contributori OSM, să se prezinte distribuția muncii acestora în timp.
În cazul utilizării Spark DataFrames API, o posibilă soluție este următoarea:
val nodeDF = nodeDF
.withColumn("created_at", ($"timestamp" / 1000)
.cast(TimestampType))
.createOrReplaceTempView("nodes")
val top10Users = nodeDF.groupBy("user_sid")
.agg(count($"id").as("node_count"))
.orderBy($"node_count".desc)
.limit(10)
.collect
.map({ case Row(user_sid: String, _) => user_sid })
nodeDF.filter($"user_sid".in(top10Users: _*))
.groupBy($"user_sid", year($"created_at").as("year"))
.agg(count("id").as("node_count"))
.orderBy($"year")
.registerTempTable("top10UsersOverTime")
Dacă se folosește Spark SQL:
select
user_sid,
year(created_at)) as year,
count(*) as node_count
from
nodes
where
user_sid in (
select user_sid from (
select
user_sid,
count(*) as c
from
nodes
group by
user_sid
order by
c desc
limit 10
)
)
group by
user_sid,
year(created_at)
order by
year
Ambele soluții sunt echivalente și returnează rezultatele din figura de mai jos. Chiar dacă exemplele au fost ilustrate doar în cadrul unei zone restrânse, de interes pentru comunitatea locală de editori OSM, nimic nu ne oprește acum să executam exact aceleași interogări în mod scalabil pentru toată planeta.
de Ovidiu Mățan
de Ovidiu Mățan
de Dan Sabadis