Saturday, March 12, 2016

Blog has moved to

Being able to work offline in Markdown has finally convinced me to leave Blogger. You can find my latest posts as well as all of my of my old ones at

Wednesday, August 5, 2015

A collection of tech talks I think we should all watch

An incomplete list of talks that I enjoyed. I'll add more and summaries at some point!

Distributed systems

These are all by Aphyr about Jepson:


This is also known as the Martin Thompson and Gil Tene section.

Responding in a timely manner by Martin Thompson:
Understanding latency by Gil Tene:

Functional/Events/Message driven

This is also known as the Greg Young section.

Querying event streams by Greg Young:

Technology talks

Friday, July 31, 2015

Stubbed Cassnadra 0.9.1: 2.2 support, query variables and verifification of batches and PS preparations

Version 0.9.1 has a few nice features and has reduced a lot of technical debt. The first features aren't that noticeable from a users point of view:

Java 1.6 support! This was contributed by Andrew Tolbert as he wanted to test against the 1.* branch of the C* Java driver which still supports Java 1.6. I hadn't intentionally dropped 1.6 support but I had brought in a jar that was compiled against 1.7 and thus wouldn't run in a 1.6 JVM. The offending jar was an internal C* jar that could help with serialisation so it was quite a lot of work to replace the use of this jar with custom serialisation code.

Another non-feature feature: Moving to Gradle and having a single build for the Server, Java client and Integration tests against all the versions of the C* Java driver. The aim of this was to make it MUCH easier for other people to contribute as before you had to install the server to a maven repo, then build the client and install it, then run the tests. Now you just run: ./gradlew clean check, Simples.

Real features

Verification of batch statements containing queries. Priming and prepared statements in batches will be in the next release.

Support for version 2.2 of the C* driver.

Verification of the prepare of prepared statements. This will allow you to test you only prepare the statements once and at the right time i.e. application start up. 

Queries that contain variables are now captured properly. As of C* 2.0 you can use a prepared statement like syntax for queries and pass in variables. These are now captured as long as you primed the types of the variables (so Stubbed Cassandra knows how to parse them).

A farJar task so you build a standalone executable. Useful if you aren't using Stubbed Cassandra from a build tool like maven.

As always please raise issues or reach out to be on twitter if you have any questions or feedback.

Tuesday, July 7, 2015

Cassandra 3.0 materialised views in action (pre-release)

Disclaimer: C* 3.0 is not released yet and all these examples are from a branch that hasn't even made it to trunk yet.

So this feature started off as "Global indexes", the final result is not a global index and I don't trust any claim of distributed indexes anyhow. If your data is spread across 200 machines, ad-hoc queries aren't a good idea reagardless of how you implement them as you will often end up going to all 200 machines.

Instead materialised views make a copy of your data partitioned a different way, which is basically what we've been doing manually in C* for years, this feature aims to remove the heavy lifting.

I'll use the same data model as the last article which is from the KillrWeather application. I will attempt to show use cases which we'd have previously used Spark or duplicated our data manually.

Recall the main data table:

This table assumes our queries are all going to be isolated to a weather station id (wsid) as it is the partition key. The KillrWeather application also has a table with information about each weather station:

I am going to denormalise by adding the columns from the weather_station table directly in the raw_weather_data table ending up with:

Now can we do some awesome things with materialised views? Of course we can!

So imagine you need to read the data not by weather station ID but by state_code. We'd normally have to write more code to duplicate the data manually. Not any more.

First let's insert some data, I've only inserted the primary key columns and one_hour_precip and I may have used UK county names rather than states :)

We can of course query by weather id and time e.g

We can then create a view:

We've asked C* is to materialise select country_code from raw_weather_data but with a different partition key, how awesome is that?? All of the original primary key columns and any columns in your new primary key are automatically added and I've added country_code for no good reason.

With that we can query by state and year as well. I included year as I assumed that partitioning by state would lead to very large partitions in the view table.

Where as secondary indexes go to the original table, which will often result in a multi partition query (a C* anti-pattern), a materialised view is copy of your data partitioned in a new way. This query will be as quick as if you'd duplicated the data your self.

The big take away here is that YOU, the developer, decide the partitioning of the materialised view. This is an important point. There was talk of you only needing to specify a cardinality, e.g low, medium, high or unique and leave C* to decide the partitioning. Where as that would have appeared more user friendly it would be a new concept and a layer of abstraction when IMO it is critical all C* developers/ops understand the importance of partitioning and we already do it every day for tables. You can now use all that knowledge you already have to design good primary keys for views.

The fine print

I'll use the term "original primary key" to refer to the table we're creating a materialised view on and MV primary key for our new view.
  1. You can include any part of the original primary key in your MV primary key and a single column that was not part of your original primary key
  2. Any part of the original primary key you don't use will be added to the end of your clustering columns to keep it a one to one mapping
  3. If the part of your primary key is NULL then it won't appear in the materialised view
  4. There is overhead added to the write path for each materialised view
Confused? Example time!

Original primary key:  PRIMARY KEY ((wsid), year, month, day, hour)
MV primary keyPRIMARY KEY ((state_code, year), one_hour_precip)
Conclusion: No, this is actually the example above and it does not work as we tried to include two columns that weren't part of the original primary key. I updated the original primary key to: PRIMARY KEY ((wsid), year, month, day, hour, state_code) and then it worked.

Original primary key:  PRIMARY KEY ((wsid), year, month, day, hour)
MV primary keyPRIMARY KEY ((state_code, year))
Conclusion: Yes - only one new column in the primary key: state_code

Here are some of the other questions that came up:
  1. Is historic data put into the view on creation? Yes
  2. Can the number of fields be limited in the new view? Yes - in the select clause
  3. Is the view written to synchronously or asynchronously on the write path? Very subject to change! It's complicated! The view mutations are put in the batch log and sent out before the write, the write can succeed before all the view replicas have acknowledged the update but the batch log won't be cleared until a majority of them have responded. See the diagram in this article.
  4. Are deletes reflected? Yes, yes they are!
  5. Are updated reflected? Yes, yes they are!
  6. What happens if I delete a table? The views are deleted
  7. Can I update data via the view? No
  8. What is the overhead? TBD, though it will be similar to using a logged batch if you had duplicated manually.
  9. Will Patrick Mcfadin have to change all his data modelling talks? Well at least some of them

Combining aggregates and MVs? Oh yes

You can't use aggregates in the select clause for creating the materialised view but you can use them when querying the materialised view. So we can now answer questions like what is the total precipitation for a given year for a given state:

We can change our view to include the month in its key and do the same for monthly:

And then we can do:

Though my data had all the rain in one month :)


This feature changes no key concepts for C* data modelling it simply makes the implementation of the intended data model vastly easier. If you have data set that doesn't fit on a single server you're still denormalising and duplicating for performance and scalability, C* will just do a huge amount of it for you in 3.0.

Friday, July 3, 2015

A few more Cassandra aggregates...

My last post was on UDAs in C* 2.2 beta. C*2.2 is now at RC1 so again everything in this post is subject to change. I'm running off 3.0 trunk so it is even more hairy. Anyway there are more built in UDAs now so let's take a look...

I am going to be using the schema from KillrWeather to illustrate the new functionality. KillrWeather is a cool project that uses C* for its storage and a combination of Spark batch and Spark Streaming to provide analytics on weather data.

Now C* hasn't previously supported aggregates but 2.2 changes all that, so let's see which parts of KillrWeather we can ditch the Spark and go pure C*.

The raw weather data schema:

Spark batch is used to populate the high low "materialised view" table:

The code from KillrWeather Spark batch:

There's a lot going on here as this code is from a fully fledged Akka based system. But essentially it is running a Spark batch job against a C* partition and then using the Spark StatsCounter to work out the max/min temperature etc. This is all done against the raw table, (not shown) the result is passed back to the requester and asynchronously saved to the C* daily_aggregate table.

