Ingesting Time-Series Events from Kafka into QuestDB

March 28, 2023

If you are working on a project with fast or streaming data, chances are Apache Kafka is already part of your pipeline.

But if you want to analyse that data, you will need to ingest it from Kafka into some destination.

For time-series data, QuestDB can be an excellent choice.

For those of you who are not yet familiar with QuestDB, it is an Apache 2.0-licensed database designed for high-throughput ingestion and fast SQL queries.

You could write your own application, or use the generic JDBC Kafka connector to consume from Kafka and ingest into QuestDB, but the most convenient option is to use the native QuestDB Kafka Connector.
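
Just for reference, the DIY route could look roughly like the following minimal sketch: a plain Kafka consumer that inserts each message over QuestDB's PostgreSQL wire protocol. Port 8812 and the admin/quest credentials are QuestDB's documented defaults, but the class name, the example_table table with a single payload column, and the skipped JSON parsing are all assumptions made for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaToQuestDb {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "questdb-ingest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Consume from Kafka and write each record over QuestDB's PostgreSQL wire protocol (port 8812).
        // Assumes a table named example_table with a single STRING column called payload already exists.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:8812/qdb", "admin", "quest");
             PreparedStatement insert = conn.prepareStatement(
                     "INSERT INTO example_table (payload) VALUES (?)")) {
            consumer.subscribe(List.of("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    insert.setString(1, record.value()); // raw JSON; real code would parse it into columns
                    insert.executeUpdate();
                }
            }
        }
    }
}

The rest of this post takes the easier path and lets the connector handle all of this for us.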

Let's see how the integration works.

Requirements

Make sure you already have:

  • A Kafka installation (see the download snippet after this list if you need one)
  • A running QuestDB (for example, docker run --add-host=host.docker.internal:host-gateway -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 questdb/questdb:latest)
  • A local JDK installation
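
If you don't already have Kafka, one quick way to get it is to grab a binary release and unpack it. The version below simply matches the directory names used later in this post; any recent release should work in the same way:

curl -O https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0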

Adding the QuestDB Sink Connector to Kafka

The Apache Kafka distribution includes the Kafka Connect framework, but the QuestDB-specific zip file must be downloaded from the QuestDB Kafka connector GitHub page.

If you prefer, the connector is also available via Confluent Hub.

Once downloaded, unzip it and move the JARs into your Kafka libs directory. For example:

unzip kafka-questdb-connector-*-bin.zip 
cd kafka-questdb-connector 
cp ./*.jar /path/to/kafka_2.13-2.6.0/libs

Configuration

The connector reads messages from a topic in Kafka and writes them to a table in QuestDB using the InfluxDB Line Protocol (ILP), so the minimum configuration you need is the topic name, the QuestDB host and port, and the table name.

There are some extra options you can configure for serialisation and authentication.

For Kafka Connect in standalone mode, create a configuration file at /path/to/kafka/config/questdb-connector.properties. A basic config file could look like this:

name=questdb-sink 
connector.class=io.questdb.kafka.QuestDBSinkConnector 
host=localhost:9009 
topics=example-topic 
table=example_table 
include.key=false 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=false 
key.converter=org.apache.kafka.connect.storage.StringConverter

Start Kafka

Go to the Kafka home directory and start Zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Then start Kafka itself:

bin/kafka-server-start.sh config/server.properties

And let's start the QuestDB connector, pointing to the config file we just created:

bin/connect-standalone.sh config/connect-standalone.properties config/questdb-connector.properties

Publish some events

We are going to start an interactive Kafka console producer, so you can post some messages into a topic. Just make sure the topic name matches the one in the configuration file.

bin/kafka-console-producer.sh --topic example-topic --bootstrap-server localhost:9092
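
By the way, if your broker is not configured to auto-create topics (it is by default), create the topic first with the standard Kafka command before starting the producer:

bin/kafka-topics.sh --create --topic example-topic --bootstrap-server localhost:9092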

When the shell starts, you are ready to send JSON messages. You might have noticed that we didn't define a table structure for our output.

QuestDB will automatically create the table from the first message we insert, and if we add new fields in later messages, the table will be updated automatically.

In any case, even though QuestDB offers some flexibility with automatic schemas, it is not a schemaless database, so all the messages must have a compatible structure.

As an example, paste this JSON as a single line into the Kafka producer shell and hit enter:

{"firstname": "Arthur", "lastname": "Dent", "age": 42}

You can send one or two more messages, maybe add an extra field to one of them just for fun.
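
For instance, something like this (the values and the extra field are made up):

{"firstname": "Ford", "lastname": "Prefect", "age": 41, "planet": "Betelgeuse"}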

Note: If you prefer, you can create your table beforehand in QuestDB by issuing a CREATE TABLE statement.
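
As a rough sketch, a table matching the example message above might look like this; the column types and the designated timestamp column are assumptions about what the connector would produce, so adjust as needed:

CREATE TABLE example_table (
  firstname STRING,
  lastname STRING,
  age LONG,
  timestamp TIMESTAMP
) TIMESTAMP(timestamp) PARTITION BY DAY;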

Querying your time-series data

If all went well, your data has already been stored in QuestDB and can be queried using SQL, either via a PostgreSQL-compatible library or the handy REST API.
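
For example, a quick query over the REST API with curl, assuming QuestDB's default HTTP port 9000:

curl -G http://localhost:9000/exec --data-urlencode "query=SELECT * FROM example_table"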

Or you can simply go to the QuestDB web console, which runs by default at http://localhost:9000.

Run the following SQL query and you should see as many rows as messages you sent to your Kafka topic.

Make sure the table name matches the one you used in the configuration file.

SELECT * FROM example_table
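
Since this is time-series data, you can also aggregate over time. A quick sketch, assuming the connector's server-assigned timestamps end up in a column named timestamp (an assumption, so check your table schema):

SELECT timestamp, count() FROM example_table SAMPLE BY 1m;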

That's a wrap

As you can see, consuming messages from Kafka into QuestDB is quite painless using the native connector.

For more complete examples, including Change Data Capture from PostgreSQL into QuestDB using Kafka and Debezium, check out the official repository.
