Celonis Product Documentation

Kafka connection examples
AVRO Input

Here is an example of uploading data from a Kafka topic whose message values are stored as Avro. The key converter is set to StringConverter, but the message key is not used.

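The connector uploads the buffered data as soon as any one of these conditions is met:

  • the file reaches 10 MB (connect.ems.commit.size.bytes=10000000)

  • 100000 records have been written (connect.ems.commit.records=100000)

  • 30 seconds have passed since the last write (connect.ems.commit.interval.ms=30000), which covers the case where no more records are arriving for the time being

It also flushes the Parquet file every 1000 records (connect.ems.parquet.write.flush.records=1000). The file size used for the 10 MB check is only updated when a flush happens, so the size rule is evaluated against the size recorded at the most recent flush, not the live file size.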
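The upload rules and their interaction with the flush interval can be sketched in Python. This is a minimal illustration, not the connector's code: the UploadState class and its methods are hypothetical, resetting state after an upload is left out, and the thresholds are copied from the example configuration.

```python
from dataclasses import dataclass

# Thresholds copied from the example configuration.
COMMIT_SIZE_BYTES = 10_000_000  # connect.ems.commit.size.bytes
COMMIT_RECORDS = 100_000        # connect.ems.commit.records
COMMIT_INTERVAL_MS = 30_000     # connect.ems.commit.interval.ms
FLUSH_RECORDS = 1_000           # connect.ems.parquet.write.flush.records


@dataclass
class UploadState:
    """Hypothetical model of one pending Parquet file (not connector code)."""
    records: int = 0
    unflushed_bytes: int = 0
    known_size: int = 0      # size as recorded at the most recent flush
    last_write_ms: int = 0

    def write(self, record_bytes: int, now_ms: int) -> None:
        """Append one record; every FLUSH_RECORDS writes, flush and update the size."""
        self.records += 1
        self.unflushed_bytes += record_bytes
        self.last_write_ms = now_ms
        if self.records % FLUSH_RECORDS == 0:
            self.known_size += self.unflushed_bytes
            self.unflushed_bytes = 0

    def should_upload(self, now_ms: int) -> bool:
        """True as soon as any one of the three upload rules is met."""
        idle_ms = now_ms - self.last_write_ms
        return (
            self.known_size >= COMMIT_SIZE_BYTES
            or self.records >= COMMIT_RECORDS
            or (self.records > 0 and idle_ms >= COMMIT_INTERVAL_MS)
        )
```

For instance, 999 records of 20,000 bytes each leave known_size at 0 because no flush has happened yet; the 1000th write triggers a flush, and only then does should_upload report that the size rule is met.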

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
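When Kafka Connect runs in distributed mode, a properties file like this is usually submitted as JSON through the worker's REST API (POST /connectors). The following Python sketch assumes a worker at http://localhost:8083 (the default REST port); the function names are hypothetical, and the properties parser is simplified to key=value lines only.

```python
import json
import urllib.request


def parse_properties(text: str) -> dict:
    """Simplified .properties parser: key=value lines, '#'/'!' comments.
    Surrounding whitespace is stripped; continuations and ':' separators are not handled."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def build_create_request(props: dict, worker_url: str) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a Connect worker."""
    config = {k: v for k, v in props.items() if k != "name"}
    body = json.dumps({"name": props["name"], "config": config}).encode("utf-8")
    return urllib.request.Request(
        worker_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result to urllib.request.urlopen sends it and creates the connector; that step needs a running worker. Note that the parser keeps quote characters, such as those around the authorization key value, as part of the value.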
JSON Input

Here is an example of uploading data from a Kafka topic whose message values are stored as JSON. The key converter is set to StringConverter, but the message key is not used.

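The connector uploads the buffered data as soon as any one of these conditions is met:

  • the file reaches 10 MB (connect.ems.commit.size.bytes=10000000)

  • 100000 records have been written (connect.ems.commit.records=100000)

  • 30 seconds have passed since the last write (connect.ems.commit.interval.ms=30000), which covers the case where no more records are arriving for the time being

It also flushes the Parquet file every 1000 records (connect.ems.parquet.write.flush.records=1000). The file size used for the 10 MB check is only updated when a flush happens.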

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
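This configuration does not set value.converter.schemas.enable. Unless that option is set to false, Kafka's JsonConverter expects each message value to be an envelope with schema and payload fields. The Python sketch below builds such a value; the function name is hypothetical, and it only handles flat records of strings, integers, floats, and booleans.

```python
import json

# JsonConverter schema type names for a few Python types (exact type match,
# so bool is not treated as an integer).
CONNECT_TYPES = {str: "string", int: "int64", float: "double", bool: "boolean"}


def to_connect_json(record: dict) -> bytes:
    """Wrap a flat record in the schema/payload envelope that Kafka's
    JsonConverter reads when schemas.enable is true (its default)."""
    fields = [
        {"field": name, "type": CONNECT_TYPES[type(value)], "optional": False}
        for name, value in record.items()
    ]
    envelope = {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": record,
    }
    return json.dumps(envelope).encode("utf-8")
```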
Primary key(s)

connect.ems.data.primary.key specifies the fields of the incoming payload to use as primary keys in Celonis. If it is not set, all fields are used.

// single field
...
connect.ems.data.primary.key=customer_id
...

// Composite PK
...
connect.ems.data.primary.key=name,address
...

Refer to the primary keys documentation for best practices.
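To illustrate the difference between a single and a composite primary key, this Python sketch builds the tuple of values that identifies a record. It is not the connector's implementation; the function name and the field ordering used when no key is set are assumptions.

```python
from typing import Optional


def primary_key(record: dict, pk_setting: Optional[str]) -> tuple:
    """Return the values that identify a record.

    pk_setting mirrors connect.ems.data.primary.key, e.g. "customer_id" or
    "name,address". When it is None, all fields are used (sorted by name here,
    an arbitrary choice made for a stable result).
    """
    if pk_setting is None:
        fields = sorted(record)
    else:
        fields = [name.strip() for name in pk_setting.split(",")]
    return tuple(record[name] for name in fields)
```

Under the composite key name,address, two records with the same name and address identify the same entity even if their other fields differ.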

Override the order field when using primary key(s)

If your data already contains a field that orders records, set it as the order field with connect.ems.order.field.name. This improves performance and reduces the disk space required in Celonis Platform.

The following example assumes that the processed_ts field guarantees that no two records with the same primary key share the same value:

// single field
...
connect.ems.data.primary.key=customer_id
connect.ems.order.field.name=processed_ts
...

// Composite PK
...
connect.ems.data.primary.key=name,address
connect.ems.order.field.name=processed_ts
...

Refer to the primary keys documentation for best practices.
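Why the order field should be unique per primary key can be seen in a sketch of last-write-wins deduplication. This Python sketch assumes that, for each primary key, the record with the highest order value is kept; it illustrates the idea and is not Celonis Platform's actual merge logic. The function name is hypothetical.

```python
from typing import Iterable


def latest_by_key(records: Iterable[dict], pk_fields: list, order_field: str) -> dict:
    """Keep, for each primary key, the record with the highest order value.
    On a tie the first record seen wins, which is arbitrary; a unique order
    value per key (as the example assumes for processed_ts) avoids that."""
    latest = {}
    for record in records:
        key = tuple(record[name] for name in pk_fields)
        current = latest.get(key)
        if current is None or record[order_field] > current[order_field]:
            latest[key] = record
    return latest
```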

Fix obfuscation

With the fix method, every obfuscated field is uploaded to Celonis Platform as *****. In this example, the credit_card and ssn fields are obfuscated.

...
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.obfuscation.method="fix"
connect.ems.obfuscation.fields="credit_card, ssn"
...
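A Python sketch of the fix method's effect on a single record. The function names are hypothetical, and splitting connect.ems.obfuscation.fields on commas while dropping the quotes and spaces is an assumption about how the value is interpreted.

```python
def parse_fields(setting: str) -> list:
    """Split a value such as "credit_card, ssn" (quotes optional) into field names."""
    return [name.strip() for name in setting.strip('"').split(",") if name.strip()]


def fix_obfuscate(record: dict, fields: list) -> dict:
    """Return a copy of record with each listed field replaced by *****."""
    masked = dict(record)
    for name in fields:
        if name in masked:
            masked[name] = "*****"
    return masked
```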
SHA1 obfuscation

Each obfuscated field is hashed with SHA1, and the result is converted to a hex string. For example, the text "this is a test" is translated to "9938a75e6d10a74d6b2e9bc204177de5b95f28fe".

In this example, the credit_card and ssn fields are obfuscated.

...
connect.ems.obfuscation.method="sha1"
connect.ems.obfuscation.fields="credit_card, ssn"
...
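A Python sketch of SHA1 obfuscation using the standard hashlib module. The function name is hypothetical, and hashing the UTF-8 bytes of the field's text is an assumption, so digests from this sketch are not guaranteed to match the connector's output byte for byte.

```python
import hashlib


def sha1_obfuscate(record: dict, fields: list) -> dict:
    """Return a copy of record with each listed field replaced by the
    hex-encoded SHA1 digest of its UTF-8 text (null values are left as-is)."""
    hashed = dict(record)
    for name in fields:
        if hashed.get(name) is not None:
            hashed[name] = hashlib.sha1(str(hashed[name]).encode("utf-8")).hexdigest()
    return hashed
```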
SHA512 obfuscation

Each obfuscated field is hashed with SHA512 together with the configured salt, and the result is converted to a hex string. In this example, the credit_card and ssn fields are obfuscated.

...
connect.ems.obfuscation.method="sha512"
connect.ems.obfuscation.sha512.salt="customerdefined salt"
connect.ems.obfuscation.fields="credit_card, ssn"
...
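A Python sketch of salted SHA512 obfuscation. The function name is hypothetical, and the salting scheme is an assumption: here the salt's UTF-8 bytes are prepended to the value's UTF-8 bytes before hashing, which may differ from what the connector does.

```python
import hashlib


def sha512_obfuscate(record: dict, fields: list, salt: str) -> dict:
    """Return a copy of record with each listed field replaced by the hex
    SHA512 digest of salt + value (salt placement is an assumption)."""
    hashed = dict(record)
    for name in fields:
        if hashed.get(name) is not None:
            data = salt.encode("utf-8") + str(hashed[name]).encode("utf-8")
            hashed[name] = hashlib.sha512(data).hexdigest()
    return hashed
```

With a fixed salt, equal inputs still map to equal digests, while changing the salt changes every digest.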