Monday, December 22, 2014

Getting started: Cassandra + Spark with Vagrant

I play with a lot of different technologies and I like to keep my workstations clean. I do this by having a lot of Vagrant VMs. My latest is Apache Spark with Apache Cassandra. We're going to install a working setup of Cassandra/Spark using Vagrant and Ansible. The Vagrant/Ansible setup is on GitHub here.

To get going you'll need Vagrant and Ansible installed.
If you haven't used Ansible before, ignore the paid-for Ansible Tower and install it with your favourite package manager, e.g. Homebrew or apt.

Once that's installed, check out the Vagrantfile.

Then launch the VM with "vagrant up". This can take some time, as it actually installs:
  • Java
  • Cassandra
  • Spark
  • Spark Cassandra connector
I could have baked a VirtualBox image with all of this pre-installed, but the Ansible playbook also documents how to install each of these, for you (and for me once I've forgotten). As well as being slow, this approach has the disadvantage that it downloads Cassandra and Spark, so if their repositories are down it won't work.

The VM runs on port:

Your Spark master should be up and running on:

You'll also have OpsCenter installed at:

To add the cluster, simply click "Add existing cluster..." and then enter the IP.

If you want to use cqlsh, simply "vagrant ssh" in and then run "cqlsh".

To get the Spark shell up and running, just "vagrant ssh" in and then run the spark-shell command:

The Spark shell has been aliased to include the Spark Cassandra connector, so you can start using Cassandra-backed RDDs right away!

Any questions or problems, just ping me on Twitter: @chbatey

Monday, December 8, 2014

Streaming large payloads over HTTP from Cassandra with a small heap: Java Driver + RxJava

Cassandra's normal use case is a vast number of small read and write operations distributed evenly across the cluster.

However, every so often you need to extract a large quantity of data in a single query. For example, say you are storing customer events that you normally query in small slices, but once a day you want to extract all of them for a particular customer. For customers with a lot of events, this could be many hundreds of megabytes of data.

If you want to write a Java application that executes this query against a version of Cassandra prior to 2.0, then you may run into some issues. Let's look at the first one...

Coordinator out of memory:

Previous versions of Cassandra used to bring all of the rows back to the coordinator before sending them to your application, so if the result is too large for the coordinator's heap it would run out of memory.

Even if the coordinator had just enough memory for the result, you then ran the risk of...

Application out of memory:

To get around this you had to implement your own paging, where you split the query into many small queries and processed the results in batches. This can be achieved by limiting the number of results and starting each subsequent query after the last row returned by the previous one.

If your application was streaming the results over HTTP then the architecture could look something like this:

Here we place some kind of queue, say an ArrayBlockingQueue if using Java, between the thread executing the queries and the thread streaming the rows out over HTTP. If the queue fills up, the DAO thread is blocked, meaning that it won't bring any more rows back from Cassandra. If the DAO falls behind, the WEB thread (perhaps a Tomcat thread) blocks waiting to get more rows out of the queue. This works very nicely with the JAX-RS StreamingOutput.
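Here is a minimal sketch of that pre-2.0 workaround. It uses only the JDK so it runs on its own. An in-memory TreeMap stands in for a single Cassandra partition. `fetchPage` is a hypothetical helper playing the role of a `LIMIT`ed query that starts after the last key seen. The DAO thread pages through the rows and puts them on an `ArrayBlockingQueue`. The calling thread plays the WEB thread, draining the queue into a list that stands in for the HTTP OutputStream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PagedStreamingSketch {
    // Marks the end of the stream; assumes no real row has this value.
    private static final String END = "__END__";

    // Hypothetical stand-in for "SELECT ... WHERE key > lastSeen LIMIT limit".
    static List<Map.Entry<Integer, String>> fetchPage(
            NavigableMap<Integer, String> partition, Integer lastSeen, int limit) {
        NavigableMap<Integer, String> rest =
                lastSeen == null ? partition : partition.tailMap(lastSeen, false);
        List<Map.Entry<Integer, String>> page = new ArrayList<>();
        for (Map.Entry<Integer, String> row : rest.entrySet()) {
            if (page.size() == limit) break;
            page.add(row);
        }
        return page;
    }

    static List<String> stream(NavigableMap<Integer, String> partition,
                               int pageSize, int queueCapacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);

        // DAO thread: query page by page, blocking on put() whenever the queue is full.
        Thread dao = new Thread(() -> {
            try {
                Integer lastSeen = null;
                while (true) {
                    List<Map.Entry<Integer, String>> page = fetchPage(partition, lastSeen, pageSize);
                    for (Map.Entry<Integer, String> row : page) {
                        queue.put(row.getValue());
                    }
                    if (page.size() < pageSize) break; // a short page means no more rows
                    lastSeen = page.get(page.size() - 1).getKey();
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dao.start();

        // WEB thread: blocks on take() whenever the DAO is behind.
        List<String> written = new ArrayList<>();
        try {
            for (String row = queue.take(); !row.equals(END); row = queue.take()) {
                written.add(row);
            }
            dao.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> partition = new TreeMap<>();
        for (int i = 0; i < 25; i++) partition.put(i, "event-" + i);
        System.out.println(stream(partition, 10, 4).size()); // 25
    }
}
```

The queue capacity is what bounds memory: however large the partition, at most `queueCapacity` rows plus one page are held at once.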

This all sounds like a lot of hard work...

The 2.0+ solution

From version 2.0, Cassandra no longer suffers from the coordinator running out of memory. This is because the coordinator pages the response to the driver rather than bringing the whole result into memory. However, if your application reads the whole ResultSet into memory, then your application running out of memory is still an issue.

Fortunately, the DataStax driver's ResultSet pages as well, which works really nicely with RxJava and JAX-RS StreamingOutput. Time to get real: let's take the following schema:

And you want to get all the events for a particular customer_id (the partition key). First let's write the DAO:

Let's go through this line by line:

2: Asynchronously execute the query, which will bring back more rows than will fit in memory.
4: Convert the ListenableFuture to an RxJava Observable. The Observable has a really nice callback interface and way to do transformations.
5: As ResultSet implements Iterable, we can flatMap it to Row!
6: Finally map the Row object to CustomerEvent object to prevent driver knowledge escaping the DAO.
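Flat-mapping the ResultSet does not pull the whole partition onto the heap because the iteration is lazy. With the driver's automatic paging, the next page is fetched only when you iterate past the rows already held. The JDK-only sketch below imitates that behaviour with a hypothetical page fetcher. It is not the driver's implementation, only an illustration of why roughly one page of rows is in memory at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

class LazyPagedRows implements Iterable<Integer> {
    // Hypothetical fetcher: (pageNumber, pageSize) -> at most pageSize rows.
    private final BiFunction<Integer, Integer, List<Integer>> fetchPage;
    private final int pageSize;
    int pagesFetched = 0; // lets the usage below observe how many round trips happened

    LazyPagedRows(BiFunction<Integer, Integer, List<Integer>> fetchPage, int pageSize) {
        this.fetchPage = fetchPage;
        this.pageSize = pageSize;
    }

    // Convenience: a "partition" holding rows 0..total-1.
    static LazyPagedRows ofRange(int total, int pageSize) {
        return new LazyPagedRows((page, size) -> {
            List<Integer> rows = new ArrayList<>();
            for (int i = page * size; i < Math.min(total, (page + 1) * size); i++) rows.add(i);
            return rows;
        }, pageSize);
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int nextPage = 0;
            private List<Integer> page = Collections.emptyList();
            private int index = 0;
            private boolean lastPage = false;

            @Override
            public boolean hasNext() {
                if (index < page.size()) return true;
                if (lastPage) return false;
                // Fetch the next page only once the current one is used up; the old
                // page then becomes unreachable and can be garbage collected.
                page = fetchPage.apply(nextPage++, pageSize);
                pagesFetched++;
                index = 0;
                lastPage = page.size() < pageSize;
                return !page.isEmpty();
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return page.get(index++);
            }
        };
    }
}
```

Stopping after 150 rows of a 1000-row "partition" with pages of 100 triggers only two fetches. That is the property that lets the Observable stream a partition far larger than the heap.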

And then let's see the JAX-RS resource class:

It looks complicated but it really isn't. First, a little about JAX-RS streaming.

The way JAX-RS works is that we are given a StreamingOutput interface, which we implement to get hold of the raw OutputStream. The container, e.g. Tomcat or Jetty, will call the write method. It is our responsibility to keep the container's thread in that method until we have finished streaming. With that knowledge, let's go through the code:

5: Get the Observable<CustomerEvent> from the DAO.
6: Create a CountDownLatch which we'll use to block the container thread.
7: Register a callback to consume all the rows and write them to the output stream.
12: When the rows are finished, close the OutputStream.
16: Count down the latch to release the container thread on line 33.
26: Each time we get a CustomerEvent, write it to the OutputStream.
33: Await on the latch to keep the container thread blocked.
39: Return the StreamingOutput instance to the container so it can call write.
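The latch trick can be shown without JAX-RS or RxJava on the classpath. In this JDK-only sketch, `StreamingOutputLike` is a hypothetical stand-in for JAX-RS `StreamingOutput`. `subscribe` is a hypothetical asynchronous source that calls `onNext` for each event on its own thread and then `onCompleted`. The container thread calling `write` parks on the latch until the completion callback releases it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical stand-in for javax.ws.rs.core.StreamingOutput.
interface StreamingOutputLike {
    void write(OutputStream out) throws IOException;
}

class LatchStreamingSketch {
    // Hypothetical async source: emits events on its own thread. onCompleted runs in a
    // finally block so the latch is released even if writing an event fails.
    static void subscribe(List<String> events, Consumer<String> onNext, Runnable onCompleted) {
        new Thread(() -> {
            try {
                events.forEach(onNext);
            } finally {
                onCompleted.run();
            }
        }).start();
    }

    static StreamingOutputLike streamEvents(List<String> events) {
        return out -> {
            CountDownLatch finished = new CountDownLatch(1);
            subscribe(events,
                    event -> { // called once per event, on the source's thread
                        try {
                            out.write((event + "\n").getBytes(StandardCharsets.UTF_8));
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    },
                    () -> { // called when the source is done
                        try {
                            out.close();
                        } catch (IOException ignored) {
                            // nothing more we can do for this client
                        } finally {
                            finished.countDown(); // release the container thread
                        }
                    });
            try {
                finished.await(); // keep the container thread inside write()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while streaming", e);
            }
        };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        streamEvents(Arrays.asList("event-1", "event-2")).write(body);
        System.out.print(body.toString("UTF-8"));
    }
}
```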

Given that we're dealing with the rows from Cassandra asynchronously you didn't expect the code to be in order did you? ;)

The full working example is on my GitHub. To test it all, I put around 600MB of data into a single partition in a Cassandra cluster. There is a sample class in the test directory to do this.

I then started the application with a MaxHeapSize of 256MB and used curl to hit the events/stream endpoint:

As you can see, 610M came back in 7 minutes. The whole time I had VisualVM attached to the application and the coordinator to monitor memory usage.

Here's the graph from the application:

The test was run from 14:22 to 14:37. Even though we were pumping 610M of data through the application, the heap was jittering between 50MB and 200MB, easily able to reclaim the memory of the data we had streamed out.

For those new to Cassandra and other distributed databases this might not seem that spectacular, but I once wrote a rather large project to do what we can manage here in a few lines. Awesome work by the Apache Cassandra committers and the DataStax Java driver team.

Friday, December 5, 2014

Cassandra summit EU - British Gas, i20 water, testing Cassandra and paging!

Yesterday was the EU Cassandra Summit in London, with 1000 crazy Cassandra lovers. I've only just recovered from what was a hectic day.

Over the course of the day I got to chat with two awesome companies: Michael Williams from i20 water and Josep Casals from British Gas Connected Homes. Both of these companies are using Cassandra to store time series data from devices. Dare I use the ever-popular buzz phrase Internet of Things?

And they really are: i20 water enables water companies to place sensors all around their networks and gather the data to detect leaks, saving them 100s of millions of litres of water a day.

British Gas Connected Homes are enabling their customers to turn their central heating on and off from their mobiles, and are expanding into monitoring boilers and predicting when they'll fail or need a service.

In addition to speaking with Cassandra users I also snuck in a talk and a lightning talk. The talk was on how to test Cassandra applications and the lightning talk on server side paging.

Here are the slides for the talk, the video will no doubt be online soon:


And for the lightning talk:

Monday, December 1, 2014

Talking about Cassandra at the LJC open conference 2014

The LJC conference is a yearly event for Java/JVM developers in London to get together and see some (hopefully) great talks :)

IBM kindly provided an awesome venue on the South Bank at no charge.

I had the good fortune of being scheduled to talk first, which meant I could get my talk done and then enjoy the rest of the day. I chose to speak about Cassandra for Java devs, which went down really well, and I had people coming up to me all day asking about Cassandra.

Here are the slides:

Overall it was an awesome day and I look forward to next year :)

Friday, November 7, 2014

Talking at @skillsmatter for the LJC about fault tolerant microservices

Had a great time giving a talk on building fault tolerant microservices at Skills Matter this week. It was great to share some of the good work my team and I have been doing at BSkyB. Here are the slides:


Skills Matter were kind enough to record the event; here is the video:

I referenced some great tools such as WireMock and Saboteur, and coincidentally the author was in the audience. It is a small tech world we live in!

Tuesday, October 21, 2014

Building fault tolerant services: Do you handle timeouts correctly?

One of the simplest sounding requirements that you might get as a software engineer is that your service should either succeed or timeout within N seconds.

However, as we move toward more distributed services, a.k.a. microservices, this is harder than it sounds.

Even if all your service did was call out to another HTTP service over TCP, do some logic, then return a response, you have to deal with:
  • Thread creation duration
  • Socket connection timeout to the dependency
  • Socket read timeout to the dependency
  • Resource acquisition, e.g. how long it takes for your request thread to get hold of a resource from a pool
If you stop there you might think you have all your bases covered:

This looks good: you are taking into account the time taken to get a resource from the resource pool for the third party, any connection timeout in case you need to reconnect, and finally the socket read timeout on the request.

This covers timing out in most cases, but what happens if the dependency is feeding you data very slowly? 

Here a socket read timeout won't help you, as the underlying socket library you're using receives some data within every read timeout period. For large payloads this scenario can leave your application appearing to hang.

So how do you solve this? To be sure that your application will time out, you can't rely on network-level timeouts. A common pattern is to have a worker queue and thread pool for each dependency; that way you can time out the request in process. A fantastic library for this is Netflix's Hystrix.
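Here is a JDK-only sketch of timing out in process that makes the slow-drip problem concrete. `slowDependency` is hypothetical: it "receives" one chunk at a time, so each individual read is quick and a per-read timeout would never fire, yet the whole payload is slow. Running the call on a dedicated pool and bounding the wait with `Future.get(timeout)` caps the total time. This is the idea Hystrix packages up for you, not Hystrix's own code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class InProcessTimeoutSketch {
    // One small pool per dependency, so a slow dependency can only tie up its own threads.
    // Daemon threads so a stuck call never keeps the JVM alive.
    private static final ExecutorService DEPENDENCY_POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "slow-dependency");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical dependency: each chunk arrives quickly, the whole body does not.
    static String slowDependency(int chunks, long chunkDelayMs) throws InterruptedException {
        StringBuilder body = new StringBuilder();
        for (int i = 0; i < chunks; i++) {
            Thread.sleep(chunkDelayMs);
            body.append('x');
        }
        return body.toString();
    }

    static String callWithTimeout(int chunks, long chunkDelayMs, long timeoutMs) {
        Future<String> result = DEPENDENCY_POOL.submit(() -> slowDependency(chunks, chunkDelayMs));
        try {
            // Bounds the *total* time, however the bytes trickle in.
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // Interrupting works for this sleep-based stand-in; a classic blocking socket
            // read generally ignores interrupts, so real code would also close the connection.
            result.cancel(true);
            return "fallback";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.cancel(true);
            return "fallback";
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(3, 10, 1000)); // xxx
        System.out.println(callWithTimeout(50, 20, 200)); // fallback
    }
}
```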

How do you do automated testing for this? If you're like me and love to test everything, then this is a tough one. However, if you run your dependency (or a mock like WireMock) on a separate VM that is provisioned for the test, and then use Linux commands like iptables and tc, you can automate the tests for a slow network. Saboteur is a small Python library that does this for you, and offers an HTTP API for slowing the network, dropping packets etc.

Isn't this slow? On a Jenkins server, provisioning the VM takes ~1 minute with Vagrant once the base box is downloaded. For development I always have it running.

The whole stack of an application under test, WireMock, Vagrant and Saboteur will be the topic of a follow-up post that will contain a full working example.

This article showed how complicated this is for a single dependency; what if you call out to many dependencies? I tend to use a library like Hystrix to wrap calls to dependencies, but in-house code to wrap the whole request and time it out. This allows one dependency to be slow while the others are fast, which is more flexible than taking your SLA and dividing it between your dependencies.
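One way to implement that whole-request timeout, sketched with only the JDK, is to run every dependency call concurrently and give `ExecutorService.invokeAll` a single deadline for all of them. Anything unfinished at the deadline is cancelled. The dependency names and latencies are hypothetical `Thread.sleep` stand-ins. In practice each call would still go through its own Hystrix command.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class RequestDeadlineSketch {
    // latenciesMs maps a (hypothetical) dependency name to how long it takes to answer.
    static List<String> callAll(Map<String, Long> latenciesMs, long requestTimeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, latenciesMs.size()));
        try {
            List<Callable<String>> calls = new ArrayList<>();
            for (Map.Entry<String, Long> dep : latenciesMs.entrySet()) {
                String name = dep.getKey();
                long latency = dep.getValue();
                calls.add(() -> {
                    Thread.sleep(latency);
                    return name;
                });
            }
            // One budget for the whole request: a slow dependency may use more than a
            // fixed share, as long as the total stays under requestTimeoutMs.
            List<Future<String>> futures = pool.invokeAll(calls, requestTimeoutMs, TimeUnit.MILLISECONDS);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                if (f.isCancelled()) { // did not finish before the deadline
                    results.add("timed-out");
                    continue;
                }
                try {
                    results.add(f.get()); // already done, so this does not block
                } catch (ExecutionException e) {
                    results.add("failed");
                }
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<String, Long> deps = new LinkedHashMap<>();
        deps.put("user", 150L);
        deps.put("pin-check", 10L);
        deps.put("device", 2000L);
        System.out.println(callAll(deps, 300));
    }
}
```

With the 300 ms budget in `main`, the 150 ms user call still succeeds even though it exceeds a one-third share of 100 ms. The 2000 ms device call is cancelled and reported as timed-out.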

Saturday, August 23, 2014

Using Hystrix with Dropwizard

I've previously blogged about Hystrix/Tenacity and Breakerbox. The full code and outline of the example are located here.

Since then I've been using Dropwizard and Hystrix without Tenacity/Breakerbox and found it far simpler. I don't see a great deal of value in adding Tenacity and Breakerbox, as Hystrix uses Netflix's configuration library, Archaius, which already comes with dynamic configuration via files, databases and ZooKeeper.

So let's see what is involved in integrating Hystrix and Dropwizard.

The example is the same as in this article. Briefly, it is a single service that calls out to three other services:
  • A user service
  • A pin check service
  • A device service

To allow the application to reliably handle dependency failures, we are going to call out to each of the three services using Hystrix commands. Here is an example of calling out to a pin check service using the Apache HTTP client:

To execute this command you simply instantiate it and call execute(); Hystrix handles creating a work queue and thread pool. Each command that is executed with the same group will use the same work queue and thread pool. You tell Hystrix the group by passing it to super() when extending a Hystrix command. To configure Hystrix in a Dropwizard-like way, we can add a map to our Dropwizard YAML:
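"Same group, same work queue and thread pool" is the bulkhead idea at the heart of Hystrix. The sketch below reproduces it with plain `java.util.concurrent` so it runs standalone. `GroupPools` and `GroupedCommand` are hypothetical names, not Hystrix classes, and the pool and queue sizes are arbitrary. Passing the group to the constructor mirrors passing it to `super()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GroupPools {
    private static final ConcurrentMap<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

    // Every command in the same group gets the same bounded queue and thread pool.
    static ExecutorService poolFor(String group) {
        return POOLS.computeIfAbsent(group, g -> new ThreadPoolExecutor(
                10, 10, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5), // bounded work queue (arbitrary size)
                r -> {
                    Thread t = new Thread(r, g + "-worker");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.AbortPolicy())); // reject when pool and queue are full
    }
}

abstract class GroupedCommand<T> {
    private final String group;

    protected GroupedCommand(String group) {
        this.group = group; // cf. passing the group key to super() in a Hystrix command
    }

    protected abstract T run() throws Exception;

    protected T getFallback() {
        throw new UnsupportedOperationException("no fallback for group " + group);
    }

    T execute(long timeoutMs) {
        Callable<T> task = this::run;
        Future<T> future;
        try {
            future = GroupPools.poolFor(group).submit(task);
        } catch (RejectedExecutionException e) {
            return getFallback(); // this group's pool is saturated: fail fast
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) { // timeout, failure inside run(), or interruption
            if (e instanceof InterruptedException) Thread.currentThread().interrupt();
            future.cancel(true);
            return getFallback();
        }
    }
}

class GroupedCommandDemo {
    public static void main(String[] args) {
        String pin = new GroupedCommand<String>("PinCheck") {
            @Override
            protected String run() {
                return "pin ok";
            }
        }.execute(1000);
        System.out.println(pin); // pin ok
    }
}
```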

This will translate to a Map in your Dropwizard configuration class:

The advantage of using a simple map, rather than a class whose property names match Hystrix's property names, is that your configuration class is completely decoupled from Hystrix and its property naming conventions. It also allows users to copy property names directly from the Hystrix documentation into the YAML.

Enabling Hystrix to pick these properties up requires a single line in your Dropwizard application class. This simplicity is due to the fact that Hystrix uses Archaius for property management.

Now you can add any of Hystrix's many properties to your YAML, and later extend the Configuration you install to include a dynamic configuration source such as ZooKeeper.

I hope this shows just how simple it is to use Hystrix with Dropwizard without bothering with Tenacity. A full working example is on GitHub.

Thursday, August 21, 2014

Stubbed Cassandra at Skills Matter

Yesterday I gave a talk on how to test Cassandra applications using Stubbed Cassandra at Skills Matter in London for the Cassandra London meetup group.

The talk was well attended, with somewhere between 50 and 100 people.

The slides are on SlideShare:

And the talk is on the Skills Matter website.

Thanks to Cassandra London and Skills Matter for having me!

Thursday, August 7, 2014

RabbitMQ and Highly Available Queues

RabbitMQ is an AMQP broker with an interesting set of HA abilities. Do a little research and your head will start spinning working out the differences between making messages persistent, or queues durable, or was it durable messages and HA queues with transactions? Hopefully the following is all the information you need in one place.

Before evaluating them you need to define your requirements.

  • Do you want queues to survive broker failures? 
  • Do you want unconsumed messages to survive a broker failure?
  • What matters more, publisher speed, or the above? Or do you want a nice compromise?

RabbitMQ allows you to:

  • Make a cluster of Rabbits where clients can communicate with any node in the cluster
  • Make a queue durable, meaning the queue definition itself will survive broker failure
  • Make a message persistent, meaning that it will get stored to disk, which you do by setting a message's delivery_mode
  • Make a queue HA, meaning its contents will be replicated across brokers: either a specified list, all of them or a number of them
  • Even an HA queue has a single master that handles all operations on that queue, even if the client is connected to a different node in the cluster; the master sends information to the replicas, which are called slaves
Okay so you have a durable queue that is HA and you're using persistent messages (you really want it all!). How do you work with the queue correctly?

Producing to an HA queue

You have three options for publishing to an HA queue:
  • Accept the defaults: the publish will return with no guarantees in the event of broker failure
  • Publisher confirms
  • Transactions
The defaults: You went to all that effort of making a durable HA queue and sending a persistent message, and then you just fire and forget? Sounds crazy, but it's not. You might have done the above to make sure you don't lose a lot of messages, but you don't want the performance impact of waiting for any form of acknowledgment. You're essentially accepting a few failures when you lose a rabbit that is the master for any of your queues.

Transactions: To use RabbitMQ transactions you do a txSelect on your channel. Then, when you publish a message, you call txCommit, which won't return until your message has been accepted by the master and all of the queue's slaves. If your message is persistent, that means it is on the disk of all of them: you're safe! What's not to like? The speed! Every persistent message that is published in a transaction results in an fsync to disk. You need a compromise, you say?

Publisher confirms: So you don't want to lose your messages and you want to speed things up. Then you can enable publisher confirms on your channel. RabbitMQ will then send you a confirmation when the message has made it to disk on all the rabbits, but it won't do it right away; it will flush things to disk in batches. You can either block periodically or set up a listener to get notified. Then you can put logic in your publisher to do retries etc. You might even write logic to limit the number of published messages that haven't been confirmed. But wait, isn't queueing meant to be easy?
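This sketch covers limiting the number of unconfirmed messages and keeping nacked messages for retry. It deliberately uses no RabbitMQ client. `ConfirmWindow` is a hypothetical helper, `publish` only records the message, and the broker's acks and nacks are simulated by calling `handleAck`/`handleNack` directly. The (sequence number, multiple) pair mirrors the delivery tag and "multiple" flag that publisher confirms carry. The sketch assumes a single publishing thread per channel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

class ConfirmWindow {
    private final Semaphore permits;
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private final AtomicLong nextSeqNo = new AtomicLong(1);

    ConfirmWindow(int maxUnconfirmed) {
        permits = new Semaphore(maxUnconfirmed);
    }

    // Blocks while maxUnconfirmed messages are already waiting for a confirm.
    long publish(String message) throws InterruptedException {
        permits.acquire();
        return record(message);
    }

    // Non-blocking variant: returns -1 if the window is full.
    long tryPublish(String message) {
        return permits.tryAcquire() ? record(message) : -1;
    }

    private long record(String message) {
        long seqNo = nextSeqNo.getAndIncrement();
        outstanding.put(seqNo, message);
        // A real publisher would send the message on its channel here.
        return seqNo;
    }

    // Broker confirmed seqNo (and every earlier one if multiple is true).
    void handleAck(long seqNo, boolean multiple) {
        settle(seqNo, multiple);
    }

    // Broker rejected seqNo (and earlier ones if multiple); returns them for retrying.
    List<String> handleNack(long seqNo, boolean multiple) {
        return settle(seqNo, multiple);
    }

    int unconfirmed() {
        return outstanding.size();
    }

    private List<String> settle(long seqNo, boolean multiple) {
        Iterable<Long> keys = multiple
                ? outstanding.headMap(seqNo, true).keySet()
                : Collections.singletonList(seqNo);
        List<String> settled = new ArrayList<>();
        for (Long key : keys) {
            String message = outstanding.remove(key);
            if (message != null) settled.add(message); // ignore duplicate confirms
        }
        permits.release(settled.size()); // free up room in the window
        return settled;
    }

    public static void main(String[] args) {
        ConfirmWindow window = new ConfirmWindow(2);
        window.tryPublish("a");
        window.tryPublish("b");
        System.out.println(window.tryPublish("c")); // -1: window full
        window.handleAck(2, true);                  // confirms 1 and 2
        System.out.println(window.tryPublish("c")); // 3
    }
}
```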

Consuming from an HA queue

Okay, so you have your message on the queue - how do you consume it? This is simpler:
  • Auto-ack: As soon as a message is delivered RabbitMQ discards it
  • Ack: Your consumer has to manually ack each message
If your consumer crashes and disconnects from Rabbit, then the message will be re-queued. However, if you have a bug and you just don't ack it, then Rabbit will keep hold of it until you disconnect, and then it will be re-queued. I bet that leads to some interesting bugs!

So what could go wrong?

This sounds peachy: you don't care about performance, so you have a durable HA queue with persistent messages and are using transactions for producing and acks when consuming. You're guaranteed exactly-once delivery, right? Well, no. Imagine your consumer crashes after consuming the message but just before sending the ack. Rabbit will re-send the message to another consumer.

HA queueing is hard!


There is no magic bullet; you really need to understand the software you use for HA queueing. It is complicated, and I didn't even cover topics like network partitions. Rabbit is a great piece of software and its automatic failover is really great, but every notch you add on (transactions etc.) will degrade your performance significantly.

Monday, August 4, 2014

Getting started with Hystrix and Tenacity to build fault tolerant applications

Applications are becoming increasingly distributed. Microservice architecture is all the rage. This means that each application you develop has more and more "integration points".

Any time you make a call to another service or database, or use any third party library that is a black box to you, it can be thought of as an integration point.

Netflix's architecture gives a great example of how to deal with integration points. They have a popular open-source library called Hystrix, which allows you to isolate integration points by executing the calls to each one on its own worker queue and thread pool.

Yammer have integrated Hystrix with Dropwizard, enabling applications to publish metrics and accept configuration updates.

Here is an example application that calls out to three HTTP services and collects the results together into a single response.
Rather than calling an HTTP library directly on the thread provided by Jetty, this application uses Yammer's Hystrix wrapper, Tenacity.

Let's look at one of the integration points:

Here we extend the TenacityCommand class and call the dependency in the run() method. Typically all this code would be in another class with the TenacityCommand just being a wrapper, but this is a self-contained example. Let's explain what it is doing:
  • Making an HTTP call using the Apache HTTP client
  • If it fails, throw a RuntimeException
By instantiating this TenacityCommand and calling execute(), your code is automagically executed on its very own thread pool, and requests are queued on its very own work queue. What benefits do you get?
  • You get a guaranteed timeout, so no more relying on a library's read timeout that never seems to work in production
  • You get a circuit breaker that opens if a configured % of calls fail, meaning you can fail fast and throttle calls to failing dependencies
  • You get endpoints that show the configuration and whether a circuit breaker is open

If the call to run() fails or times out, or the circuit breaker is open, Tenacity will call the optional getFallback() method, so you can provide a fallback strategy rather than failing completely.
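Hystrix's circuit breaker is considerably more sophisticated, but the core state machine is small enough to sketch with the JDK alone. Everything here is a hypothetical simplification. That includes the class name, the count-based window (Hystrix uses a time-based rolling window) and the fallback supplier standing in for `getFallback()`. The breaker opens when the failure percentage over the last `windowSize` calls reaches the threshold. After a sleep window it lets a single trial call through.

```java
import java.util.Arrays;
import java.util.function.Supplier;

class SimpleCircuitBreaker {
    private final int failurePercentThreshold;
    private final long sleepWindowMs;
    private final boolean[] failedCalls; // ring buffer of recent outcomes
    private int recorded, next, failures;
    private long openedAt = -1;          // -1 while the circuit is closed
    private boolean trialInFlight;

    SimpleCircuitBreaker(int windowSize, int failurePercentThreshold, long sleepWindowMs) {
        this.failedCalls = new boolean[windowSize];
        this.failurePercentThreshold = failurePercentThreshold;
        this.sleepWindowMs = sleepWindowMs;
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (!allowRequest()) return fallback.get(); // open: fail fast without calling out
        try {
            T result = action.get();
            record(false);
            return result;
        } catch (RuntimeException e) {
            record(true);
            return fallback.get();
        }
    }

    synchronized boolean isOpen() {
        return openedAt >= 0;
    }

    private synchronized boolean allowRequest() {
        if (openedAt < 0) return true;
        if (!trialInFlight && System.currentTimeMillis() - openedAt >= sleepWindowMs) {
            trialInFlight = true; // half-open: exactly one trial call
            return true;
        }
        return false;
    }

    private synchronized void record(boolean failed) {
        if (trialInFlight) {
            // Simplification: whichever call finishes first is treated as the trial.
            trialInFlight = false;
            if (failed) openedAt = System.currentTimeMillis(); // stay open, restart the timer
            else reset();                                       // close the circuit
            return;
        }
        int windowSize = failedCalls.length;
        if (recorded == windowSize && failedCalls[next]) failures--; // evict the oldest outcome
        failedCalls[next] = failed;
        if (failed) failures++;
        next = (next + 1) % windowSize;
        if (recorded < windowSize) recorded++;
        if (recorded == windowSize && failures * 100 >= failurePercentThreshold * windowSize) {
            openedAt = System.currentTimeMillis();
        }
    }

    private void reset() {
        Arrays.fill(failedCalls, false);
        recorded = next = failures = 0;
        openedAt = -1;
    }
}
```

While the circuit is open, `call` returns the fallback without touching the dependency at all. That is what lets you throttle calls to a failing service.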

Another hidden benefit is how easy it is to move to a more asynchronous style of programming. Let's look at the resource class that pulls together the three dependencies:

Let's ignore the fact that we are calling out to other HTTP services from a resource layer. The above code shows how to use Tenacity synchronously. Apart from the advantages you gain regarding failures, the calls still happen one by one: execute() blocks, so we don't call the second dependency until the first one has finished.

However, this doesn't have to be the case. Now you've snuck Tenacity into your code base you can change the code to something like this:

And without your colleagues realising you've made all of your calls to your dependencies execute asynchronously and (possibly) at the same time, then you block to bring them all together at the end.
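This sketch illustrates the one-by-one versus all-at-once difference without Tenacity on the classpath. `CompletableFuture` on a small pool stands in for what Tenacity/Hystrix provide. Starting everything first and joining at the end is similar in spirit to calling a Hystrix command's `queue()`, which returns a Future, instead of `execute()`. The dependency names and the 100 ms latencies are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelDependenciesSketch {
    private static final ExecutorService POOL = Executors.newFixedThreadPool(3, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical dependency call that takes latencyMs to answer.
    static String call(String dependency, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return dependency + "-ok";
    }

    // Like calling execute() three times: each call waits for the previous one.
    static String sequential() {
        return call("user", 100) + "," + call("pin", 100) + "," + call("device", 100);
    }

    // Start all three immediately, then block once to combine the results.
    static String parallel() {
        CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> call("user", 100), POOL);
        CompletableFuture<String> pin = CompletableFuture.supplyAsync(() -> call("pin", 100), POOL);
        CompletableFuture<String> device = CompletableFuture.supplyAsync(() -> call("device", 100), POOL);
        return user.join() + "," + pin.join() + "," + device.join();
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        String s = sequential();
        long t1 = System.nanoTime();
        String p = parallel();
        long t2 = System.nanoTime();
        System.out.printf("sequential %s in %d ms%n", s, (t1 - t0) / 1_000_000);
        System.out.printf("parallel   %s in %d ms%n", p, (t2 - t1) / 1_000_000);
    }
}
```

On a typical machine the sequential version takes roughly the sum of the three latencies, and the parallel one roughly the slowest single latency.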

We've barely scratched the surface of Hystrix and Tenacity, but hopefully you can already see the benefits. All the code for this example, along with instructions on how to use WireMock to mock the dependencies, is here.

Thursday, July 31, 2014

RabbitMQ vs Apache QPID - picking your AMQP broker

Two of the top AMQP brokers are RabbitMQ and QPID. Which one is for you? Here are some of the considerations:

Both have the same programming model as you can connect with any AMQP compatible client, so how do you pick?

First off you need to decide if you're interested in the Java QPID broker or the C++ QPID broker. They have the same name but they are quite different. This article will compare the C++ QPID broker.


RabbitMQ has been developed in Erlang, and thus you need to install Erlang before you can run it. Think of this as the equivalent of needing a JVM to run something like Tomcat. Erlang is in most Linux distributions' package repositories, so this is easy.

The QPID C++ broker doesn't have a storage dependency like its Java equivalent. To enable any form of HA, QPID depends on rgmanager.


RabbitMQ uses the built-in Erlang database, Mnesia. A Mnesia database can be configured to be either in RAM or on disk, which allows RabbitMQ to offer in-memory and disk-based queues very easily. It also means the developers of RabbitMQ haven't had to deal with all the complexities of developing a file-based storage mechanism, and don't rely on an external database.

QPID stores its queues in memory or database. Without looking at the code that is all we're going to learn. If you see QPID referencing databases then you are looking at the QPID Java broker.

HA Queues and Failover

RabbitMQ runs in an active-passive mode for HA queues. Each queue has a master and writes are replicated to slaves. By default, RabbitMQ doesn't try to handle network partitions within clusters. It expects a reliable LAN and should never be used across a WAN. However, it has two optional ways of dealing with network partitions: pause_minority and autoheal. If you enable pause_minority, a rabbit in a cluster will disable itself by disconnecting clients if it can only communicate with a minority of nodes. The idea here is that clients will reconnect to the majority part of the cluster. Autoheal lets the nodes carry on as normal and, when the network partition ends, resets the nodes on the side that had the fewest client connections. How brutal!

QPID has an overwhelming number of options when it comes to HA. Let's stick to HA within a LAN. There you have active-passive messaging clusters. Here queues are replicated to a passive QPID node; when the master fails you can either move a virtual IP or configure the client with both the master and the slave, and it will fail over. The big caveat is that QPID won't handle failover itself: you need a cluster manager like rgmanager. The rationale is that they want to avoid split brain. I'd rather have the functionality built in and not use it if I need to worry about split brain.

Cross DC? WAN compatible?

Who runs in a single DC these days? Not me! Most queueing solutions shy away from the WAN connection. Most have a clustering mode that will only work in a reliable LAN. When it comes to getting your data from one datacenter to another it is usually a bolt on.

RabbitMQ has two plugins to handle this. Let's take them individually. The Shovel plugin allows you to take messages from a queue in one cluster and put them on an exchange in another, perfect! It can be configured to create the queues/exchanges and handles connection failures. You can set properties like the ack mode depending on how resilient you want the putting of messages to be. You can also list multiple brokers so that it can fail over to another broker if you lose connection to the original. A great feature of shovels is that you can define them on every node of your cluster and the shovel will run on a single node; if that node fails, it will start on another node.

RabbitMQ also has the federation plugin. This plugin allows you to receive messages from exchanges and queues from other brokers. It achieves the same goal as the shovel plugin but is slightly less configurable.

QPID has a very similar feature to shovels called broker federation, where you define a source queue on one broker and a destination exchange on a broker running in a different datacenter. Like RabbitMQ shovels, these "routes" can be replicated on a cluster of brokers, and if the broker that is executing the route goes down, another in the cluster can take its place.

It is a draw! However, I would always consider a simpler option for cross-WAN queueing: put the message on a queue in the datacenter where it is produced, and consume it from an application running in the other datacenter. After all, it is your application that needs the message, and it is best placed to handle failures.


RabbitMQ is far simpler to get going with, primarily due to fantastic documentation and a very easy install process. I also like the fact that RabbitMQ uses Erlang's built-in database and inter-node communication mechanism. The two modes for network partitions are very nice too. So RabbitMQ wins this tight race.

However, before you go and use RabbitMQ, check out Apache Kafka as the wildcard!

Thursday, July 10, 2014

Installing Cassandra and Spark with Ansible

I am currently doing a proof of concept with Spark and Cassandra. I quickly need to be able to create and start Cassandra and Spark clusters. Ansible to the rescue!

I split my Ansible playbook into three roles:
  • Cassandra
  • Ops center
  • Spark
My main playbook is very simple:

I have some hosts defined in a separate hosts file called m25-cassandra. I've also decided to install htop; I could have put this in a general server role.

I also define a few variables; these could of course be defined elsewhere, per role:
  • cluster_name - this will replace the cluster name in each host's cassandra.yaml
  • seeds - as above
So let's take a look at each role.

Cassandra

Here are the tasks:

This is doing the following:
  • Installing a JRE
  • Adding the Apache Cassandra debian repository
  • Adding the keys for the debian repository
  • Installing the latest version of Cassandra
  • Replacing the cassandra.yaml (details later)
  • Ensuring Cassandra is started
The template cassandra.yaml uses the following variables:
  • cluster_name: '{{ cluster_name }}' - So we can rename the cluster
  • - seeds: "{{ seeds }}" - So when we add a new node it connects to the cluster
  • listen_address: {{ inventory_hostname }} - Listen on the node's external IP so other nodes can communicate with it
  • rpc_address: {{ inventory_hostname }} - So we can connect ops center and cqlsh to the nodes
Magic! Now adding new hosts to my hosts file with the tag m25_cassandra will get Cassandra installed, connected to the cluster and started.

Ops Center

The tasks file for ops center:

This is doing the following:
  • Adding the DataStax community debian repository
  • Adding the key for the repo
  • Installing Ops Center
  • Starting Ops Center
No templates here as all the default configuration is fine.

Spark

The Spark Maven build can produce a Debian package, but I didn't find a public Debian repo containing it, so the following just downloads and unpacks the Spark package:

I start the workers using the script from my local master, so I don't need to start anything on the nodes that have Cassandra on them.


Ansible makes it very easy to install distributed systems like Cassandra. The thought of doing it manually fills me with pain! This is just for a PoC; I don't suggest downloading Spark from the public internet or always installing the latest version of Cassandra for your production systems. The full source, including templates and directory structure, is here.

Sunday, June 15, 2014

Behaviour driven testing with Cucumber-JVM and Stubbed Cassandra

So you're tasked with building a service that acts as a facade for your company's legacy user management system. You need to do this because your part of the company needs to handle the legacy system going down.

To add resiliency you are required to save a user's information in case the legacy system is down. You need to be able to handle restarts so this cache will need to be persistent.

You're going to use Cassandra as your persistence, so how do you test this? You sit down with your analyst and QA and come up with the following feature:

How would you implement these features? Assuming that the legacy system is an HTTP service, you can use WireMock to mock it being up and down.

For example, here is how to mock the legacy system being up with WireMock:

And an example of it being down:

So the next requirement is that Cassandra being down doesn't make your service fail i.e.

For the first one, you could stop Cassandra, perhaps using the great tool CCM. However, this is slow, and you need to write code to make sure it is back up/down, all of this in a different process. And how about the next test? How do we make Cassandra return the result slowly? Or produce a Write Timeout Exception?

That is where Stubbed Cassandra comes in handy. To get it started, you can add some code like this to start it before the tests and close it after the tests:

Now implementing the step definition to mimic Cassandra being down is as easy as:

Which is a lot quicker than turning off a real Cassandra, and starting it back up in the @Before is also very quick.

Now to mimic write timeouts in Cassandra:

Very similar things could be done for read timeouts and unavailable exceptions.

This article gave you an insight into how you can behaviour drive features relating to Cassandra being down. The full code and running tests are here. Full information on how to use Stubbed Cassandra is here; you'll probably want the documentation for the Java client for Scassandra, which is here.

The modern developer: Understanding TCP

A couple of years ago I'm not sure I would have delved into a candidate's knowledge of the TCP protocol when interviewing.

However, I've changed my mind. Why?

Reason one: The Cloud. Marketers tell us that the cloud means we don't need to care where our servers are or where our applications are deployed. But for the developer I think the opposite is true. Suddenly, rather than running on internal high-speed network links, our applications are being deployed onto commodity hardware with shoddy network links that regularly go down.

Reason two: Micro-service architecture. As we split our applications into small components we also increase the number of integration points, and most of those integration points will be over TCP.
So what should a well-rounded developer know about their system and its dependencies?
  1. For any calls outside of their application's process:
    1. What is the underlying protocol?
    2. What is the connection timeout for that protocol?
    3. What is the read timeout for that protocol?
    4. What is the write timeout for that protocol?
  2. Are there any firewalls between your application and its dependencies?
  3. Does the traffic between your applications, or between your application and its dependencies, go over the Internet?
  4. What happens if a dependency responds slower than usual at the application level?
  5. What happens if a dependency responds slower, or not at all, at the protocol level?
I think the best way to answer these questions is to test these scenarios, and to do that developers need a good understanding of how TCP works and how it can go wrong. They need to be able to use tools like tcpdump, Wireshark and netcat. How many "Java developers" do you think fall into this category and would test these scenarios?

How many would say: "Well, I call a Java method that makes the connection, why should I care?"

As soon as you remember that the people writing these libraries are just as human as you, you might have second thoughts about skipping the "too edge-case" tests. Most people are surprised to learn that a lot of libraries just use the operating system's timeout values for TCP connections, which are usually measured in minutes, not seconds. How do you explain to your users why your application hung for 10 minutes?
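To make the timeout questions concrete, here is a minimal Java sketch using only `java.net`. It sets the connect and read timeouts explicitly, then simulates a dependency that accepts the TCP connection but never answers, using a local `ServerSocket` that never calls `accept()`. The class and method names are illustrative, and the 1000 ms connect and 500 ms read values are arbitrary assumptions, not recommendations.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class TcpTimeouts {

    // Opens a socket with explicit connect and read timeouts. With java.net.Socket's
    // defaults (a timeout of 0), connect blocks until it succeeds or the OS gives up,
    // and reads block indefinitely.
    static Socket connect(InetSocketAddress address, int connectTimeoutMs, int readTimeoutMs) throws IOException {
        Socket socket = new Socket();
        try {
            socket.connect(address, connectTimeoutMs); // SocketTimeoutException if exceeded
            socket.setSoTimeout(readTimeoutMs);        // applies to every blocking read
            return socket;
        } catch (IOException e) {
            socket.close();
            throw e;
        }
    }

    // Simulates a dependency that accepts the TCP connection but never responds.
    // Returns true if the read timed out rather than hanging.
    static boolean readTimesOutAgainstSilentServer(int readTimeoutMs) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // The kernel completes the handshake via the listen backlog even though accept() is never called.
        try (ServerSocket silentServer = new ServerSocket(0, 50, loopback)) {
            InetSocketAddress address = new InetSocketAddress(loopback, silentServer.getLocalPort());
            try (Socket socket = connect(address, 1000, readTimeoutMs);
                 InputStream in = socket.getInputStream()) {
                in.read(); // blocks until data arrives, the peer closes, or the read timeout fires
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean timedOut = readTimesOutAgainstSilentServer(500);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("read timed out: " + timedOut + " after about " + elapsedMs + " ms");
    }
}
```

Without the `setSoTimeout` call, the `read()` above would block for as long as the silent server keeps the connection open, which is exactly the "hung for 10 minutes" situation.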

I no longer see any of the above scenarios as edge cases; it just might take a few weeks in production for each of them to happen.

Friday, June 13, 2014

Overriding Spring Boot HTTP status codes for health pages

One thing I found a little baffling when I started using Spring Boot was that the health page returned a 200 regardless of the reported status. So 'DOWN' would return a 200, with the actual information in a JSON payload. This means any monitoring system needs to parse the JSON rather than relying on the HTTP status code.

Finally, with Spring Boot 1.1 you can override this! For example, if you want DOWN to be a 500, add the following to your application properties file:

[DOWN]=INTERNAL_SERVER_ERROR

Saturday, June 7, 2014

Using Stubbed Cassandra's new JUnit rule

In my first post about unit testing Java applications that use Cassandra, you had to start Scassandra before your tests, reset it between tests and finally stop it after your tests. Since then I've accepted a pull request that hides all of this away in a JUnit rule.

So the old code in your tests looked like this for starting Scassandra:

This for stopping Scassandra:

And this for resetting Scassandra between tests:

Now you can simply do this:

Assigning it to a ClassRule means that it will start Scassandra before all your tests and stop it after all your tests. Assigning it to a regular Rule as well means that the ActivityClient and PrimingClient will be reset between tests.

Wednesday, June 4, 2014

Unit testing Java Cassandra applications

I often get asked what the best method is for unit and integration testing Java code that communicates with Cassandra.

Here is what I think the options are:
  1. Mocking libraries: Use mocking libraries such as Mockito to mock out the driver you are using
  2. Cassandra Unit
  3. Integration test the class in question against a real Cassandra instance running locally
  4. Stubbed Cassandra (disclaimer: I made this)
Let's take these one by one and look at their respective advantages and disadvantages. The things to consider are:
  • Speed of the tests
  • Able to run the tests concurrently and how easy this is to achieve
  • Able to test everything? Even failures?
  • Are the tests brittle?
  • Readability of the tests
  • Requirements on environment e.g. do you need a Cassandra instance installed?
  • Confidence it will work against a real Cassandra
  • Subjective nonsense by the writer of this blog

1 Mock the library

You can use a combination of factories and mocking to stop the driver actually interacting with Cassandra. Then verify your code's interactions with these mocks.

Advantages:
  • Fast - no I/O
  • Execute tests concurrently
  • No Cassandra instance required on the machine where your build runs
  • Can test everything, including the various failures, as you can mock the library to throw ReadTimeoutException etc.
Disadvantages:
  • You may mock the driver to behave differently from how it really behaves
  • Very hard to understand tests due to the large amount of boilerplate mocking code
  • Very brittle tests. Change your driver, change your tests! Change from a query to a prepared statement, change your tests!
  • A lot of boilerplate. Take the DataStax driver for example: it returns a ResultSet, which is iterable. Fancy writing the code that mocks it returning many results?
Verdict:
  • Don't do it if you wish to remain sane

2 Cassandra Unit

Cassandra Unit is a tool for starting an embedded Cassandra in the JVM your tests are running in. It also has a great API for loading data into Cassandra for your tests.

Advantages:
  • Pretty fast. It is all in process.
  • Can run tests concurrently if they use different keyspaces and none of the tests turn it off etc.
  • Can use CQL to load data as it is a real Cassandra. I think this leads to readable tests
  • No Cassandra required on the machine. Can use it via a Maven dependency
  • High confidence your code will work against a real Cassandra

Disadvantages:
  • Unable to test failure scenarios. What is a read timeout when there is only one node running in the same JVM as your test?
  • No way to verify consistency of queries

Verdict:
  • Very useful tool for in-process happy path tests

3 Integration style tests using a real Cassandra

This is something I've done a lot of recently. Every dev machine at my current workplace has a Cassandra instance running (using the awesome tool ccm). We then test our DAOs by assuming it is there and running the tests in a dynamically created keyspace.

Advantages:
  • Can run tests concurrently if they use different keyspaces and none of the tests turn it off etc.
  • Tests aren't brittle: you can change from queries to prepared statements, or change the queries involved, without changing tests
  • Readable tests - all data setup is done in CQL
  • Very high confidence it will work against a real Cassandra, as the test is against a real Cassandra :)
Disadvantages:
  • Probably the slowest option, but it is still millisecond quick.
  • Hard to test failure scenarios other than turning the node off, which then makes the tests slow.
  • Cassandra required to build and run tests
Verdict:
  • Slightly slower but very useful for testing happy paths
  • Very similar to using Cassandra Unit
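The "dynamic keyspace" idea in this option can be sketched in a few lines. This is a minimal sketch, assuming each test run creates its own keyspace with `SimpleStrategy` and a replication factor of 1 (suitable for a single local ccm node) and drops it afterwards; the class and method names are hypothetical, and actually executing the CQL through the driver's session is left out.

```java
import java.util.UUID;

public class DynamicKeyspace {

    // Cassandra limits keyspace names to 48 characters.
    private static final int MAX_KEYSPACE_NAME_LENGTH = 48;

    // Builds a keyspace name unique to this test run, e.g. "dao_test_" + 32 hex chars.
    // Unquoted names must start with a letter and use only letters, digits and underscores.
    static String uniqueKeyspaceName(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", ""); // 32 lowercase hex chars
        String name = prefix + "_" + suffix;
        if (!name.matches("[A-Za-z][A-Za-z0-9_]*") || name.length() > MAX_KEYSPACE_NAME_LENGTH) {
            throw new IllegalArgumentException("Invalid keyspace name: " + name);
        }
        return name;
    }

    // CQL to run before the tests (assumption: one local node, so replication_factor 1).
    static String createKeyspaceCql(String keyspace) {
        return "CREATE KEYSPACE " + keyspace
            + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}";
    }

    // CQL to run after the tests so keyspaces don't pile up on the dev machine.
    static String dropKeyspaceCql(String keyspace) {
        return "DROP KEYSPACE IF EXISTS " + keyspace;
    }

    public static void main(String[] args) {
        String keyspace = uniqueKeyspaceName("dao_test");
        System.out.println(createKeyspaceCql(keyspace));
        System.out.println(dropKeyspaceCql(keyspace));
    }
}
```

Because every run gets a fresh keyspace, two builds on the same machine, or two test classes running in parallel, don't see each other's data, which is what makes the concurrency advantage above hold.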

4 Stubbed Cassandra

Stubbed Cassandra is a new open source tool that pretends to be Cassandra and can be primed to return rows, read timeouts, write timeouts and unavailable errors. It can be used via a Maven dependency or as a standalone server.

Advantages:
  • Very fast
  • Can test all types of errors and be confident in what the driver does, as the driver thinks it is a real Cassandra
  • Can run many instances inside the same JVM listening on different binary ports, so you can run tests concurrently with no extra effort, e.g. no requirement to use different keyspaces
  • Tests less brittle than mocking the driver. You can change driver without changing tests, but if you change queries you need to update your priming
  • No requirement to have a real Cassandra. Just brought in by a Maven dependency
Disadvantages:
  • Slightly more brittle than a real Cassandra/Cassandra Unit. Requires priming for each query and each prepared statement
  • Slightly less confidence it will work against a real Cassandra, as it isn't a real Cassandra. But more confidence than mocking
  • It is new and does not support all of Cassandra's features, so if you use a feature that Scassandra doesn't support you are stuck!

Verdict:
  • Best solution for all error case testing
  • Best solution if you need to execute tests concurrently

Saturday, May 24, 2014

Introducing Scassandra: Testing Cassandra with ease

Scassandra (Stubbed Cassandra) is a new open source tool for testing applications that interact with Cassandra.

It is intended to be used for:
  • Unit testing DAOs that interact with Cassandra
  • Acceptance testing applications that interact with Cassandra
The first release is primarily aimed at Java developers. Subsequent releases will be aimed at people writing black box acceptance tests.

It works by implementing the server side of the CQL binary protocol, meaning that Cassandra drivers, such as the DataStax Java Driver, believe it to be a real Cassandra.

The original motivation for developing Scassandra was to test edge case and failure scenarios. However, it quickly became apparent that it is just as useful for regular happy path unit tests.

So how does it work?

When you start Scassandra it opens two ports:
  • Binary port: this is the port you'll configure your application with, rather than the binary port of a real Cassandra.
  • Admin port: this is for Scassandra's REST API, which is used for priming and for retrieving the list of executed queries.
By default, when your application connects to Scassandra and executes queries, Scassandra will respond with a result containing zero rows.

Then, via the Scassandra Java client (and later the REST API), you can prime Scassandra to return rows (where you can specify the column types and values), read request timeouts, write request timeouts and unavailable exceptions.

After you've run your tests you can verify the queries and prepared statements that your application has executed, including the consistency level each query was executed with. So if you have requirements to execute certain writes at a high consistency while other queries can be at a lower consistency, this can be tested via black box acceptance tests.

The benefits of Scassandra over testing against a real Cassandra instance:
  • Test failures deterministically, where previously you would have needed a multi-node cluster with the ability to bring down nodes.
  • Test the consistency of queries. This has come up at my workplace where a requirement was that for most queries we can downgrade consistency when there are failures but for certain important writes they had to be executed at QUORUM.
  • Have fast running DAO tests that don't require mocking out driver classes or a real Cassandra running.

So how do I use Scassandra?

The first release of Scassandra is aimed at Java developers. Scassandra comes in two parts:

  • Scassandra server. This is a Scala application that has been put in Maven central with a pom that will bring in its transitive dependencies.
  • Scassandra Java client. A thin wrapper written in Java to make using Scassandra from Java tests easy. This has methods to start/stop Scassandra and classes that prime / retrieve the list of executed queries.

For the first release it is expected that Scassandra will only be used via the Java client, and that no one will use it as a standalone executable or interact with the REST API directly.

To get started with the Scassandra Java client, go here. Or check out the example project here.

If you aren't using Java, or a language that can easily use the Java client, then the next release will be for you: we'll build a standalone executable, and from that release on we'll keep the REST API backward compatible, as we'll expect people to use it directly.

It is all open source on GitHub and you can find the Scassandra server here.

The Java client is here.

All the details of the REST API, e.g. how to prime, are on the Scassandra server website here.

Using Stubbed Cassandra: Unit testing Java applications

My first article on Scassandra introduced what it is and why I've made it.

This article describes how to use Scassandra to help unit test a Java class that stores and retrieves data from Cassandra.

It assumes you're using a tool that can download dependencies from Maven Central, e.g. Maven, Gradle or SBT.

First add Scassandra as a dependency. It is in Maven Central, so you can add it to your pom with the following XML:


Or the following entry in your build.gradle:

dependencies {

There are four important classes you'll deal with from Java:
  • ScassandraFactory - used to create instances of Scassandra
  • Scassandra - interface for starting/stopping Scassandra and getting hold of a PrimingClient and an ActivityClient
  • PrimingClient - sends priming requests to Scassandra RESTful admin interface
  • ActivityClient - retrieves all the recorded queries and prepared statements from the Scassandra RESTful admin interface

The PrimingClient and ActivityClient have been created to ease integration for Java developers. Otherwise you would need to construct JSON and send it over HTTP to Scassandra.

You can start a Scassandra instance per test class and clear all primes and recorded activity between tests.

To start Scassandra before your tests run, add a @BeforeClass method, e.g.:

You can also add an @AfterClass method to stop Scassandra:

Now that you have Scassandra running, let's write a test. Perhaps you want to test a simple Java DAO that connects to Cassandra and executes a query.

And you have a backing table like:

  id int,
  first_name text,

Let's TDD the DAO using Scassandra, starting with our connect method:

Let's look at what this code is doing:
  • Line 4: Tells the activity client to clear all recorded connections. This stops connections made by other tests from interfering with this one.
  • Line 6: We call connect on our PersonDao.
  • Line 8: We call retrieveConnections on the activity client and expect there to be at least one. The DataStax Java driver makes multiple connections on startup, so you can't assert for this to be 1.
This fails with the following message:

java.lang.AssertionError: Expected at least one connection to Cassandra on connect
at org.junit.Assert.assertTrue(
at com.batey.examples.scassandra.PersonDaoTest.shouldConnectToCassandraWhenConnectCalled(

Now let's write some code to make it pass:

Now let's test that the retrieveNames function gets all the first_names out of the person table.

This will prime Scassandra to return a single row with the column first_name with the value Chris. We expect our DAO to turn that into a List of strings containing Chris. To make this pass we need to execute a query and convert the ResultSet, something like this:

Next, let's say you have a requirement that you really must not get an out-of-date list of names, so you want to test that the query is executed at QUORUM consistency. You can test this like so:

Let's look at what each line is doing:
  • Line 4 builds the expected query; note that the consistency is also set. If you build a Query without a consistency it defaults to ONE.
  • Line 7 clears all the recorded activity so that another test does not interfere with this one. It also clears the queries that were executed as part of connect (the DataStax Java driver issues quite a few queries on the system keyspace on startup)
  • Line 11 retrieves all the queries your application has executed
  • Line 12 verifies the expected query that was built on Line 4 has been executed

This will fail with an error message like this:

java.lang.AssertionError: Expected query with consistency QUORUM, found following queries: [ {Query{query='select * from people', consistency='ONE'}]

We can make this pass by adding the consistency to our query:

And we're done!
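If you're curious what the consistency check boils down to, here is a minimal plain-Java sketch of the idea: compare the expected query text and consistency against everything that was recorded, and fail with a message listing what was actually executed. This is not Scassandra's ActivityClient or its Query class; `RecordedQuery` and `assertQueryExecuted` are hypothetical names that only mimic the failure message shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class QueryVerification {

    // Hypothetical stand-in for a recorded query: the CQL text plus its consistency level.
    static final class RecordedQuery {
        final String query;
        final String consistency;

        RecordedQuery(String query, String consistency) {
            this.query = query;
            this.consistency = consistency;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RecordedQuery)) return false;
            RecordedQuery other = (RecordedQuery) o;
            return query.equals(other.query) && consistency.equals(other.consistency);
        }

        @Override
        public int hashCode() {
            return Objects.hash(query, consistency);
        }

        @Override
        public String toString() {
            return "Query{query='" + query + "', consistency='" + consistency + "'}";
        }
    }

    // Fails unless a query with the same text AND consistency was recorded.
    static void assertQueryExecuted(List<RecordedQuery> recorded, RecordedQuery expected) {
        if (!recorded.contains(expected)) {
            throw new AssertionError("Expected query with consistency " + expected.consistency
                + ", found following queries: " + recorded);
        }
    }

    public static void main(String[] args) {
        List<RecordedQuery> recorded = new ArrayList<>();
        recorded.add(new RecordedQuery("select * from people", "ONE")); // what the DAO actually ran
        try {
            assertQueryExecuted(recorded, new RecordedQuery("select * from people", "QUORUM"));
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Matching on consistency as well as query text is what turns "did it run the query?" into "did it run the query the way the requirement says?".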

This has been a brief introduction to Scassandra, but hopefully the above gives you an idea of how Scassandra can be used to test your Java applications that use Cassandra. We've covered:
  • Priming basic queries
  • Verifying queries
  • Verifying connections
Future blog posts will show you how to:
  • Prime prepared statements
  • Prime different column types in responses
  • Prime error cases
Scassandra has only just been released. The future road map includes:
  • JUnit rule so you don't need to handle starting/stopping and clearing recorded activity
  • More generic priming e.g any query on this table
  • Support for more drivers
All the code for this example can be found in full here.

Thursday, February 13, 2014

Testing your RESTful services with Groovy, Spock and HTTPBuilder

I like to test the HTTP portion of the RESTful web services I build in isolation. This is because, in the languages I use (Java, Scala), most of the code responsible for taking in HTTP requests and parsing them is very framework specific and simply delegates the actual logic to another part of my application. Ideally I can launch this part of my application in the same way as it is launched in production but with the rest of the application stubbed out.

Recently I've been using Groovy/Spock for these integration tests; here's how for a really simple web service, which in this example has been built with Spring Boot.

Let's not go into the details of how Spring Boot works, but it is essentially the same application as in Spring's guide.

The web application has a single path, /exampleendpoint, which takes a single query parameter, input, and returns a JSON object with a single field, payload, whose value is "Something really important: " with the input appended.

I want to test that the web service takes in HTTP, takes the query parameter out and builds the JSON correctly. With Groovy/Spock/HTTPBuilder here's how it looks:

Let's look at what is going on here:
  • In the setup phase we create a RESTClient
  • In the exercising phase we make a call out to the web application.
  • In the verification phase we check that the response code is 200, the content type is "application/json" and the body is JSON with a single payload field that contains "Something really important: Get a hair cut"
Benefits of writing this in Groovy/Spock over, say, plain Java/JUnit:
  • The HTTP libraries are easier to use with less code
  • The assertions are more expressive
  • More magic: for instance, the second with() block works on a Groovy map that has been automagically created from the JSON response
The above example doesn't deal with how to start and stop the web service under test; I left that out as it is very web framework specific.
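For comparison with the Groovy benefits listed above, here is a rough plain-Java (11+) sketch of the same setup, exercise and verify steps using only the JDK. It is not the post's Spock test: `com.sun.net.httpserver.HttpServer` runs a hypothetical stub that mimics the described endpoint (standing in for the Spring Boot app, with no JSON escaping), and `java.net.http.HttpClient` makes the call. It also shows one in-process way to start and stop the service around a test.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class ExampleEndpointCheck {

    // Hypothetical stub of the service: GET /exampleendpoint?input=...
    // returns {"payload":"Something really important: <input>"} (input is not JSON-escaped).
    static HttpServer startStubService() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/exampleendpoint", exchange -> {
            String rawQuery = exchange.getRequestURI().getRawQuery(); // e.g. "input=Get+a+hair+cut"
            String input = "";
            if (rawQuery != null && rawQuery.startsWith("input=")) {
                input = URLDecoder.decode(rawQuery.substring("input=".length()), StandardCharsets.UTF_8);
            }
            byte[] body = ("{\"payload\":\"Something really important: " + input + "\"}")
                .getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Exercise phase: a real HTTP call with the query parameter URL-encoded.
    static HttpResponse<String> callEndpoint(int port, String input) throws IOException, InterruptedException {
        String encoded = URLEncoder.encode(input, StandardCharsets.UTF_8);
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://127.0.0.1:" + port + "/exampleendpoint?input=" + encoded))
            .GET()
            .build();
        return HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }

    // Starts the stub, makes one request, and always stops the server again.
    static HttpResponse<String> roundTrip(String input) {
        HttpServer server = null;
        try {
            server = startStubService();
            return callEndpoint(server.getAddress().getPort(), input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }

    public static void main(String[] args) {
        // Verification phase: status code, content type and body.
        HttpResponse<String> response = roundTrip("Get a hair cut");
        System.out.println(response.statusCode());
        System.out.println(response.headers().firstValue("Content-Type").orElse(""));
        System.out.println(response.body());
    }
}
```

The verification here compares raw strings; the Groovy version's ability to assert on a parsed map is one of the conveniences the post is pointing at.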

What do you need in your build to get this setup? I used Gradle for this example so it is probably easier to just check out the build.gradle, a lot of it can be ignored as it is related to Spring Boot rather than Spock. Here are the important bits:

The plugins:

And the dependencies:

The entire project is here. Happy testing.

Monday, February 10, 2014

Akka: Testing messages sent to an actor's parent

I've blogged previously about testing messages sent back to the sender and to a child actor. The final common scenario I come across is testing actors that send messages to their supervisor/parent. If it is in response to a message from the parent, you can use the same technique as described in testing messages sent back to the sender.

However, if that is not the case, TestKit has a very simple way to do it with a TestActorRef and a TestProbe. Anywhere you have a piece of code that looks like this:

context.parent ! "Any message"

For example:

Then you can test it by passing a TestProbe to your TestActorRef and using the expect* functions on the TestProbe. So to test the actor above your test will look something like this:

As you can see the TestActorRef can take an ActorRef to use as a parent when you use the apply method that takes in a Props.

Full code here. Happy testing.

Thursday, February 6, 2014

Testing Scala Futures with ScalaTest 2.0

Recently I was writing integration tests for a library that returned futures. I wanted simple synchronous tests.

Fortunately, the latest release of ScalaTest adds something to do just that: the ScalaFutures trait.

Say you have a class that returns a Future, e.g:

And you want to write a test for this that waits for the future to complete. In regular code you might use a callback or a for comprehension, e.g.:

This won't work in a test, as the test will finish before the future completes and the assertions run on a different thread.
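The problem is not specific to Scala. As a rough Java analogue (plain `CompletableFuture`, not ScalaTest), a callback runs later on a pool thread, while blocking with a timeout makes the test synchronous, which is the idea behind `whenReady` and `futureValue`. `fetchGreeting` is a hypothetical stand-in for "a class that returns a Future", and the 200 ms delay and timeouts are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureWaiting {

    // Hypothetical asynchronous method that completes on another thread.
    static CompletableFuture<String> fetchGreeting() {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200); // simulate slow work
            return "hello";
        });
    }

    // Blocks the calling (test) thread until the future completes, failing the test
    // if it fails or takes longer than the patience timeout.
    static <T> T awaitResult(CompletableFuture<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        } catch (ExecutionException e) {
            throw new AssertionError("Future failed", e.getCause());
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + timeoutMs + " ms", e);
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Callback style: runs later on a common-pool (daemon) thread, so it may print
        // after main has moved on, or not at all if the JVM exits first.
        fetchGreeting().thenAccept(greeting -> System.out.println("callback saw: " + greeting));

        // Blocking style: the value is available on this thread, where assertions belong.
        String greeting = awaitResult(fetchGreeting(), 1000);
        System.out.println("awaited: " + greeting);
    }
}
```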

ScalaFutures to the rescue! Upgrade to ScalaTest 2.0+ and mix in the ScalaFutures trait. Now you can test using whenReady:

Or with futureValue:

Happy testing! Full source code here.

Wednesday, February 5, 2014

Akka: Testing messages sent to child actors

I previously blogged about testing messages sent back to the sender. Another common scenario is testing that an actor sends the correct message to a child. A piece of code that does this might look like:

But how do you test drive this kind of code?

I've adopted the approach of removing the responsibility for creating the child from the ParentActor. Instead, you pass a factory into the ParentActor's constructor that can be replaced for testing. The ParentActor then looks like this:

The ParentActor now takes a factory function, which takes an ActorRefFactory and returns an ActorRef. This function is responsible for creating the ChildActor.

childFactory: (ActorRefFactory) => ActorRef

This is then used by the ParentActor as follows:

val childActor = childFactory(context)

You can now inject a factory that returns a TestProbe for testing, rather than creating a real ChildActor. For example:

And for the real code you can pass in an anonymous function that uses the ActorRefFactory to actually create a real ChildActor:

Here you can see that the ActorRefFactory (an actor's context implements this interface) is used to create the actor the same way the original ParentActor did.
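The pattern itself is plain dependency injection and isn't tied to Akka. As a rough sketch in plain Java (no Akka involved), here `ChildCreator`, `Parent` and the `Consumer`-based child are hypothetical stand-ins for ActorRefFactory, ParentActor and ActorRef. Production wiring uses the context to build the child, while test wiring ignores it and hands back a recording probe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChildFactoryInjection {

    // Hypothetical stand-in for Akka's ActorRefFactory (an actor's context).
    interface ChildCreator {
        Consumer<String> create(String name);
    }

    // The parent no longer decides how its child is built: it receives a factory function.
    static final class Parent {
        private final Consumer<String> child;

        Parent(ChildCreator context, Function<ChildCreator, Consumer<String>> childFactory) {
            this.child = childFactory.apply(context); // mirrors: val childActor = childFactory(context)
        }

        void receive(String message) {
            child.accept("processed: " + message); // forward a message to the child
        }
    }

    public static void main(String[] args) {
        // "Real" wiring: the factory uses the context to create the child.
        ChildCreator context = name -> msg -> System.out.println(name + " got " + msg);
        new Parent(context, ctx -> ctx.create("child")).receive("hello");

        // Test wiring: ignore the context and hand back a recording probe instead.
        List<String> probe = new ArrayList<>();
        new Parent(context, ctx -> probe::add).receive("hello");
        System.out.println(probe); // [processed: hello]
    }
}
```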

Full source code here.

Tuesday, January 14, 2014

Akka: Testing that an actor sends a message back to the sender

A common pattern when using actors is for actor A to send a message to actor B, and then for actor B to send a message back.

This can be tested using the Akka TestKit along with ScalaTest.

Take the following very simple example. We have an Actor called Server which should accept Startup messages and respond with a Ready message when it is ready for clients to send additional messages.

We can start with a noddy implementation that does nothing:

Then write a test to specify the behaviour. Here I've used TestKit along with ScalaTest's FunSuite.

This test will fail with the following error message:

assertion failed: timeout (3 seconds) during expectMsg while waiting for Ready

As you can probably guess, TestKit waited for 3 seconds for a Ready message to be sent back.

To fix the test, we add the following to the Server actor implementation:

And now the test will pass! The two important things to note are that our test case extends TestKit, which gives you an ActorSystem, and that it mixes in the ImplicitSender trait, which makes the test actor the sender of the messages you send. Replies therefore come back to the test, and you can use methods like "expectMsg" to assert that the correct message has been received.