Monday, January 26, 2015

Spark + Cassandra: The basics + connecting to Cassandra from spark-shell

A lot of people are getting excited about Apache Spark. The release of the open source Cassandra connector makes a technology like Spark even more accessible. Previously, to get going you'd need a Hadoop infrastructure; now you can do away with all that and start using Spark directly against Cassandra, no HDFS required.

My last two posts on the topic were all about setting up a vagrant VM with Cassandra and Spark installed. That's all well and good if you're already working in the JVM ecosystem, you know what Vagrant and Ansible are and you love yourself a bit of SBT, but now it is time to take a step back. This post is aimed at getting you started with Spark and Cassandra without the assumption that you know what sbt assembly means! By the end of this one the goal is to be able to execute (and understand what is going on in) Spark/Cassandra jobs in the Spark REPL, and the next article will cover submitting a standalone job.

I'll assume you have Cassandra installed and that you have downloaded a Spark bundle from their website. It doesn't matter which version of Hadoop it has been built against, as we're not going to use Hadoop. If you don't have a locally running Cassandra instance, I suggest you just use the VM from the previous article, follow this article for Ubuntu, use Homebrew if you are on Mac OS X, or, if all else fails, just download the zip from the Apache website.

So first things first... why should you be excited about this?

  • If you're already using Cassandra your data is already distributed and replicated, and the Cassandra connector for Spark is aware of this distribution and can bring the computation to the data: this means it is going to be FAST
  • Scala and the JVM might seem scary at first, but Scala is an awesome language for writing data transformations
  • The Spark-Shell: this is a REPL we can use for testing out code; simply put, it is awesome
  • Spark can also connect with other data sources: files, RDBMSs etc, which means you can do analytics combining data in Cassandra and systems like MySQL
  • Spark also supports streaming, meaning we can combine new data in semi-real time with our batch processing
  • Most importantly, you don't need to extract-transform-load your data from your operational database into your batch processing system, e.g. Hadoop
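As a taste of the second point, plain Scala collection transformations look like this (a standalone sketch, no Spark required; Spark's RDD API mirrors the same combinators):

```scala
object Transformations extends App {
  val temperatures = List(18.0, 21.5, 30.2, 12.9, 25.0)

  // keep the readings over 20 degrees, then convert them to Fahrenheit
  val hotDays = temperatures.filter(_ > 20.0)
  val fahrenheit = hotDays.map(c => c * 9 / 5 + 32)

  println(hotDays.size)   // prints 3
  println(fahrenheit.max) // prints 86.36
}
```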

Let's get going

So what do we need to get all this magic working?
  • Java - Java 7 or 8
  • SBT - any 0.13.* will work. This is the build tool used by the majority of Scala projects (Spark is written in Scala)
  • Scala - Spark doesn't officially support 2.11 yet so get 2.10
  • A Cassandra cluster
  • A Spark installation (we're going simple this time so all on one computer)
  • The Cassandra Spark connector, with all of its dependencies bundled, on the classpath of the spark-shell for interactive use
  • A fat jar with all our dependencies if we want to submit a job to a cluster (for the next post)

Hold up: jargon alert. Bundled dependencies? Classpath? Fat jars?

Both Cassandra and Spark run on the JVM. For Cassandra that doesn't really matter to us, as we're not submitting code to run inside Cassandra, but submitting code is exactly what we're going to do with Spark.

That means all the code and libraries that we use are going to have to go everywhere our computation goes, because Spark distributes your computation across a cluster of computers. So we have to be kind and bundle all our code plus all the dependencies we use (other jar files, e.g. for logging). The JVM classpath is just how you tell the JVM where all those jars are.
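For example, a classpath is just a list of jar locations handed to the JVM when you launch it (paths purely illustrative):

```shell
# run Main with our code (my-app.jar) plus every jar under lib/ on the classpath
java -cp my-app.jar:lib/* com.example.Main
```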

Getting the Spark-Cassandra connector on the classpath

If you're from JVM land you're probably used to doing things like "just build a fat jar and put it on the classpath"; if you're not, then that is just a lot of funny words. The connector is not part of core Spark, so you can't use it by default in the spark-shell. To do that you need to put the connector and all its dependencies on the classpath for the spark-shell. This sounds tedious, right? You'd have to go and look at the build system of the connector and work out what it depends on. Welcome to JVM dependency hell.

SBT, Maven and Gradle to the rescue (sort of). Virtually all JVM languages have a build system that allows you to declare dependencies; it is then the build system's responsibility to go and get them from magic online locations (Maven Central) when you build your project. In Scala land this is SBT + Ivy.

When you come to distribute a JVM based application, it is very kind to your users to build a fat jar, or an "executable jar". This contains your code plus all your dependencies, so that it runs by itself - well, apart from depending on a Java Runtime.

So what we need to do is take the connector and use SBT plus the assembly plugin to build ourselves a fat jar. The Spark-Cassandra connector already has all the necessary config in its build scripts, so we're just going to check it out and run "sbt assembly".

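The original post had a terminal capture here; the gist of it was along these lines (repo URL assumed to be the DataStax one, and the assembly jar name varies by connector version):

```shell
git clone https://github.com/datastax/spark-cassandra-connector.git
cd spark-cassandra-connector
sbt assembly
# ...much dependency resolution later, sbt prints something like:
# [info] Packaging .../target/scala-2.10/spark-cassandra-connector-assembly-1.1.0.jar
```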
Let's take this step by step:
  1. Clone the Spark-Cassandra connector repo
  2. Run the sbt assembly command
  3. Wait for ages
  4. Note where SBT says it has put the fat jar
Now it is time to use this jar in the Spark Shell:

Nothing fancy here: I've just gone into the bin directory of wherever I unzipped Spark and run spark-shell --help. The option we're looking for is --jars. This is how we add our magical fat jar onto the classpath of the spark-shell. If we hadn't built a fat jar we'd be adding tens of jars here!

However, before we launch spark-shell we're going to add some properties to tell Spark where Cassandra is. In the file {Spark Install}/conf/spark-defaults.conf (you'll need to create it) add:
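The original post showed the file contents here; the key entry is the connector's connection host property (the IP below is just an example):

```
spark.cassandra.connection.host 192.168.10.11
```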

Replace the IP with localhost if your Cassandra cluster is running locally. Then start up Spark-shell with the --jars option:
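Launching looks something like this (the jar path is illustrative; use wherever sbt assembly put your jar):

```shell
# from the bin directory of your Spark install
./spark-shell --jars ~/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.1.0.jar
```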

Now let's look at the important bits:
  1. Starting spark-shell with --jars pointing to the fat jar we built
  2. Spark confirming that it has picked up the connector fat jar
  3. Spark confirming that it has created us a SparkContext
  4. Importing the connector classes. Scala has the ability to extend existing classes; the effect of this import is that we now have cassandra methods on our SparkContext
  5. Creating a Spark RDD from the Cassandra table "kv" in the "test" keyspace
  6. Turning the RDD into an array (forcing it to complete the execution) and printing the rows
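Put together, the session boils down to this (a sketch assuming a running local cluster with a test.kv table; sc is the SparkContext the shell creates for you):

```scala
import com.datastax.spark.connector._     // adds cassandra methods to SparkContext

val rdd = sc.cassandraTable("test", "kv") // lazy: nothing has executed yet
rdd.toArray.foreach(println)              // forces execution and prints each row
```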
Well that's all folks, next post will be about submitting jobs rather than using the spark-shell.

Wednesday, January 21, 2015

Spark 1.2 with Cassandra 2.1: Setting up a SparkSQL + Cassandra environment

In my previous post on Cassandra and Spark I showed how to get a development environment setup with Vagrant/Ansible/VirtualBox without installing Cassandra/Spark on your dev station.

This update will get us to a point where we can run SQL (yes, SQL, not CQL) on Cassandra. It is just a trivial example to show the setup working.

The previous article was back in the days of Spark 1.0. With Spark 1.1+ we can now run SparkSQL directly against Cassandra.

I've updated the Vagrant/Ansible provisioning to install Spark 1.2 and Cassandra 2.1, and I've added a new "fatjar" with the latest Cassandra Spark connector so that we can use it in the Spark shell and show this magic working. The 1.2 connector isn't released yet so I have built against the Alpha, see here for details. We're just that cutting edge here...

So, once you have run vagrant up (this will take a while as it downloads and installs all of the above) you'll need to SSH in so we can get into the Spark shell.

I've set up the following alias so there's no worrying about classpaths:

alias spark-shell='spark-shell --jars /vagrant/spark-connector-1.2.0-alpha1-driver-2.1.4-1.0.0-SNAPSHOT-fat.jar'

First let's jump into cqlsh and create a keyspace and table to play with:
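The schema from the post wasn't preserved; a minimal stand-in that fits the rest of the article (keyspace test, table kv) would be:

```sql
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE test.kv (key text PRIMARY KEY, value int);

INSERT INTO test.kv (key, value) VALUES ('hello', 1);
INSERT INTO test.kv (key, value) VALUES ('world', 2);
```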

Not the most exciting schema you'll ever see, but this is all about getting the Hello World of SparkSQL on Cassandra working!

Now we have some data to play with, let's access it from the Spark shell.

Let's go through what has happened here:

  • The mandatory Spark ASCII art on startup
  • Import the connector so we can access Cassandra
  • Create a CassandraSQLContext
  • Set it to the test keyspace we created above
  • Select the whole table (very exciting I know!)
  • Get Spark to execute an action so all the magic happens
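As a sketch, the session looks roughly like this (assumes the spark-shell has the connector fat jar on its classpath and a running cluster; sc is the shell's SparkContext, and class names are from the 1.2 alpha connector so check them against your version):

```scala
import org.apache.spark.sql.cassandra.CassandraSQLContext

val csc = new CassandraSQLContext(sc)
csc.setKeyspace("test")                // so we can use bare table names
val rdd = csc.sql("SELECT * FROM kv")  // yes, SQL against Cassandra
rdd.collect().foreach(println)         // the action that makes the magic happen
```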

That's all for now, tune in later for a more complicated example :)

Here's the link to all the Vagrant/Ansible code.

Tuesday, January 6, 2015

Wiremock: Now with extension points (open source == awesome)

I have been using Wiremock as my preferred HTTP test double for some time now. I think it is a fantastic tool, and I mentioned it quite a lot in a talk I gave at Skills Matter, where it turned out the author, Tom Akehurst, was in the audience.

Shamefully, I had a private fork of Wiremock at the company I worked for: we'd hacked away at it and added support for copying our platform headers, adding our HMAC signatures to responses, etc. We'd also used it for load testing and made a bunch of the Jetty tuning options configurable. Some of this (the HMAC part) was confidential; 90% not so much :)

So over the Christmas holidays, with the help of Tom, I've been hacking away with Wiremock, and the new release now contains:
  • Configurable number of container threads
  • Exposed Jetty tuning options: Acceptor threads & Accept queue size
  • Extension points
The first two were my PRs; the latter was by Tom, who (rightly) rejected my PR as it was reflection based and added too much latency to startup. But Tom kindly hashed out an alternative, documented here:

If you've used Wiremock before you'll know you run/interact with it in two modes: via its Java API and as a standalone process. This means you can use it for unit/integration testing and for black box acceptance testing. Let's look at the Java API; how to use this feature in standalone mode is documented on the Wiremock site:

You extend a Wiremock-provided class to extend Wiremock, and here is a simple implementation that copies over headers that begin with "Batey". This example is inspired by a platform requirement to copy over all platform headers when dealing with requests.
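The implementation from the post is gone, but a sketch from memory of the Wiremock 1.x extensions API looks roughly like this (class and method signatures should be checked against the Wiremock docs for your version):

```java
public class CopiesBateyHeaders extends ResponseTransformer {

    @Override
    public ResponseDefinition transform(Request request,
                                        ResponseDefinition responseDefinition,
                                        FileSource files) {
        // gather every request header whose name starts with "Batey"
        List<HttpHeader> copied = new ArrayList<HttpHeader>();
        for (HttpHeader header : request.getHeaders().all()) {
            if (header.key().startsWith("Batey")) {
                copied.add(header);
            }
        }
        // echo the collected headers back on the response
        return ResponseDefinitionBuilder.like(responseDefinition)
                .but()
                .withHeaders(new HttpHeaders(copied))
                .build();
    }

    @Override
    public String name() {
        return "CopiesBateyHeaders"; // referenced by this name when stubbing
    }
}
```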

Simple! Now to use it from the Java API you add the following to your stubbing:
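The stubbing would look something like this (the URL is illustrative; the key part is attaching the transformer by name):

```java
stubFor(get(urlEqualTo("/dependency"))
        .willReturn(aResponse()
                .withStatus(200)
                .withTransformers("CopiesBateyHeaders")));
```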

The name, CopiesBateyHeaders, in your implementation needs to match the one used in the stubbing. We can now test a piece of code that looks like this:

We test both cases: when the dependency does copy the header over and when it doesn't. Here is the test for when it does:

And for when it doesn't:

Now you're probably thinking we could have just primed this, right?

Well, I hate noise in tests. We want a single test making sure we throw an error if the header isn't copied, but for all the rest of the behaviour (obviously there isn't any in this example) we can now forget about the fact that our dependency should copy the headers, reducing noise in the priming of all our other tests.

I find this particularly important in black box acceptance tests, which often get very noisy.

I love open source :) All the code for this example is on my github here.