Invalid field schema option provided - DatabricksDeltaLakeSinkConnector

Bency
New Contributor III

I have configured a Delta Lake Sink connector that reads from an Avro topic and writes to Delta Lake. I followed the docs, and my config looks like this:

{
  "name": "dev_test_delta_connector",
  "config": {
    "topics": "dl_test_avro",
    "input.data.format": "AVRO",
    "connector.class": "io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector",
    "name": "dev_test_delta_connector",
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "****",
    "delta.lake.host.name": "******",
    "delta.lake.http.path": "*********",
    "delta.lake.database": "dl_test_db",
    "delta.lake.token": "*********",
    "delta.lake.table.auto.create": "true",
    "delta.lake.table.format": "kafka_${topic}",
    "staging.bucket.name": "dl-test-bucket",
    "s3.region": "eu-west-2",
    "staging.s3.access.key.id": "*************",
    "staging.s3.secret.access.key": "**********",
    "confluent.topic.bootstrap.servers": "****************",
    "flush.interval.ms": "100",
    "tasks.max": "1"
  }
}

I expect the connector to create the table automatically, but the task fails with the following trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: Invalid field schema option provided
    at io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.convertFieldSchemaToTableSchema(DatabricksDeltaLakeSinkTask.java:368)
    at io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.autoCreateTable(DatabricksDeltaLakeSinkTask.java:309)
    at io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.recordsToPutToS3(DatabricksDeltaLakeSinkTask.java:146)
    at io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.put(DatabricksDeltaLakeSinkTask.java:98)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    ... 10 more

Any help on this would be appreciated, thanks.
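
In case it helps anyone reading the trace above: the interesting part is the last "Caused by" entry and the first stack frame under it. Below is a minimal sketch of how that can be pulled out of the trace string. The helper name summarize_trace is hypothetical (it is not part of Kafka Connect or the connector), and the sample is a shortened excerpt of the trace from this post, not the full output.

```python
def summarize_trace(trace: str) -> dict:
    """Return the root-cause exception, its message, and the first frame under it.

    Hypothetical helper for illustration only.
    """
    # Accept both the JSON-escaped form (literal \n and \t) and real newlines.
    text = trace.replace("\\n", "\n").replace("\\t", "\t")
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        raise ValueError("empty trace")

    # The last "Caused by:" line is the root cause; fall back to the first line.
    prefix = "Caused by: "
    cause_positions = [i for i, ln in enumerate(lines) if ln.startswith(prefix)]
    start = cause_positions[-1] if cause_positions else 0
    root = lines[start]
    if root.startswith(prefix):
        root = root[len(prefix):]

    # Split "fully.qualified.Exception: message" into its two parts.
    exception, _, message = root.partition(": ")

    # The first "at ..." line after the root cause is where it was thrown.
    frame = next((ln[3:] for ln in lines[start + 1:] if ln.startswith("at ")), None)
    return {"exception": exception, "message": message, "failing_frame": frame}


# Shortened excerpt of the trace from this post, in its JSON-escaped form.
sample = (
    "org.apache.kafka.connect.errors.ConnectException: "
    "Exiting WorkerSinkTask due to unrecoverable exception.\\n"
    "\\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\\n"
    "Caused by: org.apache.kafka.connect.errors.ConnectException: "
    "Invalid field schema option provided\\n"
    "\\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask"
    ".convertFieldSchemaToTableSchema(DatabricksDeltaLakeSinkTask.java:368)\\n"
    "\\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask"
    ".autoCreateTable(DatabricksDeltaLakeSinkTask.java:309)\\n"
)

summary = summarize_trace(sample)
for key, value in summary.items():
    print(f"{key}: {value}")
```

For this trace, the root cause is thrown in convertFieldSchemaToTableSchema, called from autoCreateTable, so the failure happens while the connector converts the record's field schema into a table schema during auto-creation, before anything is written.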