Tuesday, February 3, 2015

Unit testing Kafka applications

I recently started working with Kafka. The first thing I do when start with a tech is work out how I am going to write tests as I am a TDD/XP nut.

For HTTP I use Wiremock, for Cassandra I wrote a test double called Stubbed Cassandra. The term test double comes from the awesome book Release It! where it recommends for each technology you integrate with having a test double that you can prime to fail in every way possible.

I couldn't find anything for Kafka but I did find a couple of blogs and gists for people running Kafka/Zookeeper in the same JVM as tests.

That's a start, I took it one step further and wrote a version that will hide away all the details, including a JUnit rule so you don't even need to start/stop it for tests as well as convenient methods to send and receive messages. Here's an example of an integration test for the KafkaUnit class:


Let's say you have some code that sends a message to Kafka, like this:

A unit test would look something like this:

It is in Maven Central, so if you want to use it just add the following dependency:

<dependency>
  <groupId>info.batey.kafka</groupId>
  <artifactId>kafka-unit</artifactId>
  <version>0.1.1</version>
</dependency>

If you want to contribute check it out on github.

It is pretty limited so far, assumed String messages etc. If I keep working with Kafka I'll extend it and add support for injecting faults etc. Also for the next version I'll come up with a versioning mechanism that includes the Kafka version.

10 comments:

Yomi said...

Easy to use and works as expected. Great tool, thanks for sharing!

Unknown said...

Hi Christopher,

I've tried using this package for unit tests, but I keep getting
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
error.

I'm using new Kafka Producer - maybe that's the problem?

My code below.

Producer
package org.apache.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class SampleProducer {

private KafkaProducer producer;
private static Properties prodProp = new Properties();

public SampleProducer() {
prodProp.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
prodProp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("request.required.acks", "1");

this.producer = new KafkaProducer(prodProp);
}

public SampleProducer(Properties inProperties) {
this.producer = new KafkaProducer(inProperties);
}

public void sendMsgToTopic(String inMessage) {

StringBuilder msg = new StringBuilder(inMessage);

ProducerRecord msgData = new ProducerRecord("testTopic", msg.toString());
Future rs = this.producer.send(msgData, new Callback() {
public void onCompletion(RecordMetadata rM, Exception e) {
System.out.println("Received ack for partition=" + rM.partition() + " offset=" + rM.offset());
}
});

try {
RecordMetadata rM = (RecordMetadata) rs.get();
msg.append(" partition=" + rM.partition() + " offset=" + rM.offset());
} catch (Exception e) {
System.out.println(e);
}
}

public void closeProducer() {
this.producer.close();
}
}

Unit test
package org.apache.kafka;

import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import info.batey.kafka.unit.KafkaUnit;
import info.batey.kafka.unit.KafkaUnitRule;

public class KafkaUnitIntegrationTest {


private static String topicName = "TestTopic";
private SampleProducer underTest;
private KafkaUnit kafkaUnitServer;

@Before
public void setup() {

kafkaUnitServer = new KafkaUnit(5000, 5001);
kafkaUnitServer.startup();

Properties prodProp = new Properties();
prodProp.put("bootstrap.servers", "localhost:5001");
prodProp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("request.required.acks", "1");

underTest = new SampleProducer(prodProp);
//underTest = new SampleProducer();
kafkaUnitServer.createTopic(topicName);
}

@Test
public void testKafkaProducer() throws Exception {
System.out.println("testKafkaProducer");
//send message
underTest.sendMsgToTopic("testMsg");

//test message
List messages = kafkaUnitServer.readMessages(topicName, 1);
assertEquals(Arrays.asList("testMsg"), messages);
}

@After
public void shutdown() {
kafkaUnitServer.shutdown();
}
}

Pavan Sannadi said...

hi,
I am writing a consumer app to fetch data from one topic in kafka and writing it to some server. After sending i am doing commitoffsets manually instead of automatically. Could u please look in to the scenario and tell me how to test "consumer.commitoffsets" alone..
here is the code....
I am using high-level consumer. i made many changes. i just wanted to test that whether it is committing corrrectly or not.

