I have configured a Databricks Delta Lake Sink connector that reads from an Avro topic and writes to Delta Lake. I followed the docs, and my config looks like this:
{
  "name": "dev_test_delta_connector",
  "config": {
    "topics": "dl_test_avro",
    "input.data.format": "AVRO",
    "connector.class": "io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector",
    "name": "dev_test_delta_connector",
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "****",
    "delta.lake.host.name": "******",
    "delta.lake.http.path": "*********",
    "delta.lake.database": "dl_test_db",
    "delta.lake.token": "*********",
    "delta.lake.table.auto.create": "true",
    "delta.lake.table.format": "kafka_${topic}",
    "staging.bucket.name": "dl-test-bucket",
    "s3.region": "eu-west-2",
    "staging.s3.access.key.id": "*************",
    "staging.s3.secret.access.key": "**********",
    "confluent.topic.bootstrap.servers": "****************",
    "flush.interval.ms": "100",
    "tasks.max": "1"
  }
}
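For completeness, this is roughly how I submit that config. It is a minimal sketch assuming a self-managed Connect worker at a placeholder URL; a fully managed Confluent Cloud connector would instead be created through the Confluent CLI or Cloud API.

import json
import requests

# Sketch only: POST the connector JSON above to a Kafka Connect REST
# endpoint. The URL is a placeholder for my environment.
CONNECT_URL = "http://<connect-host>:8083/connectors"

with open("dev_test_delta_connector.json") as f:
    payload = json.load(f)  # the {"name": ..., "config": {...}} document above

resp = requests.post(CONNECT_URL, json=payload)
resp.raise_for_status()
print(resp.json())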
I expect the connector to create the table automatically, but it fails with:
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Invalid field schema option provided\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.convertFieldSchemaToTableSchema(DatabricksDeltaLakeSinkTask.java:368)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.autoCreateTable(DatabricksDeltaLakeSinkTask.java:309)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.recordsToPutToS3(DatabricksDeltaLakeSinkTask.java:146)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.put(DatabricksDeltaLakeSinkTask.java:98)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\n"
Any help on this would be appreciated. Thanks!