Disclaimer: all of this was written against 2.2-beta, so the syntax may have changed.
Cassandra 2.2 introduced user defined functions and user defined aggregates. We can now do things like min, max and average on the server rather than having to bring all the data back to our application. Max and min are built in, but we'll see how you could have implemented them yourself.
Max/Min
Here's an example table for us to try and implement max/min against.
CREATE TABLE keyvalue (key text PRIMARY KEY, value int);
INSERT INTO keyvalue (key, value) VALUES ('chris', 5);
INSERT INTO keyvalue (key, value) VALUES ('luke', 10);
INSERT INTO keyvalue (key, value) VALUES ('patrick', 15);
INSERT INTO keyvalue (key, value) VALUES ('haddad', 20);
User defined aggregates work by calling your user defined function on every row returned from your query. They differ from a plain function because the first parameter to the function is state that is passed from row to row, much like a fold.
Creating an aggregate is a two- or three-step process:
- Create a function that takes in state (any Cassandra type including collections) as the first parameter and any number of additional parameters
- (Optionally) Create a final function that is called after the state function has been called on every row
- Refer to these in an aggregate
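The steps above boil down to a fold over the rows your query returns. As a rough illustration only (plain Java, not Cassandra's implementation; AggregateModel and its methods are hypothetical names), evaluating an aggregate looks something like this:

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical plain-Java model of how a user defined aggregate is evaluated.
// Illustrates the semantics only; this is not Cassandra code.
final class AggregateModel {

    // INITCOND seeds the state, SFUNC runs once per row threading the state
    // through, and FINALFUNC converts the last state into the result.
    static <S, R, T> R aggregate(List<T> rows,
                                 S initCond,
                                 BiFunction<S, T, S> stateFunction,
                                 Function<S, R> finalFunction) {
        S state = initCond;                           // INITCOND
        for (T row : rows) {
            state = stateFunction.apply(state, row);  // SFUNC, once per row
        }
        return finalFunction.apply(state);            // FINALFUNC
    }

    // Without a final function the aggregate simply returns the last state.
    static <S, T> S aggregate(List<T> rows, S initCond, BiFunction<S, T, S> stateFunction) {
        return aggregate(rows, initCond, stateFunction, Function.<S>identity());
    }
}
```

For example, aggregate(rows, 0, (sum, v) -> sum + v) behaves like a sum aggregate with INITCOND 0 and no final function.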
For max we don't need a final function but we will for average later.
CREATE FUNCTION maxI(current int, candidate int)
CALLED ON NULL INPUT
RETURNS int LANGUAGE java AS
'if (current == null) return candidate; if (candidate == null) return current; return Math.max(current, candidate);';
CREATE AGGREGATE maxAgg(int)
SFUNC maxI
STYPE int
INITCOND null;
Here we're using Java for the language (you can also use JavaScript) and just using Math.max. For our aggregate definition we start with a null state (INITCOND null), so it returns null for an empty table, and then set the state to be the max of the current state and the value passed in. We can use our new aggregate like this:
SELECT maxAgg(value) FROM keyvalue;
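To try the fold without a cluster, here is a plain-Java mirror of maxI and maxAgg (the class name MaxAggSketch is made up for illustration). A CQL int arrives in a Java UDF as a nullable java.lang.Integer, which is why the sketch uses Integer rather than int:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java mirror of the maxI state function and maxAgg aggregate.
final class MaxAggSketch {

    // Same logic as maxI, including a guard so a null value is skipped
    // instead of failing when Math.max unboxes it.
    static Integer maxI(Integer current, Integer candidate) {
        if (current == null) return candidate;
        if (candidate == null) return current;
        return Math.max(current, candidate);
    }

    // INITCOND null, then apply maxI to every value in turn.
    static Integer maxAgg(List<Integer> values) {
        Integer state = null;
        for (Integer value : values) {
            state = maxI(state, value);
        }
        return state;
    }

    public static void main(String[] args) {
        // The four values inserted into keyvalue earlier.
        System.out.println(maxAgg(Arrays.asList(5, 10, 15, 20)));     // 20
        System.out.println(maxAgg(Collections.<Integer>emptyList())); // null
    }
}
```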
GroupBy
So there's no GROUP BY keyword in Cassandra 2.2, but you can get similar behaviour with a custom user defined aggregate. Imagine you had a table that kept track of everything your customers did, e.g. the event_type and origin of each event.
We can write a UDA that counts how many times each distinct value of a particular column appears:
CREATE OR REPLACE FUNCTION state_group_and_count(state map<text, int>, type text)
CALLED ON NULL INPUT
RETURNS map<text, int>
LANGUAGE java AS '
  Integer count = (Integer) state.get(type);
  if (count == null) count = 1; else count++;
  state.put(type, count);
  return state;
';
CREATE OR REPLACE AGGREGATE group_and_count(text)
SFUNC state_group_and_count
STYPE map<text, int>
INITCOND {};
The counts are kept in a map keyed by the column's value. Because the aggregate accepts any text column, the same UDA can count both the event_type and the origin of the event.
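Here is the same idea as plain Java, mirroring state_group_and_count with an empty map as the initial state. The class name is made up, and the event_type values are hypothetical since the post's table isn't shown:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java mirror of state_group_and_count / group_and_count.
final class GroupAndCountSketch {

    // Same body as the Java UDF: bump the count stored under this value.
    static Map<String, Integer> stateGroupAndCount(Map<String, Integer> state, String type) {
        Integer count = state.get(type);
        if (count == null) count = 1; else count++;
        state.put(type, count);
        return state;
    }

    static Map<String, Integer> groupAndCount(List<String> column) {
        Map<String, Integer> state = new HashMap<>(); // INITCOND {}
        for (String value : column) {
            state = stateGroupAndCount(state, value);
        }
        return state;
    }

    public static void main(String[] args) {
        // Hypothetical event_type values, one per row.
        List<String> eventTypes = Arrays.asList("login", "purchase", "login", "logout", "login");
        System.out.println(groupAndCount(eventTypes));
    }
}
```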
More often than not, when you use group by in other databases you are totalling another field. For example, imagine we were keeping track of customer purchases and wanted the total amount each customer has spent.
We can create a generic aggregate for that called group_and_total:
CREATE OR REPLACE FUNCTION state_group_and_total(state map<text, int>, type text, amount int)
CALLED ON NULL INPUT
RETURNS map<text, int>
LANGUAGE java AS '
  if (amount == null) return state;
  Integer total = (Integer) state.get(type);
  if (total == null) total = amount; else total = total + amount;
  state.put(type, total);
  return state;
';
CREATE OR REPLACE AGGREGATE group_and_total(text, int)
SFUNC state_group_and_total
STYPE map<text, int>
INITCOND {};
And an example usage:
As you can see Haddad spends way too much.
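group_and_total follows the same pattern with a running total instead of a count. A plain-Java mirror (class name made up), using hypothetical purchase rows rather than the post's data; each index across the two lists stands in for one row:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java mirror of state_group_and_total / group_and_total.
final class GroupAndTotalSketch {

    // Add this row's amount to the total stored under its key.
    static Map<String, Integer> stateGroupAndTotal(Map<String, Integer> state, String type, Integer amount) {
        if (amount == null) return state;  // skip rows with no amount
        Integer total = state.get(type);
        if (total == null) total = amount; else total = total + amount;
        state.put(type, total);
        return state;
    }

    static Map<String, Integer> groupAndTotal(List<String> customers, List<Integer> amounts) {
        Map<String, Integer> state = new HashMap<>(); // INITCOND {}
        for (int i = 0; i < customers.size(); i++) {
            state = stateGroupAndTotal(state, customers.get(i), amounts.get(i));
        }
        return state;
    }

    public static void main(String[] args) {
        // Hypothetical purchases: (alice, 30), (bob, 5), (alice, 12).
        Map<String, Integer> totals = groupAndTotal(
                Arrays.asList("alice", "bob", "alice"),
                Arrays.asList(30, 5, 12));
        System.out.println(totals.get("alice")); // 42
        System.out.println(totals.get("bob"));   // 5
    }
}
```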
Average
The Cassandra docs have an example of how to use a user defined aggregate, with a final function, to calculate an average: http://cassandra.apache.org/doc/cql3/CQL-2.2.html#udas
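Average is where the optional final function earns its place: the state has to carry both a count and a running sum, and the division can only happen once every row has been seen. A plain-Java sketch of that shape follows; the linked example keeps its state in a CQL tuple, and the long[] pair, class name and method names here are illustrative stand-ins:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java sketch of an average aggregate with a final function.
final class AverageSketch {

    // State function: state[0] = rows counted, state[1] = running sum.
    static long[] avgState(long[] state, Integer value) {
        if (value != null) {
            state[0]++;
            state[1] += value;
        }
        return state;
    }

    // Final function: runs once, after the state function has seen every row.
    static Double avgFinal(long[] state) {
        if (state[0] == 0) return null;  // no rows, so no average
        return (double) state[1] / state[0];
    }

    static Double average(List<Integer> values) {
        long[] state = {0L, 0L};         // plays the role of INITCOND (0, 0)
        for (Integer value : values) {
            state = avgState(state, value);
        }
        return avgFinal(state);
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(5, 10, 15, 20)));     // 12.5
        System.out.println(average(Collections.<Integer>emptyList())); // null
    }
}
```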
Small print
If you've ever heard me rant about distributed databases, you've probably heard me talk about scalable queries: ones that work on a 3 node cluster as well as on a 1000 node cluster. User defined functions and aggregates are executed on the coordinator, so if you don't include a partition key in your query, all the results are brought back to the coordinator for your function to be executed. If you do a full table scan for your UDF/UDA, don't expect it to be fast when your table is huge.
This functionality is in beta for a very good reason: it is user defined code running in your database! Proper sandboxing, e.g. with a SecurityManager, will be added before this goes mainstream.