Monday, January 26, 2015

Spark + Cassandra: The basics + connecting to Cassandra from spark-shell

A lot of people are getting excited about Apache Spark. The release of the open source Cassandra connector makes a technology like Spark even more accessible. Previously, to get going you'd need a Hadoop infrastructure; now you can do away with all that and start using Spark directly against Cassandra, no HDFS required.

My last two posts on the topic were all about setting up a Vagrant VM with Cassandra and Spark installed. That's all well and good if you're already working in the JVM ecosystem, you know what Vagrant and Ansible are and you love yourself a bit of SBT, but now it is time to take a step back. This post is aimed at getting you started with Spark and Cassandra without assuming you know what sbt assembly means! By the end of this one the goal is to be able to execute (and understand what is going on in) Spark/Cassandra jobs in the Spark REPL; the next article will cover submitting a standalone job.

I'll assume you have Cassandra installed and that you have downloaded a Spark bundle from their website. It doesn't matter which version of Hadoop it has been built against, as we're not going to use Hadoop. If you don't have a locally running Cassandra instance, I suggest you just use the VM from the previous article, follow this article for Ubuntu, use Homebrew if you are on Mac OS X or, if all else fails, just download the zip from the Apache website.

So first things first... why should you be excited about this?

  • If you're already using Cassandra, your data is already distributed and replicated. The Cassandra connector for Spark is aware of this distribution and can bring the computation to the data, which means it is going to be FAST
  • Scala and the JVM might seem scary at first, but Scala is an awesome language for writing data transformations
  • The Spark-Shell: this is a REPL we can use for testing out code. Simply put: it is awesome
  • Spark can also connect to other data sources: files, RDBMSs etc., which means you can do analytics combining data in Cassandra with systems like MySQL
  • Spark also supports streaming, meaning we can combine new data in semi-real time with our batch processing
  • Most importantly, you don't need to extract-transform-load your data from your operational database and put it in your batch processing system e.g. Hadoop

Let's get going

So what do we need to get all this magic working?
  • Java - Java 7 or 8
  • SBT - any 0.13.* will work. This is the build tool used by the majority of Scala projects (Spark is written in Scala)
  • Scala - Spark doesn't officially support 2.11 yet so get 2.10
  • A Cassandra cluster
  • A Spark installation (we're going simple this time so all on one computer)
  • The Cassandra Spark connector, with all of its dependencies, bundled on the classpath of the spark-shell for interactive use
  • A fat jar with all our dependencies if we want to submit a job to a cluster (for the next post)

Hold up: jargon alert. Bundled dependencies, classpath? Fatman?

Both Cassandra and Spark run on the JVM. We don't really care about that for Cassandra, as we're not submitting code to run inside Cassandra, but that is exactly what we're going to do with Spark.

That means all the code and libraries that we use have to go everywhere our computation goes, because Spark distributes your computation across a cluster of computers. So we have to be kind and bundle all our code + all the dependencies we use (other jar files, e.g. for logging). The JVM classpath is just how you tell the JVM where all your jars are.
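To make "the classpath" a bit more concrete: every JVM exposes its classpath as the `java.class.path` system property, a list of jars and directories separated by `:` on Linux/Mac OS X (`;` on Windows). The class below is a small illustrative sketch (the name ClasspathEntries is mine, not part of Spark or the connector) that splits it into its entries:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class ClasspathEntries {
    // Splits this JVM's classpath ("java.class.path") into its individual jars and
    // directories. Entries are separated by File.pathSeparator (':' or ';').
    static List<String> entries() {
        String classpath = System.getProperty("java.class.path", "");
        List<String> result = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            if (!entry.isEmpty()) {
                result.add(entry); // skip empty segments, e.g. from a trailing separator
            }
        }
        return result;
    }
}
```

Start a program with something like `java -cp myapp.jar:logging.jar ...` and those two jars are what you'd get back. For the spark-shell, the --jars option is the knob we use to get extra jars onto the driver and executor classpaths.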

Getting the Spark-Cassandra connector on the classpath


If you're from JVM land you're probably used to doing things like "just build a fat jar and put it on the classpath"; if you're not, that is just a lot of funny words. The connector is not part of core Spark, so you can't use it by default in the spark-shell. To use it, you need to put the connector and all its dependencies on the classpath for the spark-shell. This sounds tedious, right? You'd have to go and look at the build system of the connector and work out what it depends on. Welcome to JVM dependency hell.

SBT, Maven and Gradle to the rescue (sort of). Virtually all JVM languages have a build system that allows you to declare dependencies; it is then the build system's responsibility to go and get them from magic online locations (e.g. Maven Central) when you build your project. In Scala land this is SBT + Ivy.

When you come to distribute a JVM-based application, it is very kind to your users to build a fat jar, or an "executable jar". This contains your code + all your dependencies so that it runs by itself (well, apart from depending on a Java runtime).

So what we need to do is take the connector and use SBT + the assembly plugin to build ourselves a fat jar. The Spark-Cassandra connector already has all the necessary config in its build scripts, so we're just going to check it out and run "sbt assembly".
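If you want to convince yourself that the assembled jar really is "fat", you can look inside it: a jar is just a zip file, and `jar tf <file>` lists its entries. As an illustration, here is a small sketch using the JDK's `java.util.jar.JarFile` that reports the distinct package prefixes of the `.class` files in a jar; the class name and the "first two path segments" grouping are my own choices. For a fat jar you would expect the project's own packages to show up alongside those of its dependencies.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Enumeration;
import java.util.Set;
import java.util.TreeSet;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

class FatJarContents {
    // Returns the distinct package prefixes (first two path segments, e.g.
    // "com/datastax") of every .class entry in the jar at jarPath.
    static Set<String> packagePrefixes(String jarPath) {
        Set<String> prefixes = new TreeSet<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (!name.endsWith(".class")) {
                    continue; // ignore resources, manifests etc.
                }
                String[] parts = name.split("/");
                if (parts.length >= 3) { // at least two package segments + the class file
                    prefixes.add(parts[0] + "/" + parts[1]);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return prefixes;
    }
}
```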



Let's take this line by line:
  1. Line 1: Clone the Spark-Connector repo
  2. Line 11: Run the SBT assembly command
  3. Wait for ages
  4. Line 14: Tells us where SBT has put the fat jar
Now it is time to use this jar in the Spark Shell:


Nothing fancy here: I've just gone into the bin directory of where I unzipped Spark and run spark-shell --help. The option we're looking for is --jars. This is how we add our magical fat jar onto the classpath of the spark-shell. If we hadn't built a fat jar we'd be adding tens of jars here!

However, before we launch spark-shell we're going to tell Spark where Cassandra is. Add the following property to {Spark Install}/conf/spark-defaults.conf (you'll need to create the file):

 spark.cassandra.connection.host=192.168.10.11
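As far as I can tell from the Spark source, spark-defaults.conf is read as a standard Java properties file, which is why `key=value` works here as well as the whitespace-separated `key value` form used in the Spark docs. A quick sketch with `java.util.Properties` showing that both forms give the same value (the helper name is hypothetical):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class SparkDefaultsSketch {
    // Parses text in Java properties format and returns the Cassandra host
    // setting, or null if it isn't present.
    static String cassandraHost(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("spark.cassandra.connection.host");
    }
}
```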

Replace the IP with localhost if your Cassandra cluster is running locally. Then start up Spark-shell with the --jars option:

Now let's look at the important bits:
  1. Line 1: Starting spark-shell with --jars pointing to the fat jar we built
  2. Line 10: Spark confirming that it has picked up the connector fat jar
  3. Line 11: Spark confirming that it has created us a SparkContext
  4. Line 13: Import the connector classes. Scala can add methods to existing classes (via implicit conversions), so the effect of this import is that we now have Cassandra methods on our SparkContext
  5. Line 16: Create a Spark RDD from a Cassandra table "kv" in the "test" keyspace
  6. Line 19: Turn the RDD into an array (forcing it to complete the execution) and print the rows
Well that's all folks, next post will be about submitting jobs rather than using the spark-shell.



Wednesday, January 21, 2015

Spark 1.2 with Cassandra 2.1: Setting up a SparkSQL + Cassandra environment

In my previous post on Cassandra and Spark I showed how to get a development environment set up with Vagrant/Ansible/VirtualBox without installing Cassandra/Spark on your dev station.

This update will get us to a point where we can run SQL (yes, SQL, not CQL) on Cassandra. It is just a trivial example to show the setup working.

The previous article was back in the days of Spark 1.0. With Spark 1.1+ we can now run SparkSQL directly against Cassandra.

I've updated the Vagrant/Ansible provisioning to install Spark 1.2 and Cassandra 2.1, and I've added a new "fatjar" with the latest Cassandra Spark connector so that we can use it in the Spark shell and show this magic working. The 1.2 connector isn't released yet so I have built against the Alpha, see here for details. We're just that cutting edge here...

So, once you have run vagrant up (this will take a while as it downloads and installs all of the above), you'll need to SSH in so we can get into the Spark shell.

I've set up the following alias so there's no worrying about classpaths:

alias spark-shell='spark-shell --jars /vagrant/spark-connector-1.2.0-alpha1-driver-2.1.4-1.0.0-SNAPSHOT-fat.jar'

First let's jump into cqlsh and create a keyspace and table to play with:


Not the most exciting schema you'll ever see, but this is all about getting the Hello World of SparkSQL on Cassandra working!

Now that we have some data to play with, let's access it from the Spark shell.


Let's go through what has happened here:

  • Lines 3-6: Mandatory Spark ASCII art
  • Line 12: Import the connector so we can access Cassandra
  • Line 15: Create a CassandraSQLContext
  • Line 18: Set it to our test Keyspace we created above
  • Line 20: Select the whole table (very exciting I know!)
  • Line 21: Get Spark to execute an action so all the magic happens

That's all for now, tune in later for a more complicated example :)

Here's the link to all the Vagrant/Ansible code.



Tuesday, January 6, 2015

Wiremock: Now with extension points (open source == awesome)

I have been using Wiremock as my preferred HTTP test double for some time now. I think it is a fantastic tool; I mentioned it quite a lot in a talk I gave at Skills Matter, and it turned out the author, Tom Akehurst, was in the audience.

Shamefully, I had a private fork of Wiremock at the company I worked for. We'd hacked away at it and added support for copying our platform headers, adding our HMAC signatures to responses etc. We'd also used it for load testing and made a bunch of the Jetty tuning options configurable. Some of this (the HMAC signing) was confidential; 90% of it, not so much :)

So over the Christmas holidays, with the help of Tom, I've been hacking away with Wiremock, and the new release now contains:
  • Configurable number of container threads
  • Exposed Jetty tuning options: Acceptor threads & Accept queue size
  • Extension points
The first two were my PRs. The last was by Tom, who (rightly) rejected my PR for it because, being reflection based, it added too much latency to start-up. Instead, Tom kindly hashed out an alternative, documented here: https://github.com/tomakehurst/wiremock/issues/214

If you've used Wiremock before you'll know you can run and interact with it in two modes: via its Java API and as a standalone process. This means you can use it for unit/integration testing and for black box acceptance testing. Let's look at the Java API here; how to use this feature in standalone mode is documented on the Wiremock site:


This is the class you extend to extend Wiremock. Here is a simple implementation that copies over headers that begin with Batey; this example is inspired by a platform requirement to copy over all platform headers when dealing with requests.
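Stripped of Wiremock's types, the header-copying rule itself is small. A minimal sketch, written as a plain function over header maps (the class and method names are hypothetical, and this is deliberately not Wiremock's extension API). It copies every request header whose name starts with "Batey" onto the response, matching the prefix case-insensitively because HTTP header names are case-insensitive:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CopiesBateyHeadersSketch {
    private static final String PREFIX = "Batey";

    static Map<String, List<String>> copyBateyHeaders(Map<String, List<String>> requestHeaders,
                                                      Map<String, List<String>> responseHeaders) {
        // Start from the headers the stub was primed with...
        Map<String, List<String>> result = new LinkedHashMap<>(responseHeaders);
        for (Map.Entry<String, List<String>> header : requestHeaders.entrySet()) {
            // ...and add any request header whose name starts with "Batey" (ignoring case).
            if (header.getKey().regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
                result.put(header.getKey(), header.getValue());
            }
        }
        return result;
    }
}
```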

Simple! Now, to use it from the Java API, add the following to your stubbing:

The name given in your implementation, CopiesBateyHeaders, needs to match the one used in the stubbing. We can now test a piece of code that looks like this:

We want to test both cases: when the dependency does copy the header over and when it doesn't. Here is the test for when it does:

And for when it doesn't:

Now you're probably thinking: we could have just primed this, right?

Well, I hate noise in tests. We want a single test making sure we throw an error if the header isn't copied, but for all the rest of the behaviour (obviously there isn't any in this example) we can now forget about the fact that our dependency should copy the headers. This reduces the noise in the priming of all our other tests.

I find this particularly important in black box acceptance tests, which often get very noisy.

I love open source :) All the code for this example is on my github here.

Monday, December 22, 2014

Getting started: Cassandra + Spark with Vagrant

I play with a lot of different technologies and I like to keep my workstations clean. I do this by having a lot of Vagrant VMs. My latest is Apache Spark with Apache Cassandra. We're going to install a working setup of Cassandra/Spark using Vagrant and Ansible. The Vagrant/Ansible code is on GitHub here.

To get going you'll need:
  • Vagrant
  • VirtualBox
  • Ansible
If you haven't used Ansible before, ignore the paid-for Ansible Tower and install it with your favourite package manager, e.g. Homebrew or apt.

Once that's installed, check out the Vagrant file.

Then launch the VM with vagrant up. This can take some time as it actually installs:
  • Java
  • Cassandra
  • Spark
  • Spark Cassandra connector
I could have baked a VirtualBox image with all this in, but the Ansible also documents how to install all of these (for you, and for me once I've forgotten). As well as being slow, this approach has the disadvantage that it downloads Cassandra/Spark, so if their repositories are down it won't work.

The VM runs on IP 192.168.10.10. Your Spark master should be up and running on http://192.168.10.10:8080/



You'll also have OpsCenter installed at: http://192.168.10.10:8888/



To add the cluster, simply click "Add existing cluster..." and then enter the IP 192.168.10.10

If you want to use cqlsh then simply "vagrant ssh" in and then run "cqlsh 192.168.10.10"


To get spark shell up and running just "vagrant ssh" in and then run the spark-shell command:



Spark shell has been aliased to include the Cassandra spark connector so you can start using Cassandra backed RDDs right away!

Any questions or problems just ping me on twitter: @chbatey



Monday, December 8, 2014

Streaming large payloads over HTTP from Cassandra with a small Heap: Java Driver + RX-Java

Cassandra's normal use case is a vast number of small read and write operations distributed equally across the cluster.

However, every so often you need to extract a large quantity of data in a single query. For example, say you are storing customer events that you normally query in small slices, but once a day you want to extract all of them for a particular customer. For customers with a lot of events, this could be many hundreds of megabytes of data.

If you want to write a Java application that executes this query against a version of Cassandra prior to 2.0, then you may run into some issues. Let's look at the first one...

Coordinator out of memory:



Previous versions of Cassandra used to bring all of the rows back to the coordinator before sending them to your application, so if the result was too large for the coordinator's heap, it would run out of memory.

Let's say you had just enough memory in the coordinator for the result, then you ran the risk of...

Application out of memory:




To get around this you had to implement your own paging, where you split the query into many small queries and processed them in batches. This can be achieved by limiting the results and issuing the next query after the last result of the previous query.
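A minimal sketch of that manual paging loop, assuming the partition's rows are ordered by a clustering key. To keep it self-contained, the "table" is an in-memory list and `fetchPage` is a hypothetical stand-in for a query along the lines of `SELECT ... WHERE customer_id = ? AND time > ? LIMIT ?`; with a real driver each call would be a separate round trip to Cassandra:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ManualPagingSketch {
    // In-memory stand-in for one partition: clustering keys in ascending order.
    private final List<Integer> partition;
    // How many "queries" we have issued, so the paging is visible.
    int queriesIssued = 0;

    ManualPagingSketch(List<Integer> partition) {
        this.partition = partition;
    }

    // Hypothetical query: up to `limit` rows whose key is greater than `afterKey`
    // (null means "from the start of the partition").
    List<Integer> fetchPage(Integer afterKey, int limit) {
        queriesIssued++;
        List<Integer> page = new ArrayList<>();
        for (Integer key : partition) {
            if (page.size() == limit) {
                break;
            }
            if (afterKey == null || key > afterKey) {
                page.add(key);
            }
        }
        return page;
    }

    // Reads the whole partition one page at a time; only one page is held in memory.
    void forEachRow(int pageSize, Consumer<Integer> handler) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        Integer lastKey = null;
        while (true) {
            List<Integer> page = fetchPage(lastKey, pageSize);
            page.forEach(handler);
            if (page.size() < pageSize) {
                return; // a short (or empty) page means there is nothing left
            }
            // The next query starts after the last key we have seen.
            lastKey = page.get(page.size() - 1);
        }
    }
}
```

Only one page is in memory at a time; the price is that you own all the bookkeeping (the last key seen, when to stop).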

If your application was streaming the results over HTTP then the architecture could look something like this:


Here we place some kind of queue, say an ArrayBlockingQueue if using Java, between the thread executing the queries and the thread streaming the results out over HTTP. If the queue fills up, the DAO thread is blocked, meaning that it won't bring any more rows back from Cassandra. If the DAO falls behind, the web thread (perhaps a Tomcat thread) blocks waiting to get more rows out of the queue. This works very nicely with the JAX-RS StreamingOutput.
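The hand-off between the two threads can be sketched with nothing but the JDK. In this illustrative version (the names are mine) a background "DAO" thread puts rows on an `ArrayBlockingQueue` and the calling thread, standing in for the web thread, takes them off; a sentinel object marks the end of the results. `put` blocks when the queue is full and `take` blocks when it is empty, which gives the two blocking behaviours described above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedHandOffSketch {
    // Sentinel telling the consumer there are no more rows.
    private static final Object END_OF_ROWS = new Object();

    static List<String> stream(List<String> rows, int capacity) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        Thread dao = new Thread(() -> {
            try {
                for (String row : rows) {
                    queue.put(row); // blocks while the queue is full: back-pressure on the DAO
                }
                queue.put(END_OF_ROWS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dao.start();

        List<String> streamed = new ArrayList<>();
        try {
            while (true) {
                Object next = queue.take(); // blocks while the queue is empty: web thread waits
                if (next == END_OF_ROWS) {
                    break;
                }
                streamed.add((String) next); // stands in for writing to the HTTP response
            }
            dao.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while streaming", e);
        }
        return streamed;
    }
}
```

However small the queue capacity, at most that many rows sit between the two threads at any moment.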

This all sounds like a lot of hard work...

The 2.0+ solution


From version 2.0, Cassandra no longer suffers from the coordinator running out of memory. This is because the coordinator pages the response to the driver rather than bringing the whole result into memory. However, if your application reads the whole ResultSet into memory, then your application running out of memory is still an issue.

However, the DataStax driver's ResultSet pages as well, which works really nicely with RxJava and JAX-RS StreamingOutput. Time to get real; let's take the following schema:


And you want to get all the events for a particular customer_id (the partition key). First let's write the DAO:

Let's go through this line by line:

2: Asynchronously execute the query, which will bring back more rows than will fit in memory.
4: Convert the ListenableFuture to an RxJava Observable. The Observable has a really nice callback interface / way to do transformation.
5: As ResultSet implements Iterable we can flatMap it to Row!
6: Finally map the Row object to CustomerEvent object to prevent driver knowledge escaping the DAO.
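You don't need RxJava or the driver to see why this doesn't blow the heap. The sketch below swaps in plain `java.util.stream` for RxJava and a hypothetical `fetchPage` function for the driver: `pagedRows` returns an `Iterable` that only fetches the next page once the current one has been consumed (roughly what the driver's paging ResultSet does for you), and `mapRows` plays the part of the final map from Row to CustomerEvent. Because streams are lazy, rows are pulled through one at a time:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyPagingSketch {
    // fetchPage is a hypothetical page source: given a page number it returns that
    // page's rows, or an empty list once there are no more rows.
    static <R> Iterable<R> pagedRows(Function<Integer, List<R>> fetchPage) {
        return () -> new Iterator<R>() {
            private int nextPage = 0;
            private Iterator<R> currentPage = Collections.emptyIterator();
            private boolean exhausted = false;

            @Override
            public boolean hasNext() {
                // Only fetch another page when the rows already in hand are used up.
                while (!currentPage.hasNext() && !exhausted) {
                    List<R> page = fetchPage.apply(nextPage++);
                    if (page.isEmpty()) {
                        exhausted = true;
                    } else {
                        currentPage = page.iterator();
                    }
                }
                return currentPage.hasNext();
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return currentPage.next();
            }
        };
    }

    // Equivalent of mapping each Row to a CustomerEvent; lazy, so pages are
    // fetched only as the consumer asks for more rows.
    static <R, E> Stream<E> mapRows(Iterable<R> rows, Function<? super R, ? extends E> toEvent) {
        return StreamSupport.stream(rows.spliterator(), false).map(toEvent);
    }
}
```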

And then let's see the JAX-RS resource class:

Looks complicated but it really isn't, first a little about JAX-RS streaming.

The way JAX-RS works is that we are given a StreamingOutput interface, which we implement to get hold of the raw OutputStream. The container, e.g. Tomcat or Jetty, will call the write method. It is our responsibility to keep the container's thread in that method until we have finished streaming. With that knowledge let's go through the code:

5: Get the Observable<CustomerEvent> from the DAO.
6: Create a CountDownLatch which we'll use to block the container thread.
7: Register a callback to consume all the rows and write them to the output stream.
12: When the rows are finished, close the OutputStream.
16: Countdown the latch to release the container thread on line 33.
26: Each time we get a CustomerEvent, write it to the OutputStream.
33: Await on the latch to keep the container thread blocked.
39: Return the StreamingOutput instance to the container so it can call write.
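The latch pattern itself is independent of JAX-RS and RxJava, so here is a minimal JDK-only sketch of it. A single-thread executor stands in for the thread delivering the asynchronous callbacks, and the calling thread plays the container thread inside `StreamingOutput.write`; the class name and the way errors are handed back are my own choices, not the example project's code:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class LatchStreamingSketch {
    static void write(List<String> rows, OutputStream out) {
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<IOException> failure = new AtomicReference<>();
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            callbackThread.submit(() -> {
                try {
                    for (String row : rows) {
                        // like the per-row callback: write each row as it arrives
                        out.write((row + "\n").getBytes(StandardCharsets.UTF_8));
                    }
                    // like the completion callback: no more rows, so close the stream
                    out.close();
                } catch (IOException e) {
                    failure.set(e);
                } finally {
                    // release the waiting thread whether we succeeded or failed
                    finished.countDown();
                }
            });
            // keep the "container thread" here until streaming has finished
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while streaming", e);
        } finally {
            callbackThread.shutdown();
        }
        if (failure.get() != null) {
            throw new UncheckedIOException(failure.get());
        }
    }
}
```

Counting down in a finally block matters: if the callback thread failed without releasing the latch, the container thread would be stuck forever.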

Given that we're dealing with the rows from Cassandra asynchronously you didn't expect the code to be in order did you? ;)

The full working example is on my GitHub. To test it all I put around 600 MB of data in a Cassandra cluster for the same partition. There is a sample class in the test directory to do this.

I then started the application with a MaxHeapSize of 256 MB, then used curl to hit the events/stream endpoint:


As you can see, 610M came back in 7 minutes. The whole time I had VisualVM attached to the application and the coordinator and monitored the memory usage.

Here's the graph from the application:


The test was run from 14:22 to 14:37. Even though we were pumping 610M of data through the application, the heap was jittering between 50 MB and 200 MB, easily able to reclaim the memory of the data we had streamed out.

For those new to Cassandra and other distributed databases this might not seem that spectacular, but I once wrote a rather large project to do what we can manage here in a few lines. Awesome work by the Apache Cassandra committers and the DataStax Java driver team.

Friday, December 5, 2014

Cassandra summit EU - British Gas, i20 water, testing Cassandra and paging!

Yesterday was the EU Cassandra Summit in London, 1000 crazy Cassandra lovers. I've only just recovered from what was a hectic day.

Over the course of the day I got to chat with two awesome companies: Michael Williams from i20 water and Josep Casals from British Gas Connected Homes. Both of these companies are using Cassandra to store time series data from devices; dare I use the ever popular buzz phrase Internet of Things?

And really they are: i20 water enables water companies to place sensors all around their network and gather the data to detect leaks, saving them hundreds of millions of litres of water a day.

British Gas Connected Homes is enabling its customers to turn their central heating on and off via their mobile, and is expanding into monitoring boilers and predicting when they'll fail or need a service.

In addition to speaking with Cassandra users I also snuck in a talk and a lightning talk. The talk was on how to test Cassandra applications and the lightning talk on server side paging.

Here are the slides for the talk, the video will no doubt be online soon:

 

And for the lightning talk:


Monday, December 1, 2014

Talking about Cassandra at the LJC open conference 2014

The LJC conference is a yearly event for Java/JVM developers in London to get together and see some (hopefully) great talks :)

IBM kindly provided an awesome venue on the South Bank at no charge.

I had the good fortune of being scheduled to talk first, which meant I could get my talk done and then enjoy the rest of the day. I chose to speak about Cassandra for Java devs, which went down really well, and I had people coming to me all day asking about Cassandra.

Here are the slides:




Overall it was an awesome day and I look forward to next year :)