Processing JSON with Spark SQL
After our first day at Spark Summit 2014 I was very excited to try JSON manipulation with Spark SQL, so I downloaded and compiled the SNAPSHOT version of Spark to try this feature.
[EDIT]: Thanks to @BigsnarfDude, who posted about the same topic and referred to the SQL json-datasets 1.0.1-rc1-docs. I still couldn’t figure out how to register a function in Spark SQL, though.
$ git clone git@github.com:apache/spark.git
$ cd spark/
$ ./sbt/sbt assembly
$ ./bin/spark-shell
Then I put together a small example, processing a JSON file containing Facebook FQL data, just to try something different from the usual Twitter Statuses JSON ;)
It was incredibly easy to process even this complex JSON, thanks to the schema inference for JSON available on sqlContext.
I’ve been able to do a quick Likes by Day count and save it to Parquet format in just a few lines of code.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val postsRDD = sqlContext.jsonFile("/Users/arjones/Projects/spark-summit/mydata/facebook/posts/2014/05/14/14/facebook.1400076455720")
// Checking the Schema
println(postsRDD.schemaString)
// register SchemaRDD as table
postsRDD.registerAsTable("posts")
// run a SQL query, which creates another SchemaRDD
val likesPerPost = sql("SELECT ((smx.crawled_time / 3600) * 3600) AS crawled_at, like_info.like_count AS likes FROM posts WHERE like_info.like_count > 0").cache()
likesPerPost.registerAsTable("likesPerPost")
// run an aggregation query
val likesPerHour = sql("""
SELECT
crawled_at,
SUM(likes) AS likes
FROM likesPerPost
GROUP BY crawled_at
""")
// print results
likesPerHour.collect().foreach(println)
// save them to a Parquet file
likesPerHour.saveAsParquetFile("fb-posts-by-hour.parquet")
There were two things I couldn’t figure out so far:
- I’m using the .cache() method, but looking at the logs, Spark still seems to read the JSON file every time I run a command.
- I couldn’t register a function to round the date by hour. I couldn’t find one even by grep’ing the snapshot code base, i.e.:
$ grep -r -i "registerFunction" *
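For reference, the hour rounding that the query attempts with (crawled_time / 3600) * 3600 can be sketched in plain Scala. This is only a minimal sketch under assumptions: HourRounding and roundToHour are hypothetical names (not Spark APIs), crawled_time is assumed to be a non-negative Unix timestamp in seconds (the 3600 in the query suggests so), and the sample value is taken from the timestamp in the input file name.

```scala
// Hypothetical helper, not part of Spark: truncate a Unix timestamp
// (assumed to be in seconds and non-negative) to the start of its hour.
object HourRounding {
  val SecondsPerHour = 3600L

  // Long division discards the remainder, so multiplying back gives
  // the first second of the hour that contains `epochSeconds`.
  def roundToHour(epochSeconds: Long): Long =
    (epochSeconds / SecondsPerHour) * SecondsPerHour

  def main(args: Array[String]): Unit = {
    // Illustrative value: 2014-05-14 14:07:35 UTC
    val crawledTime = 1400076455L
    println(roundToHour(crawledTime)) // prints 1400076000 (14:00:00 UTC)
  }
}
```

The rounding relies on integer division. If the SQL engine treats / as fractional division, the same expression inside a query would leave the value unchanged, so one option might be to apply a function like this in an ordinary map over the rows before registering the table.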
Here is the shell transcript: