Friday, July 31, 2015

Stubbed Cassandra 0.9.1: 2.2 support, query variables and verification of batches and PS preparations

Version 0.9.1 has a few nice features and has reduced a lot of technical debt. The first features aren't that noticeable from a user's point of view:

Java 1.6 support! This was contributed by Andrew Tolbert as he wanted to test against the 1.* branch of the C* Java driver which still supports Java 1.6. I hadn't intentionally dropped 1.6 support but I had brought in a jar that was compiled against 1.7 and thus wouldn't run in a 1.6 JVM. The offending jar was an internal C* jar that could help with serialisation so it was quite a lot of work to replace the use of this jar with custom serialisation code.

Another non-feature feature: moving to Gradle and having a single build for the server, Java client and integration tests against all the versions of the C* Java driver. The aim of this was to make it MUCH easier for other people to contribute, as before you had to install the server to a Maven repo, then build the client and install it, then run the tests. Now you just run: ./gradlew clean check. Simples.

Real features


Verification of batch statements containing queries. Priming and prepared statements in batches will be in the next release.

Support for version 2.2 of the C* driver.

Verification of the prepare of prepared statements. This will allow you to test that you only prepare the statements once and at the right time, i.e. application start up.

Queries that contain variables are now captured properly. As of C* 2.0 you can use a prepared-statement-like syntax for queries and pass in variables. These are now captured as long as you have primed the types of the variables (so Stubbed Cassandra knows how to parse them).

A fatJar task so you can build a standalone executable. Useful if you aren't using Stubbed Cassandra from a build tool like Maven.

As always, please raise issues or reach out to me on Twitter if you have any questions or feedback.

Tuesday, July 7, 2015

Cassandra 3.0 materialised views in action (pre-release)

Disclaimer: C* 3.0 is not released yet and all these examples are from a branch that hasn't even made it to trunk yet.

So this feature started off as "Global indexes". The final result is not a global index, and I don't trust any claim of distributed indexes anyhow. If your data is spread across 200 machines, ad-hoc queries aren't a good idea regardless of how you implement them, as you will often end up going to all 200 machines.

Instead, materialised views make a copy of your data partitioned a different way, which is basically what we've been doing manually in C* for years. This feature aims to remove the heavy lifting.

I'll use the same data model as the last article, which is from the KillrWeather application. I will attempt to show use cases for which we'd previously have used Spark or duplicated our data manually.

Recall the main data table:
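
As a rough sketch, the table looks something like the following. Only wsid, year, month, day, hour, temperature and one_hour_precip are named in these posts; the other columns are assumptions based on the public KillrWeather schema, so treat this as illustrative rather than the exact definition.

```cql
-- Sketch of the raw data table, partitioned by weather station id.
-- Columns marked "assumed" are not mentioned in this post.
CREATE TABLE raw_weather_data (
    wsid text,               -- weather station id (partition key)
    year int,                -- clustering columns: year, month, day, hour
    month int,
    day int,
    hour int,
    temperature double,
    dewpoint double,         -- assumed
    pressure double,         -- assumed
    wind_speed double,       -- assumed
    one_hour_precip double,
    six_hour_precip double,  -- assumed
    PRIMARY KEY ((wsid), year, month, day, hour)
);
```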

This table assumes our queries are all going to be isolated to a weather station id (wsid) as it is the partition key. The KillrWeather application also has a table with information about each weather station:

I am going to denormalise by adding the columns from the weather_station table directly in the raw_weather_data table ending up with:
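
A minimal sketch of the denormalised table, assuming the station columns are called name, country_code, state_code and call_sign (only state_code and country_code are named in this post, the rest are guesses at the KillrWeather schema). state_code is placed in the primary key here because, as explained in "The fine print" below, that is what the view example ends up needing.

```cql
-- Denormalised sketch: station details live alongside every reading.
CREATE TABLE raw_weather_data (
    wsid text,
    year int,
    month int,
    day int,
    hour int,
    temperature double,
    one_hour_precip double,
    -- copied in from weather_station
    name text,           -- assumed column
    country_code text,
    state_code text,
    call_sign text,      -- assumed column
    PRIMARY KEY ((wsid), year, month, day, hour, state_code)
);
```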

Now can we do some awesome things with materialised views? Of course we can!

So imagine you need to read the data not by weather station ID but by state_code. We'd normally have to write more code to duplicate the data manually. Not any more.

First let's insert some data. I've only inserted the primary key columns and one_hour_precip, and I may have used UK county names rather than states :)
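
Something along these lines would do. The station id, dates, county names and rainfall figures are all made up for illustration and are not the data used for this post; the statement also assumes state_code is part of the table's primary key.

```cql
-- Hypothetical sample rows: primary key columns plus one_hour_precip only.
INSERT INTO raw_weather_data (wsid, year, month, day, hour, state_code, one_hour_precip)
VALUES ('ws-1', 2005, 12, 1, 10, 'Kent', 0.5);

INSERT INTO raw_weather_data (wsid, year, month, day, hour, state_code, one_hour_precip)
VALUES ('ws-1', 2005, 12, 1, 11, 'Kent', 1.5);

INSERT INTO raw_weather_data (wsid, year, month, day, hour, state_code, one_hour_precip)
VALUES ('ws-2', 2005, 12, 2, 9, 'Essex', 2.0);
```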


We can of course query by weather station ID and time, e.g.
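
A query of that shape might look like this, with 'ws-1' and the dates as placeholder values:

```cql
-- Stays within a single partition because wsid is the partition key;
-- year and month narrow the slice via the clustering columns.
SELECT wsid, year, month, day, hour, one_hour_precip
FROM raw_weather_data
WHERE wsid = 'ws-1' AND year = 2005 AND month = 12;
```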



We can then create a view:
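
A sketch of such a view, written in the syntax of released C* 3.0 rather than the pre-release branch used for this post. The view name state_yearly_rain is hypothetical. In released versions, as far as I'm aware, every view primary key column needs an IS NOT NULL restriction and the remaining base primary key columns are listed explicitly as clustering columns; the pre-release branch may have accepted a shorter form.

```cql
-- Assumes the base table's primary key is
-- ((wsid), year, month, day, hour, state_code), so one_hour_precip is the
-- only non-key column promoted into the view's primary key.
CREATE MATERIALIZED VIEW state_yearly_rain AS
    SELECT country_code
    FROM raw_weather_data
    WHERE state_code IS NOT NULL
      AND year IS NOT NULL
      AND one_hour_precip IS NOT NULL
      AND wsid IS NOT NULL
      AND month IS NOT NULL
      AND day IS NOT NULL
      AND hour IS NOT NULL
    PRIMARY KEY ((state_code, year), one_hour_precip, wsid, month, day, hour);
```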


We've asked C* to materialise select country_code from raw_weather_data but with a different partition key. How awesome is that? All of the original primary key columns and any columns in your new primary key are automatically added, and I've added country_code for no good reason.

With that we can query by state and year as well. I included year as I assumed that partitioning by state would lead to very large partitions in the view table.
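
For example, assuming a view called state_yearly_rain (a hypothetical name) partitioned by (state_code, year), and a made-up county value:

```cql
-- Single-partition read against the view: both partition key columns
-- of the view are restricted.
SELECT *
FROM state_yearly_rain
WHERE state_code = 'Kent' AND year = 2005;
```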



Whereas secondary indexes go to the original table, which will often result in a multi-partition query (a C* anti-pattern), a materialised view is a copy of your data partitioned in a new way. This query will be as quick as if you'd duplicated the data yourself.

The big takeaway here is that YOU, the developer, decide the partitioning of the materialised view. This is an important point. There was talk of you only needing to specify a cardinality, e.g. low, medium, high or unique, and leaving C* to decide the partitioning. Whereas that would have appeared more user friendly, it would be a new concept and a layer of abstraction, when IMO it is critical that all C* developers/ops understand the importance of partitioning, and we already do it every day for tables. You can now use all that knowledge you already have to design good primary keys for views.

The fine print


I'll use the term "original primary key" to refer to the primary key of the table we're creating a materialised view on, and "MV primary key" for the primary key of our new view.
  1. You can include any part of the original primary key in your MV primary key and a single column that was not part of your original primary key
  2. Any part of the original primary key you don't use will be added to the end of your clustering columns to keep it a one to one mapping
  3. If any part of the MV primary key is NULL for a row then that row won't appear in the materialised view
  4. There is overhead added to the write path for each materialised view
Confused? Example time!

Original primary key:  PRIMARY KEY ((wsid), year, month, day, hour)
MV primary key: PRIMARY KEY ((state_code, year), one_hour_precip)
Conclusion: No, this is actually the example above and it does not work as we tried to include two columns that weren't part of the original primary key. I updated the original primary key to: PRIMARY KEY ((wsid), year, month, day, hour, state_code) and then it worked.

Original primary key:  PRIMARY KEY ((wsid), year, month, day, hour)
MV primary key: PRIMARY KEY ((state_code, year))
Conclusion: Yes - only one new column in the primary key: state_code

Here are some of the other questions that came up:
  1. Is historic data put into the view on creation? Yes
  2. Can the number of fields be limited in the new view? Yes - in the select clause
  3. Is the view written to synchronously or asynchronously on the write path? Very subject to change! It's complicated! The view mutations are put in the batch log and sent out before the write. The write can succeed before all the view replicas have acknowledged the update, but the batch log won't be cleared until a majority of them have responded. See the diagram in this article.
  4. Are deletes reflected? Yes, yes they are!
  5. Are updates reflected? Yes, yes they are!
  6. What happens if I delete a table? The views are deleted
  7. Can I update data via the view? No
  8. What is the overhead? TBD, though it will be similar to using a logged batch if you had duplicated manually.
  9. Will Patrick McFadin have to change all his data modelling talks? Well, at least some of them

Combining aggregates and MVs? Oh yes


You can't use aggregates in the select clause for creating the materialised view but you can use them when querying the materialised view. So we can now answer questions like what is the total precipitation for a given year for a given state:
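
A sketch of that yearly total, assuming a view named state_yearly_rain (hypothetical) partitioned by (state_code, year), with a placeholder county and year:

```cql
-- sum() is one of the built-in aggregates; the query is restricted to a
-- single view partition, so the aggregation stays on one set of replicas.
SELECT sum(one_hour_precip)
FROM state_yearly_rain
WHERE state_code = 'Kent' AND year = 2005;
```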



We can change our view to include the month in its key and do the same for monthly:
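
Roughly like this, again in released 3.0 syntax with a hypothetical view name (state_monthly_rain) and assuming the base table's primary key is ((wsid), year, month, day, hour, state_code):

```cql
-- Same idea as the yearly view, but month moves into the partition key,
-- giving one view partition per state per month.
CREATE MATERIALIZED VIEW state_monthly_rain AS
    SELECT country_code
    FROM raw_weather_data
    WHERE state_code IS NOT NULL
      AND year IS NOT NULL
      AND month IS NOT NULL
      AND one_hour_precip IS NOT NULL
      AND wsid IS NOT NULL
      AND day IS NOT NULL
      AND hour IS NOT NULL
    PRIMARY KEY ((state_code, year, month), one_hour_precip, wsid, day, hour);
```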

And then we can do:
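
For instance, against a view partitioned by (state_code, year, month), here given the hypothetical name state_monthly_rain and queried with placeholder values:

```cql
-- Monthly total for one state: all three partition key columns are bound.
SELECT sum(one_hour_precip)
FROM state_monthly_rain
WHERE state_code = 'Kent' AND year = 2005 AND month = 12;
```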



Though my data had all the rain in one month :)

Conclusion


This feature changes no key concepts for C* data modelling; it simply makes the implementation of the intended data model vastly easier. If you have a data set that doesn't fit on a single server you're still denormalising and duplicating for performance and scalability. C* will just do a huge amount of it for you in 3.0.

Friday, July 3, 2015

A few more Cassandra aggregates...

My last post was on UDAs in C* 2.2 beta. C* 2.2 is now at RC1, so again everything in this post is subject to change. I'm running off 3.0 trunk so it is even more hairy. Anyway, there are more built-in UDAs now, so let's take a look...

I am going to be using the schema from KillrWeather to illustrate the new functionality. KillrWeather is a cool project that uses C* for its storage and a combination of Spark batch and Spark Streaming to provide analytics on weather data.

Now C* hasn't previously supported aggregates but 2.2 changes all that, so let's see for which parts of KillrWeather we can ditch Spark and go pure C*.

The raw weather data schema:

Spark batch is used to populate the high low "materialised view" table:

The code from KillrWeather Spark batch:

There's a lot going on here as this code is from a fully fledged Akka based system. But essentially it is running a Spark batch job against a C* partition and then using the Spark StatsCounter to work out the max/min temperature etc. This is all done against the raw table. The result is passed back to the requester and (not shown) asynchronously saved to the C* daily_aggregate table.

Stand alone this would look something like:

Now let's do something crazy and see if we can do away with this extra table and use C* aggregates directly against the raw data table:
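
A sketch of such a query, assuming the raw table has a temperature column and a primary key of ((wsid), year, month, day, hour); the station id and date are placeholders:

```cql
-- Daily high, low and mean temperature for one station, computed on read
-- from the raw readings. All restrictions fall within a single partition.
SELECT max(temperature), min(temperature), avg(temperature)
FROM raw_weather_data
WHERE wsid = 'ws-1' AND year = 2008 AND month = 12 AND day = 31;
```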




Because we have the year, month and day as clustering columns we can get the max/min/avg all from the raw table. This will perform nicely as it is within a C* partition. Don't do this across partitions! We haven't even had to define our own UDFs/UDAs as max, min and avg are built in. I wanted to analyse how long this UDA was taking but it currently isn't in trace, so I raised a JIRA.

The next thing KillrWeather does is keep this table up to date with Spark streaming:

Can we do that with built in UDAs? Uh huh!
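
For example, precipitation totals can be summed straight from the raw readings. This is a sketch with a placeholder station id and dates, assuming the raw table's primary key is ((wsid), year, month, day, hour):

```cql
-- Total precipitation for one station on one day.
SELECT sum(one_hour_precip)
FROM raw_weather_data
WHERE wsid = 'ws-1' AND year = 2008 AND month = 12 AND day = 31;

-- Total precipitation for the same station over the whole month.
SELECT sum(one_hour_precip)
FROM raw_weather_data
WHERE wsid = 'ws-1' AND year = 2008 AND month = 12;
```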


The data is a little weird as there are negative values for one_hour_precip, which is why it appears that we have less rain in a month than we do in a single day in that month.

We can also do things that don't include a partition key like get the max for all weather stations, but this will be slow / could cause OOM errors if you have a large table:
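
Such a query would look something like this (assuming a temperature column); with no partition key restriction it has to read every partition in the table:

```cql
-- Full-table aggregate: touches all partitions on all nodes, so only
-- reasonable on small tables.
SELECT max(temperature)
FROM raw_weather_data;
```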



All the raw text for the queries is on my GitHub.