See the library in action here!
The section below gives an overview of how to use the Kafka CDI library!
In a Maven-managed project, simply add the following to your pom.xml:
...
<dependency>
<groupId>org.aerogear.kafka</groupId>
<artifactId>kafka-cdi-extension</artifactId>
<version>0.1.2</version>
</dependency>
...
The @Producer annotation is used to configure and inject an instance of the SimpleKafkaProducer class, which is a simple extension of the original KafkaProducer class:
...
public class MyPublisherService {

    private Logger logger = LoggerFactory.getLogger(MyPublisherService.class);

    @Producer
    SimpleKafkaProducer<Integer, String> producer;

    /**
     * A simple service method that sends a payload over the wire
     */
    public void hello() {
        producer.send("myTopic", "My Message");
    }
}
The @Consumer annotation is used to configure and declare an annotated method as a callback for the internal DelegationKafkaConsumer, which internally uses the vanilla KafkaConsumer:
public class MyListenerService {

    private Logger logger = LoggerFactory.getLogger(MyListenerService.class);

    /**
     * Simple listener that receives messages from the Kafka broker
     */
    @Consumer(topics = "myTopic", groupId = "myGroupID")
    public void receiver(final String message) {
        logger.info("That's what I got: " + message);
    }
}
Receiving the key and the value is also possible:
public class MyListenerService {

    private Logger logger = LoggerFactory.getLogger(MyListenerService.class);

    /**
     * Simple listener that receives the key and the value of each message from the Kafka broker
     */
    @Consumer(topics = "myTopic", groupId = "myGroupID")
    public void receiver(final String key, final String value) {
        logger.info("That's what I got: (key: " + key + " , value:" + value + ")");
    }
}
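The two listener shapes above (value only, or key and value) can be illustrated with a small plain-Java sketch of how an annotated method might be used as a callback. This is not the library's implementation: the `Listener` annotation, the `dispatch` helper, and both listener classes are hypothetical names invented for this example, and the sketch only assumes that a dispatcher looks at the number of method parameters to decide what to pass.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class CallbackDispatchSketch {

    // Hypothetical stand-in for an annotation such as @Consumer (not the library's type).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Listener {
        String topic();
    }

    // Listener that only cares about the message value.
    public static class ValueOnlyListener {
        public final List<String> received = new ArrayList<>();

        @Listener(topic = "myTopic")
        public void receiver(final String value) {
            received.add(value);
        }
    }

    // Listener that wants both the key and the value.
    public static class KeyValueListener {
        public final List<String> received = new ArrayList<>();

        @Listener(topic = "myTopic")
        public void receiver(final String key, final String value) {
            received.add(key + "=" + value);
        }
    }

    // Calls every annotated method registered for the topic, passing (value)
    // or (key, value) depending on how many parameters the method declares.
    public static void dispatch(final Object bean, final String topic,
                                final String key, final String value) {
        for (final Method method : bean.getClass().getMethods()) {
            final Listener listener = method.getAnnotation(Listener.class);
            if (listener == null || !listener.topic().equals(topic)) {
                continue;
            }
            try {
                if (method.getParameterCount() == 2) {
                    method.invoke(bean, key, value);
                } else if (method.getParameterCount() == 1) {
                    method.invoke(bean, value);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not invoke " + method, e);
            }
        }
    }

    public static void main(final String[] args) {
        final KeyValueListener listener = new KeyValueListener();
        dispatch(listener, "myTopic", "k1", "My Message");
        System.out.println(listener.received);
    }
}
```

In the real library the records come from a KafkaConsumer polling the broker; here a single record is passed in by hand to keep the sketch self-contained.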
With the @KafkaStream annotation, the library supports the Kafka Streams API:
@KafkaStream(input = "push_messages_metrics", output = "successMessagesPerJob2")
public KTable<String, Long> successMessagesPerJobTransformer(final KStream<String, String> source) {
    // Keep only the "Success" records and count them per key
    final KTable<String, Long> successCountsPerJob = source.filter((key, value) -> value.equals("Success"))
        .groupByKey()
        .count("successMessagesPerJob");
    return successCountsPerJob;
}
The method accepts a KStream instance from the input topic. Inside the method body, some (simple) stream processing can be done, and the return value can be either a KStream or a KTable. The entire setup is handled by the library itself.
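To make the result of the transformer above more concrete, here is a plain-Java sketch that computes the same kind of per-key count over a fixed in-memory list. It does not use Kafka Streams at all: the class name `SuccessCountSketch`, the method name, and the sample records are made up for this illustration, and unlike a KTable the result is computed once rather than updated continuously as new records arrive.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SuccessCountSketch {

    // Keeps only the records whose value is "Success" and counts them per key,
    // mirroring the filter -> groupByKey -> count steps of the stream example.
    public static Map<String, Long> successCountsPerJob(final List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(record -> "Success".equals(record.getValue()))
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }

    public static void main(final String[] args) {
        // Hypothetical sample records: key = job id, value = delivery status.
        final List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>("job-1", "Success"),
                new SimpleEntry<String, String>("job-1", "Failure"),
                new SimpleEntry<String, String>("job-2", "Success"),
                new SimpleEntry<String, String>("job-1", "Success"));
        System.out.println(successCountsPerJob(records));
    }
}
```

With these sample records, "job-1" ends up with a count of 2 and "job-2" with a count of 1; the "Failure" record is dropped by the filter.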
A minimal amount of configuration is currently needed. For that, there is a @KafkaConfig annotation. Only the first occurrence is used:
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class MyService {
...
}
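The `#{KAFKA_SERVICE_HOST}` and `#{KAFKA_SERVICE_PORT}` expressions above are placeholders. How the library resolves them is not described in this section, so the following is only a sketch under the assumption that each name is looked up in the environment variables. `PlaceholderSketch` and `resolve` are hypothetical names and not part of the Kafka CDI API.

```java
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Matches expressions such as #{KAFKA_SERVICE_HOST} and captures the name.
    private static final Pattern PLACEHOLDER = Pattern.compile("#\\{([^}]+)\\}");

    // Replaces every #{NAME} with the value returned by the lookup function.
    // Names the lookup does not know (null result) are left untouched.
    public static String resolve(final String raw, final Function<String, String> lookup) {
        final Matcher matcher = PLACEHOLDER.matcher(raw);
        final StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            final String value = lookup.apply(matcher.group(1));
            final String replacement = (value != null) ? value : matcher.group(0);
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(final String[] args) {
        // Assumption: the values come from environment variables.
        System.out.println(resolve("#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}", System::getenv));
    }
}
```

Passing the lookup as a function keeps the sketch easy to try with a plain map instead of the real environment.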
Apache Kafka uses a binary message format and comes with a handful of handy Serializers and Deserializers, available through the Serdes class. The Kafka CDI extension adds a Serde for the JsonObject.
To send a serialized JsonObject, simply specify the type, like:
...
@Producer
SimpleKafkaProducer<Integer, JsonObject> producer;
...
producer.send("myTopic", myJsonObj);
For deserialization, the argument type of the method annotated with @Consumer is used to set up the actual Deserializer:
@Consumer(topics = "myTopic", groupId = "myGroupID", keyType = Integer.class)
public void receiveJsonObject(JsonObject message) {
logger.info("That's what I got: " + message);
}
There are different ways to set up Apache Kafka. This section briefly discusses plain Docker and OpenShift.
Starting a Zookeeper cluster:
docker run -d --name zookeeper jplock/zookeeper:3.4.6
Next, we need to start Kafka and link the Zookeeper Linux container to it:
docker run -d --name kafka --link zookeeper:zookeeper ches/kafka
Now that the broker is running, we need to find out its IP address:
docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka
We use this IP address inside our @KafkaConfig annotation so that our Producers and Consumers can talk to Apache Kafka.
For Apache Kafka on OpenShift please check this repository: