Invalid field schema option provided - DatabricksDeltaLakeSinkConnector
03-24-2022 12:01 PM
I have configured a Delta Lake Sink connector that reads from an Avro topic and writes to Delta Lake. I have followed the docs, and my config looks like this:
{
  "name": "dev_test_delta_connector",
  "config": {
    "topics": "dl_test_avro",
    "input.data.format": "AVRO",
    "connector.class": "io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector",
    "name": "dev_test_delta_connector",
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "****",
    "delta.lake.host.name": "******",
    "delta.lake.http.path": "*********",
    "delta.lake.database": "dl_test_db",
    "delta.lake.token": "*********",
    "delta.lake.table.auto.create": "true",
    "delta.lake.table.format": "kafka_${topic}",
    "staging.bucket.name": "dl-test-bucket",
    "s3.region": "eu-west-2",
    "staging.s3.access.key.id": "*************",
    "staging.s3.secret.access.key": "**********",
    "confluent.topic.bootstrap.servers": "****************",
    "flush.interval.ms": "100",
    "tasks.max": "1"
  }
}
I am expecting the connector to create the table automatically, but it errors out with:
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Invalid field schema option provided\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.convertFieldSchemaToTableSchema(DatabricksDeltaLakeSinkTask.java:368)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.autoCreateTable(DatabricksDeltaLakeSinkTask.java:309)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.recordsToPutToS3(DatabricksDeltaLakeSinkTask.java:146)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.put(DatabricksDeltaLakeSinkTask.java:98)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\n"
Any help on this would be appreciated, thanks.
03-24-2022 12:10 PM
I understand that this config is on the Confluent side (the Confluent Databricks Delta Lake Sink Connector for AWS).
A common issue is an S3 permissions problem (roles). Can you check whether anything is actually being saved in the S3 staging bucket?
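For example, a minimal check from the command line (a sketch, assuming the AWS CLI is configured with credentials that can read the bucket; bucket name and region are taken from your connector config):

# List whatever the connector has staged so far.
aws s3 ls s3://dl-test-bucket --recursive --region eu-west-2

If the listing stays empty, that points to an S3 access problem rather than a schema problem.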
You can also contact Confluent support.
03-24-2022 12:13 PM
Generally, you need to be sure that Confluent can connect to your S3/Databricks resources (IPs/roles), and this error message from Confluent is not very helpful 🙂
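If you want to rule out the S3 side more directly, one sketch of a permissions check (the ARN of the principal behind your staging access key is a placeholder, since it isn't in the config):

# Simulate whether the staging principal is allowed the S3 actions
# the connector needs against the staging bucket.
aws iam simulate-principal-policy \
  --policy-source-arn arn:aws:iam::<ACCOUNT_ID>:user/<STAGING_USER> \
  --action-names s3:PutObject s3:GetObject s3:ListBucket \
  --resource-arns arn:aws:s3:::dl-test-bucket arn:aws:s3:::dl-test-bucket/*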
03-24-2022 12:23 PM
@Hubert Dudek, should I be configuring anything with respect to the schema in the connector config?
I ask because I did successfully stage data from another topic with a different format (JSON_SR) into a Delta Lake table; it's only with the Avro topic that I get this error.
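One thing I plan to check next is the field types in the registered Avro schema, since the trace fails in convertFieldSchemaToTableSchema. A sketch using the Schema Registry REST API (the endpoint and API key/secret are placeholders):

# Fetch the latest value schema for the topic and print each top-level
# field's name and type, to spot unions or logical types the connector
# might not map to a Delta column type.
curl -s -u "<SR_API_KEY>:<SR_API_SECRET>" \
  "https://<SR_ENDPOINT>/subjects/dl_test_avro-value/versions/latest" \
  | jq -r '.schema | fromjson | .fields[] | "\(.name): \(.type)"'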

