Making Hadoop 2.6 + Spark-Cassandra driver play nice together
We have been using the Spark Standalone deploy mode for more than a year now, but recently I tried Azure's HDInsight, which runs on Hadoop 2.6 (YARN deploy mode).
After provisioning the servers, all the small tests worked fine: I was able to run spark-shell and read from and write to Blob Storage. Then I tried to write to a Datastax Cassandra cluster, which constantly returned an error message: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.0.1.4}:9042
I have to confess I am a little ashamed of spending so much time on the Failed to open native connection to Cassandra at {10.0.1.4}:9042
part of the error, instead of looking for the root cause, which was: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.0.1.4}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:241)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at App$.main(App.scala:30)
at App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
at com.datastax.driver.core.Connection.initAsync(Connection.java:176)
at com.datastax.driver.core.Connection$Factory.open(Connection.java:724)
at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:250)
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1269)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
... 21 more
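The root cause is a Guava version conflict: the Futures.withFallback overload the Cassandra Java driver calls was added in Guava 14, while Hadoop 2.6 still ships Guava 11.0.2. In a YARN deployment, Hadoop's older Guava comes first on the classpath and wins, so the driver's call blows up at runtime with NoSuchMethodError. A quick way to see which jar is winning is a sketch like this, run in spark-shell on the cluster (the printed path will vary by distribution):

```scala
// Ask the JVM which jar actually provides the Guava Futures class.
val futures = Class.forName("com.google.common.util.concurrent.Futures")
println(futures.getProtectionDomain.getCodeSource.getLocation)
// On a stock Hadoop 2.6 cluster this typically resolves to the bundled
// guava-11.0.2 jar, not the newer Guava the Cassandra driver was
// compiled against.
```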
After spending a few hours making sure the network was indeed correctly configured, I started looking into the com.google.common.util.concurrent.Futures.withFallback error and found a lot of people asking about it:
- Datastax Group: Compatibility with Spark 1.4.1 and Hadoop 2.6?
- SO: Running spark job using Yarn giving error: com.google.common.util.concurrent.Futures.withFallback
And the most significant discussions that led me to the solution were:
Solution
After reading those tickets the solution was pretty obvious: all I had to do was shade Guava in my package, which is simple with the latest version of sbt-assembly. I added a shade rule that renames every class under com.google.**
to shadeio.**
and everything worked as it should!
Below are snippets of my project/assembly.sbt
and build.sbt
files:
// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
// build.sbt
import sbt.Keys._
name := "spark-dse-test"
version := "1.0"
scalaVersion := "2.10.5"
scalacOptions := Seq("-deprecation", "-unchecked", "-feature")
libraryDependencies ++= Seq(
"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2",
"org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
"org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
)
// There is a conflict between the Guava versions bundled with the
// Cassandra driver and with Hadoop, so shade (relocate) the Guava packages
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll
)
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case _ => MergeStrategy.first
}
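Note that the shade rule rewrites both the Guava classes bundled into the fat jar and every reference to them, so at runtime the driver loads its own relocated copy instead of Hadoop's. A hypothetical sanity check, run from code packaged inside the assembled jar (the class name below is inferred from the rename pattern, where com.google.** becomes shadeio.**):

```scala
// With the "com.google.**" -> "shadeio.@1" rule, the driver's Guava is
// relocated: com.google.common.util.concurrent.Futures becomes
// shadeio.common.util.concurrent.Futures inside the fat jar.
val shaded = Class.forName("shadeio.common.util.concurrent.Futures")
// The relocated Guava 16.x copy does have withFallback, so the
// Cassandra driver's call now resolves correctly.
println(shaded.getMethods.exists(_.getName == "withFallback"))
```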
I hope this makes fixing this issue easier for you than it was for me :)