Tuesday, January 14, 2014

Akka: Testing that an actor sends a message back to the sender

A common pattern when using actors is for actor A to send a message to actor B and then for actor B to send a message back.

This can be tested using the Akka TestKit along with ScalaTest.

Take the following very simple example. We have an Actor called Server which should accept Startup messages and respond with a Ready message when it is ready for clients to send additional messages.

We can start with a noddy implementation that does nothing:

Then write a test to specify the behaviour. Here I've used TestKit along with ScalaTest's FunSuite.

This test will fail with the following error message:

assertion failed: timeout (3 seconds) during expectMsg while waiting for Ready

As you can probably guess, TestKit waited for 3 seconds for a Ready message to be sent back, and none arrived.

To fix the test we add the following to the Server Actor implementation:

And now the test will pass! There are two important things to take note of. First, our test case extends TestKit, which gives you an ActorSystem. Second, the test case mixes in the ImplicitSender trait, which makes the test itself the sender of the messages it sends, so replies come back to it and we can use methods like "expectMsg" to assert that the correct message has been received.
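For reference, here is a minimal sketch of what the finished actor and test might look like. It is a reconstruction rather than the exact code from this post: it assumes classic Akka actors (2.3 or later, where sender() takes parentheses), akka-testkit and a ScalaTest version that provides FunSuiteLike, and the class name ServerTest and the system name are made up.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

// Messages exchanged between the test (acting as the client) and the Server.
case object Startup
case object Ready

// The Server replies with Ready to whoever sent it Startup.
class Server extends Actor {
  def receive = {
    case Startup => sender() ! Ready
  }
}

// Extending TestKit supplies the ActorSystem; ImplicitSender makes the
// test's own actor the sender, so the reply can be asserted with expectMsg.
class ServerTest extends TestKit(ActorSystem("ServerTest"))
  with FunSuiteLike with ImplicitSender with BeforeAndAfterAll {

  // Stop the ActorSystem once the suite has finished.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  test("Server responds to Startup with Ready") {
    val server = system.actorOf(Props[Server])
    server ! Startup
    expectMsg(Ready) // waits up to the default timeout for the reply
  }
}
```

Removing the case Startup line from Server takes you back to the failing, time-out version of the test.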

Wednesday, October 2, 2013

Installing Cassandra 2.0 on Ubuntu

Update your apt source list with the following:

sudo vim /etc/apt/sources.list

#Add at the bottom
deb http://www.apache.org/dist/cassandra/debian 20x main
deb-src http://www.apache.org/dist/cassandra/debian 20x main

Run an apt-get update. 

sudo apt-get update

This will give you a warning about not being able to verify the signatures of the Apache repos:

GPG error: http://www.apache.org unstable Release:
The following signatures couldn't be verified because the public key is not available:
NO_PUBKEY 4BD736A82B5C1B00


Now do the following for that key:

gpg --keyserver pgp.mit.edu --recv-keys 4BD736A82B5C1B00
gpg --export --armor 4BD736A82B5C1B00 | sudo apt-key add -

Also add this one:

gpg --keyserver pgp.mit.edu --recv-keys 2B5C1B00
gpg --export --armor 2B5C1B00 | sudo apt-key add -

Now run apt-get update again.
sudo apt-get update

The error should be gone. Now check that all is working and Ubuntu can see Cassandra 2.0:

apt-cache showpkg cassandra
Package: cassandra
Versions:
2.0.1


Great! Now install it:

sudo apt-get install cassandra

Now start it:

sudo service cassandra start
xss = -ea -javaagent:/usr/share/cassandra/lib/jamm-0.2.5.jar -XX:+UseThreadPriorities -XX:ThreadPriorityPolicy=42 -Xms1001M -Xmx1001M -Xmn100M -XX:+HeapDumpOnOutOfMemoryError -Xss256k


Now you can check you can connect:
cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 4.0.1 | Cassandra 2.0.1 | CQL spec 3.1.1 | Thrift protocol 19.37.0]
Use HELP for help.


Where is everything?

  • Logs: /var/log/cassandra
  • Config: /etc/cassandra/
  • Data: /var/lib/cassandra


Done!
 








Tuesday, October 1, 2013

Using Cassandra on Mac OSX

I posted some time ago about installing Cassandra on Mac OSX. Admittedly I generally use Linux when dealing with Cassandra, but I have recently been using it on Mac OSX again, so here are some tips for working with Cassandra on Mac OSX.

Install it with homebrew 


It's easy! The only reason for not using homebrew is if you want a specific version. I have an old blog post on installing it with homebrew here: install Cassandra on Mac OSX. If you want 1.2 rather than 2.0 read below first.

The default formula for Cassandra is now 2.0. If you aren't that cutting edge and want to stick to Cassandra 1.2 then you need to do some tinkering. First off, do a brew update and tap the versions repository:

brew update
brew tap homebrew/versions

Now let's see what we get for cassandra:

brew search cassandra
cassandra      cassandra-0.6  cassandra12

Homebrew have kindly created three formulas you can work with: 0.6, 1.2 and the latest (currently 2.0). If you want 1.2 simply do:

brew install cassandra12 

Use this rather than brew install cassandra. By default the brew-installed Cassandra uses the same config and data locations for 1.2 and 2.0, so you can't (without extra work) use brew to manage multiple versions of Cassandra on your Mac. If you want that, you should probably use VMs instead.

Cassandra is installed: Where is everything?


All of this applies regardless of whether you're on Cassandra 1.2 or Cassandra 2.0. Package managers are great but sometimes they leave you baffled as to where they put everything!

Where's my Cassandra yaml and other property files? /usr/local/etc/cassandra

Where are my logs? /usr/local/var/log/cassandra/
  • This can be updated by modifying /usr/local/etc/cassandra/log4j-server.properties
Where's the data, commit log etc. (you may need to delete this when playing with different versions or partitioners)? /usr/local/var/lib/cassandra/data

How do I stop and start Cassandra?


If you're used to unix services/init.d etc. you'll want to know how to start/stop Cassandra without the kill command. On Mac this is handled by launchd, which you control with the launchctl utility. Assuming you installed Cassandra using homebrew, use the following commands:

launchctl start homebrew.mxcl.cassandra
launchctl stop homebrew.mxcl.cassandra

That's a lot of typing so I tend to alias these in my profile, e.g.:

alias stop_cassandra="launchctl stop homebrew.mxcl.cassandra"
alias start_cassandra="launchctl start homebrew.mxcl.cassandra"

Cassandra: Datastax Java driver retry policy

The Datastax Java Driver for Cassandra exposes its retry strategy via the RetryPolicy interface, which has one method per scenario: onReadTimeout, onWriteTimeout and onUnavailable.

There are three scenarios you can control retry policy for:
  1. Read timeout: When a coordinator received the request and sent the read to the replica(s) but the replica(s) did not respond in time
  2. Write timeout: As above but for writes
  3. Unavailable: When the coordinator knows there aren't enough replicas available, so it never sends the read/write request on

What is the default behaviour?


The DefaultRetryPolicy retries in the following cases:
  1. Read timeout: When enough replicas responded but the data did not come back within the configured read timeout
  2. Write timeout: Only if the initial phase of a batch write times out - see cassandra batch statement
  3. Unavailable: Never
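The three rules above can be written down as a small decision table. The sketch below is a simplified model in plain Scala, not the driver's code: the Decision type, the DefaultRetryModel object and all parameter names are invented for illustration, and the "at most one retry" cap is my reading of how the default policy behaves.

```scala
// Hypothetical, simplified model of the default policy's decisions.
sealed trait Decision
case object Retry extends Decision
case object Rethrow extends Decision

object DefaultRetryModel {
  // Read timeout: retry once, and only if enough replicas answered
  // but the data itself never arrived.
  def onReadTimeout(required: Int, received: Int,
                    dataRetrieved: Boolean, retries: Int): Decision =
    if (retries == 0 && received >= required && !dataRetrieved) Retry
    else Rethrow

  // Write timeout: retry once, and only when it was the batch log write
  // (the initial phase of a batch) that timed out.
  def onWriteTimeout(isBatchLogWrite: Boolean, retries: Int): Decision =
    if (retries == 0 && isBatchLogWrite) Retry else Rethrow

  // Unavailable: never retry.
  def onUnavailable(): Decision = Rethrow
}
```

For example, a read at QUORUM with RF 3 where both required replicas acknowledged but the data did not arrive would be retried, while the same read with only one response would be re-thrown to the client.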

How do I configure the value for the read and write timeout?


This is configured in cassandra.yaml on the Cassandra server. The default is 10 seconds; you can change the following properties:
# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 10000

What are the other policies?


DowngradingConsistencyRetryPolicy

This is the most complicated retry policy and it comes with a big warning: your read/write may be retried at a lower consistency. So if you have business requirements not to report success unless you meet a certain level of consistency, use this with caution.

What does it do?
  1. Read: If at least one replica responded then the read is retried at a lower consistency
  2. Write: Retries unlogged batch queries when at least one replica responded (see http://www.datastax.com/dev/blog/atomic-batches-in-cassandra-1-2). For all other types of writes the timeout is simply ignored if at least one replica acknowledged the write (essentially ignoring the requested consistency)
  3. Unavailable: If at least one replica is available then the query is retried with a lower consistency

FallthroughRetryPolicy

No retrying! Any failure is re-thrown to the client.

LoggingRetryPolicy


This is just a decorator policy that you can wrap around any other policy. It logs every ignored failure (no retry) and every actual retry. The driver uses SLF4J and logs at INFO level.

How do I use a different policy?


Simply set it when creating your Cluster. The retry policies all expose a singleton instance you can use.
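As a rough sketch of what that can look like from Scala, assuming a driver version that still ships the com.datastax.driver.core.Cluster builder and the policies named in this post; the RetryPolicyExample object and the 127.0.0.1 contact point are placeholders:

```scala
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.policies.{DowngradingConsistencyRetryPolicy, LoggingRetryPolicy}

object RetryPolicyExample {
  // Use the singleton of the downgrading policy and wrap it in the logging
  // decorator so every retry/ignore decision is logged.
  val retryPolicy = new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)

  // "127.0.0.1" is a placeholder contact point; nothing is built until called.
  def buildCluster(): Cluster =
    Cluster.builder()
      .addContactPoint("127.0.0.1")
      .withRetryPolicy(retryPolicy)
      .build()
}
```

Swapping in DefaultRetryPolicy.INSTANCE or FallthroughRetryPolicy.INSTANCE works the same way.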


Conclusion


The Datastax driver is very open for extension as it exposes its strategies for retry, load balancing and reconnection. 

The retry policy is very easy to work with as all the current implementations are stateless. I'll follow this post up with how to implement your own retry policy.

Wednesday, September 25, 2013

Talk: Introduction to Cassandra, CQL, and the Datastax Java Driver

Presenter: Johnny Miller
Who is he? Datastax Solutions Architect
Where? Skills Matter Exchange in London

I went to an "introductory" talk even though I have a lot of experience with Cassandra for a few reasons:
  • Meet other people in London that are using Cassandra
  • To discover what I don't know about Cassandra
Here are my notes, in roughly the same order as the talk.

What's Cassandra? The headlines

  • Been around for ~5 years - originally developed by Facebook for their inbox search
  • Distributed key store - column orientated data model
  • Tuneable consistency - per request decide how consistent you want the response to be
  • Datacenter aware with asynchronous replication
  • Designed for use as a cluster - not much value in a single node Cassandra deployment

Gossip - how nodes in a cluster learn about other nodes

  • P2P protocol for how nodes discover location and state of other nodes
  • New nodes are given seed nodes for bootstrapping - but these aren't single points of failure as they aren't used again

Data distribution and replication

  • Replication factor: How many nodes each piece of data is stored on
  • Each node is given a range of primary keys to look after

Partitioners - How to decide which node gets what data

  • Row keys are hashed to decide node then a replication strategy defines how to pick the other replicas

Replicas - how to select where else the data lives

  • All replicas are equally important. No difference between the node the key hashed to and the other replicas that were selected
  • Two ways to pick the other replicas:
    • Simple: Only single DC. Specify just a replication factor. Hashes the key and then walks the cluster and picks the replicas. Not very clever - all replicas could end up on the same rack
    • Network: Configure with a RF per DC. Walk the ring for each DC until it reaches a node in another rack
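The "Simple" strategy above can be sketched in a few lines of plain Scala. This is a toy model, not Cassandra's code: the Node class, the small integer tokens and the replicasFor function are all invented for illustration, and a real cluster derives the key's token from a hash of the row key.

```scala
// Toy ring: each node owns one token (made-up small integers here).
final case class Node(name: String, token: Int)

object SimpleRing {
  // Start at the first node whose token is >= the key's token (wrapping to
  // the first node if there is none), then walk the ring clockwise and take
  // replicationFactor nodes. Racks and data centres are ignored entirely.
  def replicasFor(keyToken: Int, ring: Seq[Node], replicationFactor: Int): Seq[Node] = {
    val sorted = ring.sortBy(_.token)
    val start = sorted.indexWhere(_.token >= keyToken) match {
      case -1 => 0 // key is past the last token: wrap around
      case i  => i
    }
    (0 until math.min(replicationFactor, sorted.size))
      .map(offset => sorted((start + offset) % sorted.size))
  }
}
```

With nodes A, B, C and D at tokens 0, 25, 50 and 75, a key with token 30 and RF 3 lands on C, D and A. Nothing stops those three from sharing a rack, which is the weakness the network strategy addresses.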

Snitch - how to define a data centre and a rack

  • Informs Cassandra about node topology, designates DC and Rack for every node
  • Example: Rack inferring snitch designates DC and Rack based on the IP of the node 
  • Example: Property file snitch where every node has the DC and Rack of every other node
  • Example: GossipingPropertyFileSnitch: Every node knows its own DC and Rack and tells other nodes via Gossip
  • Dynamic snitching: monitors performance of reads, this snitch wraps the other snitches to respond to network latency

Client requests

  • Connect to any node in the cluster - that node becomes the coordinator. The coordinator knows which nodes to talk to for the request
  • Multi DC - picks a coordinator in the other data centre to replicate data there or to get data for a read

Consistency

  • Quorum = floor(Replication Factor / 2) + 1 i.e. more than half
  • E.g. RF = 3, Quorum = 2: can tolerate 1 replica going down and continue reading and writing at Quorum
  • Per request consistency - can decide certain writes are more important and require higher consistency than others
  • Example consistency levels: ANY, ONE, TWO, THREE, QUORUM, EACH_QUORUM, LOCAL_QUORUM
  • SERIAL: New in Cassandra 2.0
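The quorum arithmetic is easy to check with a couple of helper functions. The names below are made up for this sketch; the only assumption is that the division is integer division, so the result is always "more than half".

```scala
object Quorum {
  // Integer division floors, so this is always more than half the replicas.
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // How many replicas can be down while QUORUM reads and writes still succeed.
  def tolerableFailures(replicationFactor: Int): Int =
    replicationFactor - quorum(replicationFactor)
}
```

Note that going from RF 3 to RF 4 raises the quorum to 3 but still only tolerates one replica being down, which is one reason odd replication factors are the usual choice.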

Write requests - what happens?

  • The coordinator (node the client connects to) forwards the write to all the replicas in the local DC and designates a coordinator in the other DCs to do the same there
  • The coordinator may be a replica but does not need to be
  • On a single node, a write first goes to the commit log (disk), then to the memtable (memory)
  • When does the write succeed? Depends on consistency e.g a write consistency of ONE means that the data needs to be in the commit log and memtable of at least one replica

Hinted handoff - how Cassandra deals with nodes being down on write

  • Coordinator node keeps hints if one of the replicas is down
  • When the node comes back up the hints are then sent to the node so it can catch up
  • Hints are kept for a finite amount of time - default is three hours

Read requests  - what happens?

  • Coordinator contacts a number of nodes depending on the consistency - once enough have responded the read can be successful 
  • Will send requests to the nodes responding the fastest
  • If not consistent - compare timestamps + do a read repair
  • Possible other background read repair
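The "compare timestamps" step can be illustrated with a toy model: the response with the newest write timestamp wins, and any replica holding an older copy is a candidate for repair. The ReplicaResponse class and the ReadRepair helpers are hypothetical names for this sketch, and it ignores that Cassandra resolves conflicts per column rather than per row.

```scala
// One replica's answer to a read: the value it holds and when it was written.
final case class ReplicaResponse(replica: String, value: String, timestamp: Long)

object ReadRepair {
  // The response with the highest write timestamp wins.
  def newest(responses: Seq[ReplicaResponse]): ReplicaResponse =
    responses.maxBy(_.timestamp)

  // Replicas whose copy is older than the winner and would be sent the newer value.
  def staleReplicas(responses: Seq[ReplicaResponse]): Seq[String] = {
    val winner = newest(responses)
    responses.filter(_.timestamp < winner.timestamp).map(_.replica)
  }
}
```

If replica A returns a value written at time 10 while B and C return one written at time 20, the client gets the newer value and A is the replica that needs repairing.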

What was missing?

Overall it was a great talk, however here are some possible improvements:
  • A glossary/overview at the start? Perhaps a mapping from relational terminology to Cassandra terminology. For example the term keyspace was used a number of times before describing what it is
  • Overview of consistency when talking about eventual consistency - however this did come later? A few scenarios for when read/writes at different consistency levels would fail/succeed would have been very helpful 
  • Compaction required for an intro talk? I thought talking about compaction was a bit too much for an introductory talk as you need to understand memtables and sstables before it makes sense
  • The downsides of Cassandra: for example, some forms of schema migration/change are a nightmare when you are using CQL3 and have data you need to migrate

Sunday, September 22, 2013

Scala, MongoDB and Casbah: Dealing with Arrays

Get hold of a collection object using something like this:

scala> val collection = MongoClient()("test")("messages")
collection: com.mongodb.casbah.MongoCollection = messages

Where test is the database and messages is the name of the collection.

Inserting arrays is nice and easy, just build up your MongoDBObject with Lists inside:

scala> collection.insert(MongoDBObject("message" -> "Hello World") ++ ("countries" -> List("England","France","Spain")))
res18: com.mongodb.casbah.Imports.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "n" : 0 , "connectionId" : 234 , "err" :  null  , "ok" : 1.0}

Use your favourite one liner to print all the objects in the collection:

scala> collection.foreach(println(_))
{ "_id" : { "$oid" : "523f145e30041dae32fd04da"} , "message" : "Hello World" , "countries" : [ "England" , "France" , "Spain"]}

Now let's say you want a list of objects. Simply create a list of MongoDBObjects:

scala> collection.insert(MongoDBObject("message" -> "A list of objects?") ++ ("objects" -> List(MongoDBObject("name" -> "England"),MongoDBObject("name" -> "France"))))
res20: com.mongodb.casbah.Imports.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "n" : 0 , "connectionId" : 234 , "err" :  null  , "ok" : 1.0}

scala> collection.foreach(println(_))
{ "_id" : { "$oid" : "523f145e30041dae32fd04da"} , "message" : "Hello World" , "countries" : [ "England" , "France" , "Spain"]}
{ "_id" : { "$oid" : "523f14b530041dae32fd04db"} , "message" : "A list of objects?" , "objects" : [ { "name" : "England"} , { "name" : "France"}]}

Now let's read them back out of Mongo and process the array items individually. First we can get hold of an object that contains an array:

scala> val anObjectThatContainsAnArrayOfObjects = collection.findOne().get
anObjectThatContainsAnArrayOfObjects: collection.T = { "_id" : { "$oid" : "523f145e30041dae32fd04da"} , "message" : "Hello World" , "countries" : [ "England" , "France" , "Spain"]}

The extra get is on the end as we used the findOne method this time and it returns an Option. Then we can get just the array field:

scala> val mongoListOfObjects = anObjectThatContainsAnArrayOfObjects.getAs[MongoDBList]("countries").get
mongoListOfObjects: com.mongodb.casbah.Imports.MongoDBList = [ "England" , "France" , "Spain"]

Now we have a handle on a MongoDBList which represents our array in Mongo. The MongoDBList is Iterable so we can loop through and print it out:

scala> mongoListOfObjects.foreach( country => println(country) )
England
France
Spain

Or map it to a sequence of Strings:

scala> val listOfCountries = mongoListOfObjects.map(_.toString)
listOfCountries: scala.collection.mutable.Seq[String] = ArrayBuffer(England, France, Spain)

scala> listOfCountries
res24: scala.collection.mutable.Seq[String] = ArrayBuffer(England, France, Spain)

Friday, September 20, 2013

Scala and MongoDB: Getting started with Casbah

The officially supported Scala driver for Mongo is Casbah. Casbah is a thin wrapper around the Java MongoDB driver that gives it a Scala-like feel. As long as you ignore all the MongoDBObjects, it feels much more like being in the Mongo shell or working in Python than working with Java/Mongo.

All the examples are copied from a Scala REPL launched from an SBT project with Casbah added as a dependency.

So let's get started by importing the Casbah package:

scala> import com.mongodb.casbah.Imports._ 
import com.mongodb.casbah.Imports._

Now let's create a connection to a locally running Mongo and use the "test" database:

scala> val mongoClient = MongoClient()
mongoClient: com.mongodb.casbah.MongoClient = com.mongodb.casbah.MongoClient@2acf0276
scala> val database = mongoClient("test")
database: com.mongodb.casbah.MongoDB = test

And now let's get a reference to the messages collection:

scala> val collection = database("messages")
collection: com.mongodb.casbah.MongoCollection = messages

As you can see, Casbah makes heavy use of the apply method to give relatively nice boilerplate connection code. To print all the rows in a collection you can use the find method, which returns an iterator (the collection is empty at the moment, so nothing is printed):

scala> collection.find().foreach(row => println(row) )

Now let's insert some data using the insert method and then find and print it:

scala> collection.insert(MongoDBObject("message" -> "Hello world"))
res2: com.mongodb.casbah.Imports.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "n" : 0 , "connectionId" : 225 , "err" :  null  , "ok" : 1.0}

scala> collection.find().foreach(row => println(row) )
{ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello world"}

And adding another document:

scala> collection.insert(MongoDBObject("message" -> "Hello London"))
res4: com.mongodb.casbah.Imports.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "n" : 0 , "connectionId" : 225 , "err" :  null  , "ok" : 1.0}

scala> collection.find().foreach(row => println(row) )
{ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello world"}
{ "_id" : { "$oid" : "523aa6bf30048ee48f49c334"} , "message" : "Hello London"}

The familiar findOne method is there. Rather than the iterator returned from find, findOne returns an Option, so you can use a basic pattern match to handle the document being there or not:

scala> val singleResult = collection.findOne()
singleResult: Option[collection.T] = Some({ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello world"})

scala> singleResult match {
     |   case None => println("No messages found")
     |   case Some(message) => println(message)
     | }
{ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello world"}

Now let's query using the ID of an object we've inserted (querying by any other field is the same). Here helloWorld is the "Hello world" document fetched earlier, for example via singleResult.get:

scala> val query = MongoDBObject("_id" -> helloWorld.get("_id"))
query: com.mongodb.casbah.commons.Imports.DBObject = { "_id" : { "$oid" : "523aa69a30048ee48f49c333"}}

scala> collection.findOne(query)
res12: Option[collection.T] = Some({ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello world"})

We can also update the document in the database and then get it again to prove it has changed:

scala> collection.update(query, MongoDBObject("message" -> "Hello Planet"))
res13: com.mongodb.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "updatedExisting" : true , "n" : 1 , "connectionId" : 225 , "err" :  null  , "ok" : 1.0}

scala> collection.findOne(query)
res14: Option[collection.T] = Some({ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello Planet"})

The remove method works in the same way, just pass in a MongoDBObject for the selection criterion.

Doesn't look Scala-y enough for you? You can also insert using the += method:

scala> collection += MongoDBObject("message"->"Hello England")
res15: com.mongodb.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "n" : 0 , "connectionId" : 225 , "err" :  null  , "ok" : 1.0}

scala> collection.find().foreach(row => println(row))
{ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello Planet"}
{ "_id" : { "$oid" : "523aa6bf30048ee48f49c334"} , "message" : "Hello London"}
{ "_id" : { "$oid" : "523c911230048ee48f49c335"} , "message" : "Hello England"}

How do you build more complex documents in Scala? Simply use the MongoDBObject ++ method. For example, we can create an object with multiple fields, insert it, then view it by printing all the documents in the collection:

scala> val moreThanOneField = MongoDBObject("message" -> "I'm coming") ++ ("time" -> "today") ++ ("Name" -> "Chris")
moreThanOneField: com.mongodb.casbah.commons.Imports.DBObject = { "message" : "I'm coming" , "time" : "today" , "Name" : "Chris"}

scala> collection.insert(moreThanOneField)
res6: com.mongodb.casbah.Imports.WriteResult = { "serverUsed" : "/127.0.0.1:27017" , "n" : 0 , "connectionId" : 234 , "err" :  null  , "ok" : 1.0}

scala> collection.find().foreach(println(_) )
{ "_id" : { "$oid" : "523aa69a30048ee48f49c333"} , "message" : "Hello Planet"}
{ "_id" : { "$oid" : "523aa6bf30048ee48f49c334"} , "message" : "Hello London"}
{ "_id" : { "$oid" : "523c911230048ee48f49c335"} , "message" : "Hello England"}
{ "_id" : { "$oid" : "523c96b530041dae32fd04d6"} , "message" : "I'm coming" , "time" : "today" , "Name" : "Chris"}