Celonis Product Documentation

Celonis Kafka Connector FAQ
I have data in the message key. How can I make sure that data is uploaded?

Kafka Connect offers another kind of plugin, Single Message Transforms (SMTs), which let you reshape a record by adding or removing fields or by moving them between the key and the value. To upload data held in the message key, use an SMT to move it into the value.

Information and examples can be found here:

I want to replay my data

Sometimes the data in the Kafka topic(s) needs to be reprocessed. The connector reads through a Kafka consumer group, so replaying data means resetting that group's offsets to the required position. To do so, follow these steps.

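  1. Stop the connector. Offsets can only be reset while the consumer group has no active members.

  2. Determine the consumer group name. It is the connector instance name prefixed with connect-. For example, if the connector instance name is my_ems_sink, the consumer group name is connect-my_ems_sink.

  3. Reset the consumer group offsets, for example with one of the commands below.

  4. Restart the connector.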

# Reset the offset of partition 0 of topic foo to 100
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --topic foo:0 --to-offset 100 --execute

# Reset all partitions of topic foo to the earliest offset
# (use --all-topics instead of --topic foo to cover every topic the group has offsets for)
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --to-earliest \
--topic foo --execute

# Reset to a specific point in time (format: YYYY-MM-DDTHH:mm:SS.sss)
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets \
--to-datetime 2022-06-01T00:00:00.000 --topic foo --execute

# Move the current offset back by 10
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --shift-by -10 \
--topic foo --execute
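
Before running a reset with --execute, it can help to preview it. The following is a minimal sketch, assuming the host, connector, and topic names used in the commands above. The helper functions consumer_group_for and preview_reset_cmd are hypothetical names introduced for illustration, and the script only prints the command instead of running it. The --dry-run option of kafka-consumer-groups.sh shows the offsets a reset would produce without applying them.

```shell
# Hypothetical helpers; only the kafka-consumer-groups.sh flags come from Kafka itself.

# Derive the consumer group name from the connector instance name.
consumer_group_for() {
  printf 'connect-%s\n' "$1"
}

# Print (do not run) a reset-to-earliest command that uses --dry-run.
# $1 = connector instance name, $2 = topic, $3 = bootstrap server (assumed default below)
preview_reset_cmd() {
  prc_group="$(consumer_group_for "$1")"
  prc_bootstrap="${3:-kafka-host:9092}"
  printf 'kafka-consumer-groups.sh --bootstrap-server %s --group %s --reset-offsets --to-earliest --topic %s --dry-run\n' \
    "$prc_bootstrap" "$prc_group" "$2"
}

preview_reset_cmd my_ems_sink foo
```

Once the previewed offsets look right, the same command can be run with --execute in place of --dry-run.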

I want to sink more than one topic into Celonis Platform

Currently, a connector instance writes to only one Celonis Platform target table (see the connect.ems.target.table configuration). To sink several topics, create one connector instance per topic.

# First connector instance configuration
name=ems-first_topic
topics=first_topic
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=first_topic
...

# Second connector instance configuration
name=ems-second_topic
topics=second_topic
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=second_topic
...
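
When many topics are involved, the per-topic configurations can be generated rather than written by hand. The following is a rough sketch, assuming each target table is named after its topic as in the two configurations above. The function name render_connector_config and the output file names are hypothetical, the endpoint is kept masked exactly as shown above, and the settings elided by ... still have to be added to each generated file.

```shell
# Print the per-topic part of a connector configuration.
# Assumption: the connector name and target table are derived from the topic name.
render_connector_config() {
  rcc_topic="$1"
  cat <<EOF
name=ems-${rcc_topic}
topics=${rcc_topic}
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=${rcc_topic}
EOF
}

# Write one properties file per topic into the current directory.
for topic in first_topic second_topic; do
  render_connector_config "$topic" > "ems-${topic}.properties"
done
```

Each generated file then corresponds to one connector instance, so every topic lands in its own target table.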