Using the Celonis Kafka connector
The Celonis Kafka connector is a sink plugin, designed to be solely executed through the Kafka Connect runtime. The connector's purpose is to continuously read records from one or multiple source topics, transform them into a canonical format, and then load it into the Celonis Platform via the Standard Data Ingestion API.
Refer to the official Kafka Connect documentation for instructions on how to install Kafka Connect plugins.
The following diagram provides a high level outline of the Celonis Kafka connector workflow:
To set up the Celonis Kafka connector, follow the steps provided below.
If you require additional configuations for your use case, refer to: Additional Kafka connection configuration details.
From your data pool diagram, click Data Connections - Add Data Connection.
Select Push data into Celonis.
Define the configuration for your data ingestion application using the fields below. This information will be used to map incoming data into Celonis target tables.
Start by selecting whether you are using a flat or nested data structure:
Flat data structure: Select this option when the Kafka topic your consuming already contains data in a flat format.
If you proceed with the flat data structure, you then need to define:
Primary Key: This is used to properly handle Delta Loads and is similar to defining a primary key in a regular extraction.
Age Columns: This column is used to guarantee that the most recent version of a record (identified by its primary key) is stored in Celonis. Usually this is a date column and if a record with a value smaller than the existing value gets inserted, the update won’t be applied. However, values with equal or larger values will be applied accordingly.
For a case where you are consuming data from one Kafka topic called order, which has order_id as primary key and a timestamp reflecting when the record has been created (created_at), your final configuration would look like this:
Nested data structure: Select this option when the Kafka topic you're consuming contains data in nested format such as JSON.
In this case, you need to configure the table schema so we can properly construct the nested tables and retrieve the appropriate primary keys. To do so, click Options - Configure Table Schema:
To define the schema, we suggest using one exemplary record from the Kafka topic and copy paste it into the window on the left side.
In this example, we are using an order with information such as id, date, price info and one nested table with regards to shipments. Once the example record gets pasted into the left side of the window, we automatically derive the target table structure with the corresponding column names on the right side of the window:
By default, we add a column called _celonis_id (indicated by a tooltip) in case no primary key has been configured. However, in this example we define:
The order_id to be the primary key by selecting the relevant checkbox. Once selected, two things will happen automatically:
The autogenerated _celonis_id will disappear from the configuration.
The order_id gets created as a foreign key on the child table so users can join these two tables later on.
The order_date column as the age column.
As a result, the configuration looks like this:
If you click Finish, the Primary Key and Age column defined as part of the schema configuration will automatically be applied.
Save the configuration and note down the Access Key and Access Secret (you will need them once more later). Those are the credentials that your Kafka Connector will use to authenticate to Celonis when pushing data.
Retrieve the Kafka Connector configuration from the UI and insert the Access Key and Secret you’ve saved as part of step 4.
Use this configuration in Kafka Connect to start your Kafka Connector according to the tables, primary keys etc. that have been defined as part of this Data Connection. The configuration contains all relevant fields including authentication so that you don't have to specify anything else. In case you change anything in the Data Connection (e.g. add a new table), only the configuration gets updated within Celonis.
To make such a change effective, you need to replace the old with the new configuration file in Kafka Connect and restart your Kafka Connector.
Important
The statement that no additional configuration has to be added is not 100% correct as of today, but the following three properties with respect to the commit policy of the Connector need to be added:
connect.ems.commit.interval.ms=600000 connect.ems.commit.record s=10000 connect.ems.commit.size.bytes=25000000
Download the latest version of the Kafka Connector from the Download Portal. This is listed as Kafka EMS sink connector.
To access the Download Portal, click Admin & Settings - Download Portal.
Start the connector with the configuration retrieved in step 5.
Continue to use the Standard Data Ingestion API.
Using Kafka from a managed service provider
If you are using Kafka or Kafka Connect from a managed service provider, follow the instructions from your service:
AWS MSK Connect: To create the connector, follow the steps provided here. The steps involved will require installing a custom connector, and for that, follow this link and use the connector release jar.
Confluent: Connector installation will be performed using Confluent Hub. Follow the instructions here to enable the Kafka Connect sink.
Azure event hub: If you're running Event Hub, you can leverage Kafka Connect and the Celonis Platform Sink plugin to load data into Celonis Platform. The instructions to enable Kafka Connect for Event Hub can be found here. Once the installation is done, follow the manual steps to enable the connector before creating an instance of it.
Reading unstructured JSON data
The connector ships with a special serializer that dynamically infers a schema based on the structure of the incoming JSON records and converts them in the canonical representation used internally by the connector.
value.converter = cloud.celonis.kafka.connect.json.SchemaInferringJsonConverter
Reading data in AVRO format
If your data is in AVRO format, or another format supported by Confluent (i.e. Protobuf, Thrift), the value.converter key needs to be set to the appropriate Confluent converter and point to a reachable Schema Registry instance.
value.converter = io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url = http://<SCHEMA-REGISTRY-HOST>:<PORT>
Reading from multiple source topics
A single instance of the EMS sink connector can read from multiple source topics, provided that the data they hold is in the same format (e.g. all AVRO).
tasks.max = 3 topics=sourceA,sourceB,sourceC # or alternatively topics.regex=source.*
By default, the connector will persist records for each source topic in a Celonis Platform table with the same name.
Tip
When reading data from multiple topics, or from a simple topic that has multiple partitions, you may improve throughput by setting tasks.max with a value greater than one, thus allowing the Kafka Connect runtime to allocate up to N tasks to the connector instance.
Ensuring partition affinity
By default, the Kafka producer determines the target partition based on the message Key raw bytes values. To compute the target partition, it hashes the value using Murmur3, and then it does a modulo-Number of partitions. Therefore to ensure the records with the same primary key end up always in the same partition, the Kafka message Key should contain only the primary key(s) values. Note that the connector does not consider the Key fields. It's the user's responsibility to add the primary key(s) to the record Value, either at produce time (duplicating data between the Key and the Value) or at the connector level by using Kafka Connect Single Message Transform which will transfer fields from the Key into the Value (a reverse of this).