Data Push API - Python Example
Python Example
# Test script for data job push API. # Uploads a given parquet file. # # Parameter: # - Parquet filename in the current directory # # # Before usage: # Update app_key or api_key, pool_id, targetName and eventually base_url to your needs. # import datetime import sys import uuid import requests # Adopt to your needs app_key = "YOUR APP KEY" # You can create this under the team setting page and application key api_key = "YOUR API KEY" # You can create this under user setting by going to edit profile and api keys section pool_id = "YOUR POOL ID" targetName = "TABLE_PARQUET" base_url = "YOUR TEAM CELONIS DOMAIN" connection_id ="YOUR CONNECTION ID" # this is optional you can also push on data pool global schema client_id = str(uuid.uuid4()) file_name = sys.argv[1] url = base_url + "/integration/api/v1/data-push/" + pool_id + "/jobs/" print('file_name', file_name) body={'type': 'DELTA', 'fileType': 'PARQUET', 'targetName': targetName, 'dataPoolId': pool_id} # This is for authenticating with team based App Key headers = {'Authorization': 'AppKey ' + app_key, 'Body-Type': 'application/json'} # Uncomment the line below if user based apiKey is used. # headers = {'Authorization': 'Bearer ' + api_key, 'Body-Type': 'application/json'} response = requests.post(url=url ,json=body, headers=headers) if response.status_code != 200: print("Received non 200 code created data push job", response.status_code) data_push_job = response.json() print(str(datetime.datetime.now()) + ": Data push job " +response.text + " successful created ") push_chunk_url = url + data_push_job['id'] + "/chunks/upserted" print(push_chunk_url); file = {'file': open(file_name, 'rb')} # This is for authenticating with team based App Key headers = {'Authorization': 'AppKey ' + app_key, 'Body-Type': 'multipart/form-data'} # Uncomment the line below if user based apiKey is used. # headers = {'Authorization': 'Bearer ' + api_key, 'Body-Type': 'application/json'} response = requests.post(url=push_chunk_url ,files=file, headers=headers) if response.status_code != 200: print("Received non 200 code on pushing chunk", response.status_code) print(str(datetime.datetime.now()) + ": Data push job chunk" + response.text + " successful created ") # This is for authenticating with team based App Key headers = {'Authorization': 'AppKey ' + app_key, 'Body-Type': 'application/json'} # Uncomment the line below if user based apiKey is used. # headers = {'Authorization': 'Bearer ' + api_key, 'Body-Type': 'application/json'} url =url+data_push_job['id'] response = requests.post(url=url ,json={}, headers=headers) if response.status_code != 200: print("Received non 200 code created data push job", response.status_code) print(str(datetime.datetime.now()) + ": Successful data push job ", response.text) #data_push_job {'file': open(file_name, 'rb')}