Skip to main content

Celonis Product Documentation

Data Push API - Python Example

Python Example

# Test script for data job push API. 
# Uploads a given parquet file. 
#
# Parameter:
# - Parquet filename in the current directory
#
#
# Before usage:
# Update app_key or api_key, pool_id, targetName and eventually base_url to your needs.
#


import datetime
import sys
import uuid

import requests

# Adopt to your needs
app_key = "YOUR APP KEY" # You can create this under the team setting page and application key
api_key = "YOUR API KEY" # You can create this under user setting by going to edit profile and api keys section
pool_id = "YOUR POOL ID"
targetName = "TABLE_PARQUET"


base_url = "YOUR TEAM CELONIS DOMAIN" 
connection_id ="YOUR CONNECTION ID" # this is optional you can also push on data pool global schema

client_id = str(uuid.uuid4())
file_name = sys.argv[1]

url = base_url + "/integration/api/v1/data-push/" + pool_id + "/jobs/"

print('file_name', file_name)


body={'type': 'DELTA', 'fileType': 'PARQUET', 'targetName': targetName, 'dataPoolId': pool_id}
# This is for authenticating with team based App Key
headers = {'Authorization': 'AppKey ' + app_key, 'Body-Type': 'application/json'} 
# Uncomment the line below if user based apiKey is used.
# headers = {'Authorization': 'Bearer ' + api_key, 'Body-Type': 'application/json'} 
response = requests.post(url=url ,json=body, headers=headers)
if response.status_code != 200:
    print("Received non 200 code created data push job", response.status_code)
   
data_push_job = response.json()
print(str(datetime.datetime.now()) + ": Data push job " +response.text + " successful created ")



push_chunk_url = url + data_push_job['id'] + "/chunks/upserted"
print(push_chunk_url);
file = {'file': open(file_name, 'rb')}
# This is for authenticating with team based App Key
headers = {'Authorization': 'AppKey ' + app_key, 'Body-Type': 'multipart/form-data'}
# Uncomment the line below if user based apiKey is used.
# headers = {'Authorization': 'Bearer ' + api_key, 'Body-Type': 'application/json'}
response = requests.post(url=push_chunk_url ,files=file, headers=headers)

if response.status_code != 200:
    print("Received non 200 code on pushing chunk", response.status_code)
   

print(str(datetime.datetime.now()) + ": Data push job chunk" + response.text + " successful created ")


# This is for authenticating with team based App Key
headers = {'Authorization': 'AppKey ' + app_key, 'Body-Type': 'application/json'}
# Uncomment the line below if user based apiKey is used.
# headers = {'Authorization': 'Bearer ' + api_key, 'Body-Type': 'application/json'}
url =url+data_push_job['id']
response = requests.post(url=url ,json={}, headers=headers)
if response.status_code != 200:
    print("Received non 200 code created data push job", response.status_code)

print(str(datetime.datetime.now()) + ": Successful data push job ", response.text)

#data_push_job {'file': open(file_name, 'rb')}