Processing Twitter’s top stories with Apache Spark (part 1)

Ever since I got to know it, it has surprised me with its simplicity and yet its power to recommend the best stories to read. I always thought it would be fun to try to build something similar, so I decided to create a PoC of Twitter’s top stories using Apache Spark.

DISCLAIMER: this is a PoC, mainly focused on learning Spark. The architecture doesn’t represent a production-level product, nor do I consider recommending stories for a single user to be a big data problem.

# TL;DR;

The code is available on GitHub; just create valid Twitter API credentials and you can run it.

# Solution

The solution is based on two stages:

  1. Collect tweets from the stream, analyze them, and store those that contain a link, expanding the link to its final destination (removing URL shorteners and click counters)

  2. Run a batch job to process the data from the previous stage and create a top 10 list


# Building & Running

I use Eclipse with Scala, plus the Typesafe sbteclipse plugin to create the Eclipse project.

$ git clone git@github.com:arjones/spark-news.git
$ cd spark-news

Now edit src/main/resources/ adding your credentials and rename it to src/main/resources/

twitter4j.oauth.consumerKey= //
twitter4j.oauth.consumerSecret= //
twitter4j.oauth.accessToken= //
twitter4j.oauth.accessTokenSecret= //

Let’s build our project:

$ sbt/sbt
> eclipse with-source=true
> assembly
> exit

And run it:

$ spark-submit \
 --class \
 target/scala-2.10/SparkNews-assembly-0.1-SNAPSHOT.jar \

Edit following.txt to add the accounts that you find interesting!

If everything is working properly, every 5 minutes you will see a new folder at ./urls/999999999/, where the number is the Unix timestamp rounded down to the minute. For example:

urls/1404627900/ --> 7/5/2014 11:25:00 PM GMT-7
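To make the folder naming concrete, here is a minimal sketch of how such a name could be derived from a Unix timestamp. It assumes the 5-minute interval used in this post; the `FolderNames` object and its methods are hypothetical names for illustration, not code from the project.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

// Hypothetical helper, for illustration only.
object FolderNames {
  // Round a Unix timestamp (in seconds) down to the start of its interval.
  // The 300-second default mirrors the 5-minute interval assumed in this post.
  def roundDown(epochSeconds: Long, intervalSeconds: Long = 300L): Long =
    epochSeconds - (epochSeconds % intervalSeconds)

  // Render a Unix timestamp (in seconds) as a UTC date string.
  def toUtcString(epochSeconds: Long): String = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT)
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
    fmt.format(new Date(epochSeconds * 1000L))
  }
}
```

For example, `FolderNames.toUtcString(1404627900L)` gives `2014-07-06 06:25:00` in UTC, which is 11:25 PM on 7/5/2014 in GMT-7, matching the folder above.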

You can check the activity using the Spark UI.

# Internals

Set up a StreamingContext with a 5-minute batch interval, load the accounts, and create the Twitter stream:

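import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Set up the streaming context with a 300-second (5-minute) batch interval
val ssc = new StreamingContext(new SparkConf(), Seconds(300))
val tweets = TwitterUtils.createStream(ssc, None, followingList)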

For each tweet that contains a URL, extract it; if there is more than one URL, only the first is used:

// Consider only 1st URL on the Tweet
val url = URLExpander.expandUrl(status.getURLEntities.head.getExpandedURL)
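The implementation of `URLExpander` isn’t shown here, so the following is only a sketch of one way link expansion could work: follow HTTP redirects hop by hop until the final destination is reached. The `RedirectFollower` object, its method names, the 5-second timeouts, and the 10-hop limit are all assumptions made for illustration. The lookup function is pluggable so the logic can be tried without network access.

```scala
import java.net.{HttpURLConnection, URL}
import scala.annotation.tailrec

// Hypothetical sketch; not the project's URLExpander.
object RedirectFollower {
  // Request `url` without following redirects and return the Location
  // header if the response is a 3xx redirect, otherwise None.
  // Relative Location values are not resolved in this sketch.
  def httpLocation(url: String): Option[String] = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setInstanceFollowRedirects(false)
      conn.setRequestMethod("HEAD")
      conn.setConnectTimeout(5000)
      conn.setReadTimeout(5000)
      val code = conn.getResponseCode
      if (code >= 300 && code < 400) Option(conn.getHeaderField("Location"))
      else None
    } finally conn.disconnect()
  }

  // Follow redirects until there is no further hop or maxHops is used up.
  @tailrec
  def expand(url: String,
             nextHop: String => Option[String] = httpLocation,
             maxHops: Int = 10): String =
    if (maxHops <= 0) url
    else nextHop(url) match {
      case Some(next) if next != url => expand(next, nextHop, maxHops - 1)
      case _ => url
    }
}
```

Passing a `Map`’s `get` method as `nextHop` lets you simulate a chain of shorteners, while the default performs real HEAD requests. The hop limit guards against redirect loops.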

Use Spark SQL to implicitly convert an RDD[TweetContent] into a SchemaRDD:

val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlContext._

Then we can go through each SchemaRDD and save it to disk as a Parquet file:

urlsDStream.foreachRDD { schemaRDD =>
  // folderTimestamp is the batch's Unix timestamp; its computation is not shown here
  schemaRDD.saveAsParquetFile("urls/" + folderTimestamp)
}

# What’s next?

Leave this code running for a few hours so we have data for the next part, which involves calculating the best stories!