# Making Hadoop 2.6 + Spark-Cassandra driver play nice together

We had been using the Spark Standalone deployment mode for more than a year, but recently I tried Azure’s HDInsight, which runs Spark on Hadoop 2.6 (YARN deploy mode).

After provisioning the servers, all the small tests worked fine: I was able to run spark-shell and to read from and write to Blob Storage. But when I tried to write to a DataStax Cassandra cluster, it constantly failed with the error message: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.0.1.4}:9042
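For context, the job itself was nothing exotic, just a plain saveToCassandra call. Here is a minimal sketch of what I was running (the keyspace, table, and column names are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._ // adds saveToCassandra to RDDs

object App {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-dse-test")
      .set("spark.cassandra.connection.host", "10.0.1.4")
    val sc = new SparkContext(conf)

    // Hypothetical table: CREATE TABLE test.kv (key text PRIMARY KEY, value int)
    sc.parallelize(Seq(("a", 1), ("b", 2)))
      .saveToCassandra("test", "kv", SomeColumns("key", "value"))

    sc.stop()
  }
}
```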

I have to confess I’m a bit ashamed of how much time I spent on the Failed to open native connection to Cassandra at {10.0.1.4}:9042 part of the message, instead of looking for the original cause buried further down the stack trace: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback

```
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.0.1.4}:9042
	at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
	at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
	at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
	at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
	at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
	at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:241)
	at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
	at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
	at App$.main(App.scala:30)
	at App.main(App.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
	at com.datastax.driver.core.Connection.initAsync(Connection.java:176)
	at com.datastax.driver.core.Connection$Factory.open(Connection.java:724)
	at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:250)
	at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
	at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
	at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1269)
	at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
	at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
	... 21 more
```
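That NoSuchMethodError is the classic symptom of a Guava version clash: the Futures.withFallback overload the Cassandra driver calls only exists in newer Guava releases, while Hadoop 2.6 puts its own, much older Guava (11.x) on the classpath ahead of everything else. If you want to confirm which Guava actually wins on your cluster, here is a quick diagnostic you can paste into spark-shell (just a sketch, not part of the fix):

```scala
// Prints the jar that the loaded Guava Futures class came from.
// If this points at a Hadoop lib directory instead of your own jar,
// the old Guava is shadowing the one the Cassandra driver needs.
val source = classOf[com.google.common.util.concurrent.Futures]
  .getProtectionDomain
  .getCodeSource
println(if (source == null) "bootstrap classpath" else source.getLocation.toString)
```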

After spending a few hours making sure the network was indeed correctly configured, I started searching for the com.google.common.util.concurrent.Futures.withFallback error instead, and found a lot of people asking similar questions. The most significant discussions, a handful of tickets about this exact Guava conflict, were what led me to the solution.

## Solution

After reading those tickets the solution was pretty obvious: shade Guava inside my application jar, which is simple with a recent version of sbt-assembly. A single shade rule renaming com.google.** classes to shadeio.** was all it took, and everything worked as it should!

Below are the relevant snippets of my project/assembly.sbt and build.sbt files:

```scala
// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
```

```scala
// build.sbt

import sbt.Keys._

name := "spark-dse-test"

version := "1.0"

scalaVersion := "2.10.5"

scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
)

// There is a conflict between the Guava versions in the Cassandra driver
// and Hadoop, so we shade the Guava package in the assembled jar
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll
)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}
```
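With that in place, run sbt assembly and submit the resulting fat jar with spark-submit as usual. Because the rule uses .inAll, the Guava classes are relocated to shadeio.* and every reference to them inside the connector and driver classes is rewritten to match, so whatever older Guava Hadoop puts on the YARN classpath no longer matters. To double-check that the rule was applied, listing the jar contents (e.g. jar tf target/scala-2.10/spark-dse-test-1.0.jar | grep shadeio) should show the relocated classes.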

I hope this makes fixing this issue easier for you than it was for me :)