Stand alone this would look something like:

Now let's do something crazy and see if we can do away with this extra table and use C* aggregates directly against the raw data table:

Because we have the year, month, as clusterting columns we can get the max/min/avg all from the raw table. This will perform nicely as it is within a C* partition, don't do this across partitions! We haven't even had to define our own UDFs/UDAs as max and mean are built in. I wanted to analyse how long this UDA was taking but it currently isn't in trace so I raised a jira.

The next thing KillrWeather does is keep this table up to date with Spark streaming:

Can we do that with built in UDAs? Uh huh!

The data is a little weird as for one_hour_precip there are negative values, hence why it appears that we have less rain in a month than we do in a single day in that month.

We can also do things that don't include a partition key like get the max for all weather stations, but this will be slow / could cause OOM errors if you have a large table:

All the raw text for the queries are on my GitHub.

Thursday, May 28, 2015

Cassandra Aggregates - min, max, avg, group by

This blog has moved to and won't be updated here.

Disclaimer: all this was against 2.2-beta so the syntax may have changed.

Cassandra 2.2 introduced user defined functions and user defined aggregates. We can now do things like min, max and average on the server rather than having to bring all the data back to your application. Max and min are built in but we'll see how you could have implemented them your self.


Here's an example table for us to try and implement max/min against.

User defined aggregates work by calling your user defined function on every row returned from your query, they differ from a function because the first value to the function is state that is passed between rows, much like a fold.

Creating an aggregate is a two or three step process:
  1. Create a function that takes in state (any Cassandra type including collections) as the first parameter and any number of additional parameters
  2. (Optionally) Create a final function that is called after the state function has been called on every row
  3. Refer to these in an aggregate
For max we don't need a final function but we will for average later.

Here we're using Java for the language (you can also use JavaScript) and just using Math.max. For our aggregate definition we start with (INITCOND) null (so it will return null for an empty table) and then set the state to be the max of the current state and the value passed in. We can our new aggregate like:


So there's no group by keyword in Cassandra but you can get similar behaviour with a custom user defined aggregate. Imagine you had a table that kept track of everything your customers did e.g

We can write a UDA to get a count of a particular column:

And we keep track of the counts in a map. Example use for counting both the event_type and the origin of the event:

More often than not when you use group by in other databases you are totalling another field. For example imagine we were keeping track of customer purchases and wanted a total amount each customer has spent:

We can create a generate aggregate for that called group_and_total:

And an example usage:

As you can see Haddad spends way too much.


The Cassandra docs have an example of how to use a user defined aggregate to calculate aggregates:

Small print

If you've ever heard me rant about distributed databases you've probably heard me talk about scalable queries, ones that work on a 3 node cluster as well as a 1000 node cluster. User defined functions and aggregates are executed on the coordinator. So if you don't include a partition key in your query all the results are brought back to the coordinator for your function to be executed, if you do a full table scan for your UDF/A don't expect it to be fast if your table is huge.

This functionality is in beta for a very good reason, it is user defined code running in your database! Proper sandboxing e.g with a SecurityManager will be added before this goes mainstream.

Sunday, May 10, 2015

Building well tested applications with SpringBoot / Gradle / Cucumber / Gatling

I am a huge test first advocate. Since seeing Martin Thompson speak I am now trying to include performance testing with the same approach. I am going to call this approach Performance, Acceptance and Unit Test Driven Development, or PAUTDD :)

Tools/frameworks/library come and go so I'll start with the theory then show how I set this up using Junit and Mockio for unit testing, Cucumber for acceptance tests and Gatling for performance tests. Well I won't show JUnit and Mockito because that is boring!

So here's how I develop a feature:
  1. Write a high level end to end acceptance test. There will be times where I'll want acceptance tests not to be end to end, like if there was an embedded rules engine.
  2. Write a basic performance test.
  3. TDD with unit testing framework until the above two pass.
  4. Consider scenario based performance test.
