Custom Kafka SinkConnector for ElasticSearch

Fernando Karnagi
4 min readDec 5, 2019

--

Just a Sink

Motivation

We have some applications running on Rancher Kubernetes platform and we have configured the Kubernetes containers logs to be shipped to Kafka Topics. Right now, we are going to developed a custom Kafka SinkConnector to parse the log records from Kafka Topics to ElaksticSearch.

First we need to identity which topics and what kind of message content that we have. We inspect some messages in our Topic using Kafka Tool and here is the content.

Topic messages using Kafka Tool

Create SinkConnector Maven Project

SinkConnector in Kafka (powered by Confluent) is developed on Java. Let’s start a new Maven project for this, by invoking this command.

mvn archetype:generate -DgroupId=sg.com.cyder.kafka.log.elk -DartifactId=cyderkafka-elk-sink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Develop Connector and Task

Let’s complete the project. The full source code can be found here https://github.com/CyderSG/kafka-elk-sink.

Let’s take a look at the some key classes.

ElkSinkConnector

This class (https://github.com/CyderSG/kafka-elk-sink/blob/master/src/main/java/sg/com/cyder/kafka/log/elk/sink/ElkSinkConnector.java) is the main entry for the SinkConnector. It defines the Task Definition and refers to the ElkSinkTask class as the Class to execute the task.

ElkSinkTask

This class (https://github.com/CyderSG/kafka-elk-sink/blob/master/src/main/java/sg/com/cyder/kafka/log/elk/sink/ElkSinkTask.java) does the main heave-lifting job.

Here is the main method, put(). This method basically get the list of records from Kafka broker (well, this SinkConnector acts as Kafka Consumer too). After getting the records from Kafka broker, it sends to Elastics Search service.

public void put(Collection<SinkRecord> collection) {
try {
log.debug("Put [" + collection + "]");
Collection<String> recordsAsString = collection.stream().map(r -> String.valueOf(r.value()))
.collect(Collectors.toList());
log.debug("Records [" + recordsAsString + "]");
elasticService.process(recordsAsString);
} catch (Exception e) {
log.error("Error while processing records");
log.error(e.toString());
}
}

ElkServiceImpl

Let’s take a look at this class (https://raw.githubusercontent.com/CyderSG/kafka-elk-sink/master/src/main/java/sg/com/cyder/kafka/log/elk/sink/elkservice/ElkServiceImpl.java). The process() method is responsible for processing each message (assumed in JSON format), filtering them, and sending them to the ELK server.

public void process(Collection<String> recordsAsString) {
List<Record> recordList = new ArrayList<>();

recordsAsString.forEach(recordStr -> {
try {

log.debug("Verifying record [" + recordStr + "]");
if (recordStr.matches(".*\\[.*\\].*")) {
log.debug("Processing record [" + recordStr + "]");
JsonObject recordAsJson = gson.fromJson(recordStr, JsonObject.class);
String behavior = Constants.insertedFlagValue;
recordList.add(new Record(recordAsJson, behavior));
}

} catch (JsonSyntaxException e) {
log.error("Cannot deserialize json string, which is : " + recordStr);
} catch (Exception e) {
log.error("CYDER: Cannot process data, which is : " + recordStr + ", with error [" + e.getMessage()
+ "]");
}
});

try {
elasticClient.bulkSend(recordList, indexName, typeName);
} catch (Exception e) {
log.error("Something failed, here is the error:");
log.error(e.toString());
}
}

Deploy the Connector

Development is done, code is ready. Now, we need to deploy the code to CP / Kafka Connect. We assumed that the Kafka Connect server is ready with the running REST API.

So, firstly we package this code as JAR file, and copy the JAR file, including the dependencies into the Connect plugin path.

We use Rancher, and this is our Kafka Connect workload

The plugin path is configured as Workload environment variable

CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components,/plugins

This is the path and content of the plugin folder (viewed in Rancher console)

After copying the JAR files, you need to reload the Workload. And of course, you need to reload the Workload every time there is an update of the JAR file.

Create the Connector in Kafka Connect

JAR files are ready, deployed. The next thing to do is register / create the connector. Use the following CURL command to create one

curl -v -H "Content-Type: application/json" -X POST --data "@elksink.json" http://xxx:8083/connectors

This is the sample of the elksink.json file

{
"name": "cyder-elk-sink-connector",
"config": {
"connector.class": "sg.com.cyder.kafka.log.elk.sink.ElkSinkConnector",
"tasks.max": "1",
"type.name": "type.name",
"elastic.url": "xxx",
"elastic.port": 9200,
"index.name": "cyderdev-log",
"flag.field": "status",
"data.array": "dataList",
"topics": "djarvis-dev-log",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}

After creating it, check the connector is added by invoking this CURL

curl -v -X GET http://xxx:8083/connectors

Check in Kibana

Well, assumed everything is fine, finally we can see the search index in Kibana ELK.

References

--

--

Fernando Karnagi
Fernando Karnagi

Written by Fernando Karnagi

DevSecOps and Cloud Apps Architect and Tech Enthusiast. Follow me in https://www.linkedin.com/in/fernandokarnagi/

No responses yet