Monday, January 26, 2015

Spark + Cassandra: The basics + connecting to Cassandra from spark-shell

A lot of people are getting excited about Apache Spark. The release of the open source Cassandra connector makes a technology like Spark even more accessible. Previously, to get going you'd need a Hadoop infrastructure. Now you can do away with all that and start using Spark directly against Cassandra, no HDFS required.

My last two posts on the topic were all about setting up a Vagrant VM with Cassandra and Spark installed. That's all well and good if you are already working in the JVM ecosystem, you know what Vagrant and Ansible are and you love yourself a bit of SBT, but now it is time to take a step back. This post is aimed at getting you started with Spark and Cassandra without the assumption that you know what sbt assembly means! By the end of this one the goal is to be able to execute Spark/Cassandra jobs in the Spark REPL (and understand what is going on). The next article will cover submitting a standalone job.

I'll assume you have Cassandra installed and that you have downloaded a Spark bundle from the Spark website. It doesn't matter which version of Hadoop it has been built against as we're not going to use Hadoop. If you don't have a locally running Cassandra instance I suggest you just use the VM from the previous article, follow this article for Ubuntu, use Homebrew if you are on Mac OS X or, if all else fails, just download the zip from the Apache website.

So first things first... why should you be excited about this?

  • If you're already using Cassandra, your data is already distributed and replicated. The Cassandra connector for Spark is aware of this distribution and can bring the computation to the data, which means it is going to be FAST
  • Scala and the JVM might seem scary at first but Scala is an awesome language for writing data transformations
  • The Spark-Shell: this is a REPL we can use for testing out code, simply put: it is awesome
  • Spark can also connect with other data sources: files, RDBMSs etc, which means you can do analytics combining data in Cassandra and systems like MySQL
  • Spark also supports streaming, meaning we can combine new data in semi-real time with our batch processing
  • Most importantly, you don't need to extract-transform-load your data from your operational database and put it in your batch processing system e.g. Hadoop

Let's get going

So what do we need to get all this magic working?
  • Java - Java 7 or 8
  • SBT - any 0.13.* will work. This is the build tool used by the majority of Scala projects (Spark is written in Scala)
  • Scala - Spark doesn't officially support 2.11 yet so get 2.10
  • A Cassandra cluster
  • A Spark installation (we're going simple this time so all on one computer)
  • The Cassandra Spark connector with all of its dependencies bundled on the classpath of the spark-shell for interactive use
  • A fat jar with all our dependencies if we want to submit a job to a cluster (for the next post)

Hold up: jargon alert. Bundled dependencies, classpath? Fatman?

Both Cassandra and Spark run on the JVM. For Cassandra we don't really care, because we're not submitting code to run inside Cassandra, but that is exactly what we're going to do with Spark.

That means all the code and libraries that we use are going to have to go everywhere our computation goes. This is because Spark distributes your computation across a cluster of computers. So we have to be kind and bundle all our code + all the dependencies we use (other jar files, e.g. for logging). The JVM classpath is just how you tell the JVM where all your jars are.
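To make the classpath idea a bit more concrete, here is a minimal shell sketch. The helper name classpath_for, the lib directory and com.example.Main are all made up for illustration. The only real rule it leans on is that the JVM accepts a list of jars separated by ':' on Linux and OS X.

```shell
# Hypothetical helper: join every jar in a directory into one classpath string.
classpath_for() {
  local dir="$1" cp="" jar
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue   # skip the unmatched pattern when there are no jars
    cp="${cp:+$cp:}$jar"        # put ':' between entries, but not before the first
  done
  printf '%s\n' "$cp"
}

# Usage (hypothetical directory and main class):
#   java -cp "$(classpath_for lib)" com.example.Main
```

With tens of dependencies that string gets long quickly, which is the itch a fat jar scratches: one jar, one classpath entry.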

Getting the Spark-Cassandra connector on the classpath


If you're from JVM land you are probably used to doing things like "just build a fat jar and put it on the classpath". If you're not, then that is just a lot of funny words. The connector is not part of core Spark, so you can't use it by default in the spark-shell. To use it you need to put the connector and all its dependencies on the classpath for the spark-shell. This sounds tedious, right? You'd have to go and look at the build system of the connector and work out what it depends on. Welcome to JVM dependency hell.

SBT, Maven, Gradle to the rescue (sort of). Virtually all JVM languages have a build system that allows you to declare dependencies, then it is the build system's responsibility to go get them from magic online locations (Maven Central) when you build your project. In Scala land this is SBT + Ivy.

When you come to distribute a JVM based application it is very kind to your users to build a fat jar, or an "executable jar". This contains your code + all your dependencies so that it runs by itself, well, apart from depending on a Java Runtime.

So what we need to do is take the connector and use SBT + the assembly plugin to build ourselves a fat jar. The Spark-Cassandra connector already has all the necessary config in its build scripts so we're just going to check it out and run "sbt assembly".
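The build boils down to a handful of commands. The sketch below wraps them in a shell function (build_connector_jar is a made-up name) so that nothing runs until you call it. It assumes git and sbt are already installed, and it uses the HTTPS clone URL, which does not need a GitHub SSH key. The commented-out checkout is optional: it is the tag mentioned in the comments at the end of this post, for when the latest code does not match your Spark version.

```shell
# Hypothetical helper: clone the connector and build its fat jar.
# The body runs in a subshell, so your current directory is left alone.
build_connector_jar() (
  set -e   # stop at the first command that fails
  git clone https://github.com/datastax/spark-cassandra-connector.git
  cd spark-cassandra-connector
  # git checkout tags/v1.2.1   # optional: build a tagged release instead
  sbt assembly                 # slow the first time, it downloads every dependency
)
```

Run build_connector_jar from the directory where you want the checkout to live, then look at the end of the sbt output for the location of the assembled jar.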



Let's take this step by step:
  1. Clone the Spark-Connector repo
  2. Run the SBT assembly command
  3. Wait for ages
  4. Read the end of the SBT output, which tells us where it has put the fat jar

Now it is time to use this jar in the Spark Shell:


Nothing fancy here: I just went into the bin directory of where I unzipped Spark and ran spark-shell --help. The option we're looking for is --jars. This is how we add our magical fat jar onto the classpath of the spark-shell. If we hadn't built a fat jar we'd be adding tens of jars here!

However, before we launch spark-shell we're going to add a property to tell Spark where Cassandra is. In the file {Spark Install}/conf/spark-defaults.conf (you'll need to create it) add:

 spark.cassandra.connection.host=192.168.10.11
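If you would rather do that from a terminal, something along these lines should work. The function name set_cassandra_host is made up, and the Spark directory you pass in is whatever you unzipped Spark into.

```shell
# Hypothetical helper: append the connector's host setting to spark-defaults.conf.
#   $1 = Spark install directory
#   $2 = Cassandra host (falls back to localhost when omitted)
set_cassandra_host() {
  mkdir -p "$1/conf"   # harmless if the conf directory already exists
  echo "spark.cassandra.connection.host=${2:-localhost}" >> "$1/conf/spark-defaults.conf"
}

# Usage (placeholder path):
#   set_cassandra_host /path/to/spark 192.168.10.11
```

It appends rather than overwrites, so any settings already in the file are kept.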

Replace the IP with localhost if your Cassandra cluster is running locally. Then start up spark-shell with the --jars option:
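A minimal sketch of that launch command follows. Both paths are placeholders rather than real locations: point SPARK_HOME at the directory you unzipped Spark into and CONNECTOR_JAR at the assembly jar that sbt reported. The function name start_cassandra_shell is also made up.

```shell
# Placeholders: override these with your own paths before calling the function.
SPARK_HOME="${SPARK_HOME:-/path/to/spark}"
CONNECTOR_JAR="${CONNECTOR_JAR:-/path/to/connector-assembly.jar}"

# Hypothetical helper: start the Spark REPL with the connector fat jar on its classpath.
start_cassandra_shell() {
  "$SPARK_HOME/bin/spark-shell" --jars "$CONNECTOR_JAR"
}
```

Calling start_cassandra_shell drops you into the REPL with the connector available.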

Now let's look at the important bits:
  1. Start spark-shell with --jars pointing to the fat jar we built
  2. Spark confirms that it has picked up the connector fat jar
  3. Spark confirms that it has created us a SparkContext
  4. Import the connector classes. Scala has the ability to extend existing classes, and the effect of this import is that we now have Cassandra methods on our SparkContext
  5. Create a Spark RDD from a Cassandra table "kv" in the "test" keyspace
  6. Turn the RDD into an array (forcing it to complete the execution) and print the rows
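The REPL steps in the list above can be sketched as three Scala statements. To keep everything in one terminal, the snippet below writes them to a file (cassandra-demo.scala is a made-up name) that you can then load from inside spark-shell. It assumes the "test" keyspace and its "kv" table already exist in your Cassandra instance, and that spark-shell was started with the connector jar.

```shell
# Write the Scala statements to a script file in the current directory.
cat > cassandra-demo.scala <<'EOF'
// Adds the Cassandra methods to the SparkContext (sc) that spark-shell created
import com.datastax.spark.connector._
// An RDD backed by table "kv" in keyspace "test"
val rdd = sc.cassandraTable("test", "kv")
// collect() forces execution and returns an Array of rows, which we print
rdd.collect().foreach(println)
EOF
```

From the scala> prompt, run :load cassandra-demo.scala (the path is relative to the directory you started spark-shell from), or simply type the three statements in by hand.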
Well, that's all folks. The next post will be about submitting jobs rather than using the spark-shell.



8 comments:

oleksii.mdr said...

This is a lot of serious work you're putting in. My phone suggested that I have a look at your blog O_o. Such a good find! Will have to go back and read all the posts. I appreciate this.

Krutz Murks said...

Hay Christopher thanks for the great post, it's a nice mixture between tldr-parts and description!

One thing: "git clone git@github.com:datastax/spark-cassandra-connector.git" did not work in my case (it failed with "Permission denied"). Instead of using the git protocol I switched to https, which worked:
git clone https://github.com/datastax/spark-cassandra-connector.git

Krutz Murks said...

One more note:

If anyone receives a "java.lang.NoSuchMethodError getInputMetricsForReadMethod" exception: the current version in the git repository is 1.3, and according to the version compatibility table (https://github.com/datastax/spark-cassandra-connector#version-compatibility) 1.3 is not compatible with anything.

In order to fix this, switch to the v1.2.1 tag:
git checkout tags/v1.2.1
Afterwards you can start the build process with sbt assembly.

Christopher Batey said...

Thanks Krutz

Chaman Jhinga said...

I wasn't able to 'sbt assembly'.
Kept getting "[error] (spark-cassandra-connector-java/*:assembly) deduplicate: different file contents found "
Then I switched to tags/v1.2.1 and it compiled.
Thanks Krutz! :-)

Saurav Kumar said...

hi,
I am using cqlsh 5.0.1 , spark 1.4 and datastax connector. But i am getting error :

error: bad symbolic reference. A signature in CassandraTableScanRDD.class refers to term driver
in package com.datastax which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling CassandraTableScanRDD.class.

Thanks.:)

Raja said...

Thanks for detailed post it works like a charm for me.

Sthitaprajna Sahoo said...

Thanks for the detailed explanation and it helps novice non-java guys like me.