I hate to work without test first at the acceptance level, even for a small feature (half a day dev?) I find them invaluable for keeping me on track and letting me know when functionally I am done. Especially if I end up doing a bit too much Unit testing/Mocking (bad Chris!) as when head down in code it is easy to forget the big picture: what functionality are you developing for a user. 

Next is a newer part of my development process. Here I want a performance test for the new feature, this may not be applicable but it usually is. Which ever framework I am using here I want to be able to run it locally and as part of my continuous integration for trend analysis. However I want more for my effort than that, I really want to be able to use the same code for running against various environments running different hardware.

I hate performance tests that aren't in source control and versioned. Tools that use a GUI are no use to me, I constantly found while working at my last company that the "performance testers" would constantly change their scripts and this would change the trend way more than any application changes. I want to be able to track both.

So how to set all this up for a Spring Boot application using Cucumber and Gatling?

This post is on build setup, not the actual writing of the tests. My aim is to enable easy acceptance/performance testing.

Here's the layout of my latest project:

Main is the source, test is the unit tests and e2e is both the Cucumber and the Gatling tests. I could have had separate source sets for the Cucumber and Gatling tests but that would have confused IntelliJ's Gradle support too much (and they are nicely split by Cucumber being Java and Gatling being Scala).

Cucumber JVM

There are various articles on Cucumber-JVM, here's the steps I used to get this running nicely in the IDE and via Gradle.

First the new source set:

Nothing exciting here, we are using the same classapth as test, we could have had a separate one.

Next is dependencies, this is actually the Gatling and HTTP client (for hitting our application) as well.

We cucumber JUnit and Spring dependencies.

Next is the source code for the acceptance tests. The Features are in the resource folder and the source is in the Java folder. To allow running via Gradle you also create a JUnit test to run all the features. Intellij should work find without this by just running the feature files.

Here I have Features separated by type, here's an example Feature:

I like to keep the language at a high level so a non-techy can write these. The JUnit test RunEndToEndTests looks like this:

This is what Gradle will pick up when we run this from the command line. You could separate this out into multiple tests if you wanted.

For running inside IntelliJ you might need to edit the run configuration to include a different Glue as by default it will be the same as the package your Feature file is in, for this project this wouldn't pick up the GlobalSteps as it is outside of the security/users folder. This is what my configuration looks like, I set this as the default:

Now our features will run if you want to see what the implementation of the Steps look like, checkout the whole project from Github.


This is my first project using Gatling, I wanted my scenarios in code that I could have in version control. Previously I've used JMeter. Where as you can checkin the XML it really isn't nice to look at in diff tools. I've also been forced *weep* to use more GUI based tools like SOASTA and HP Load runner. One thing I haven't looked at is Gatling's support for running many agents. For me to continue using Gatling beyond developer trend analysis this needs to be well supported.

We already have the dependencies, see the dependencies section above, and we're going to use the same source set. The only difference is we're going to be writing these tests in Scala.

My first requirement was not to have to have Gatling installed manually on developer and CI boxes. Here's how to do this in Gradle:

Where BasicSimulation is the fully qualified name of my Gatling load test. All we do here is define a JavaExec task with the Gatling main class, tell Gatling where our source is and which simulation to run.

To tie all this together so it runs every time we fun Gradle check we add the following at the bottom of our Gradle build file:

This will produce reports that can be published + consumed by the Jenkins Gatling plugin.

To run the same load test from Intellij we do exactly the same in a run configuration:

A basic Gatling tests looks like this:

This test will run a very simple test where a single virtual user hits the /api/auction URL 100 times with a pause of 10 milliseconds. The top couple of lines start the Spring boot application and register a shut down hook to stop it.

We'll then end up with a report that looks like this:

This is a pretty terrible load test as it runs for 2 seconds and has a single user. But the point of this post is to setup everything so when adding new functionality it is trivial to add new performance and acceptance tests.

That's it, happy testing! If you want to see the whole project is on Github. It us under active development so you'll know how I got on with Gatling based if it is still there and there are lots of tests that use it!