import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.map.JsonMappingException;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import com.company.domain.utils.GraphiteConsumerConstants;
import com.company.domain.utils.JsonParserUtility;

public class GraphiteConsumer {
Log log = LogFactory.getLog(GraphiteConsumer.class);

public void fetchMetricDataFromKafka(Socket clientSocket,
ConsumerConnector consumer) throws UnknownHostException,
IOException {
Map topicMap = new HashMap();
topicMap.put(GraphiteConsumerConstants.TOPIC, new Integer(1));
Map>> consumerStreamsMap = consumer
.createMessageStreams(topicMap);
List> streamList = consumerStreamsMap
.get(GraphiteConsumerConstants.TOPIC);
DataOutputStream outToServer = new DataOutputStream(
clientSocket.getOutputStream());
for (final KafkaStream stream : streamList) {
ConsumerIterator consumerIte = stream.iterator();
while (consumerIte.hasNext()) {
// String msg = new String(consumerIte.next().message());
try {
outToServer.writeBytes(JsonParserUtility.parseRawJson(new String(consumerIte.next().message())));
consumer.commitOffsets(); //this line wanted to test.
} catch (JsonMappingException e) {
log.info("Error Message:: " + e);
} catch (NullPointerException e) {
log.info("NullPointerError:: " + e);
} catch (IOException e) {
log.info("Error with Graphite Connectivity:: " + e);
}
clientSocket.close();
if (consumer != null)
consumer.shutdown();
}
}
}

public static void main(String[] args) throws Exception {
Socket clientSocket = new Socket(
GraphiteConsumerConstants.graphiteHost,
GraphiteConsumerConstants.graphitePort);
GraphiteConsumer simpleConsumer = new GraphiteConsumer();
Properties props = new Properties();
props.load(new FileInputStream("properties/graphiteConsumer.properties"));
ConsumerConfig consumerconfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(consumerconfig);
simpleConsumer.fetchMetricDataFromKafka(clientSocket, consumer);

}
}

man9ar00 said...

Do you have any thoughts on how to system (integration) test a system that is kafka-based, particularly where for the time being one has to validate data coming off kafka via a consumer and feed test data in via a producer, but in live system under test, the flow is more asynchronous, with multiple brokers, zookeepers, producers, consumers, and partitions, and live data going through system in addition to the test data, so to properly test the traditional way, one has to manage the synchronization and filtering such that sending X data in, can verify the proper Y data out, perhaps with offsets, or not. It's so much easier in a simplified test environment of single broker, zookeeper, single partition, minimal producer, consumers, where it's more synchronous and output is queued until consumed or there's no other live data to filter out since all data in and out is the test data. Unit testing kafka-based system sounds easier than a kafka-based system test.

Pavan Sannadi said...

thanx man9ar00 for your response. Do u have any sampe code or links which does the same??? if yes, then it would be more easier to a newbie like me to understand..

man9ar00 said...

Pavan, for the time being, I have no sample code that's available to the public to share. Such systems are generally organization specific. One would have to design a test app/system to demonstrate how to test it, which I don't think any exists yet. I'll post an update when I find something or create something to share.

Pavan Sannadi said...

thanks man9ar00 ... will try myself and will be looking for your approach as well..

Unknown said...

Hi, I would like to know if the KafkaUnit works with the latest kafka edition of 0.9.0.0.
Because When I use Kafka 2.11 version 0.9.0.0, it is throwing an error
kafka.server.KafkaConfig.(Ljava/util/Properties;)V
java.lang.NoSuchMethodError: kafka.server.KafkaConfig.(Ljava/util/Properties;)V
at info.batey.kafka.unit.KafkaUnit.startup(KafkaUnit.java:84)

The same code works fine for 0.8.2.2 kafka version.

Jack Dowson said...
This comment has been removed by the author.
JacobHarman said...
This comment has been removed by the author.