Data Engineering

Invalid field schema option provided-DatabricksDeltaLakeSinkConnector

Bency
New Contributor III

I have configured a Delta Lake Sink connector that reads from an AVRO topic and writes to Delta Lake. I followed the docs, and my config looks like this:

{
  "name": "dev_test_delta_connector",
  "config": {
    "topics": "dl_test_avro",
    "input.data.format": "AVRO",
    "connector.class": "io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector",
    "name": "dev_test_delta_connector",
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "****",
    "delta.lake.host.name": "******",
    "delta.lake.http.path": "*********",
    "delta.lake.database": "dl_test_db",
    "delta.lake.token": "*********",
    "delta.lake.table.auto.create": "true",
    "delta.lake.table.format": "kafka_${topic}",
    "staging.bucket.name": "dl-test-bucket",
    "s3.region": "eu-west-2",
    "staging.s3.access.key.id": "*************",
    "staging.s3.secret.access.key": "**********",
    "confluent.topic.bootstrap.servers": "****************",
    "flush.interval.ms": "100",
    "tasks.max": "1"
  }
}

I expect the connector to create the table automatically, but it fails with:

   "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Invalid field schema option provided\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.convertFieldSchemaToTableSchema(DatabricksDeltaLakeSinkTask.java:368)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.autoCreateTable(DatabricksDeltaLakeSinkTask.java:309)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.recordsToPutToS3(DatabricksDeltaLakeSinkTask.java:146)\n\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.put(DatabricksDeltaLakeSinkTask.java:98)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\n"

Any help on this would be appreciated, thanks.
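The trace above is a single JSON string, so the newlines and tabs show up as literal \n and \t and the actual cause is hard to spot. As a rough sketch (the function name and the shortened sample are mine, not from the connector or from Kafka Connect), the "Caused by" lines and the frame that threw them can be pulled out like this:

```python
def root_causes(trace: str) -> list[tuple[str, str]]:
    """Return (message, first stack frame) for every 'Caused by:' entry in a trace."""
    # When the trace is copied raw from the status JSON, newlines and tabs
    # appear as literal backslash sequences; turn them back into whitespace.
    text = trace.replace("\\n", "\n").replace("\\t", "\t")
    lines = text.splitlines()
    causes = []
    for i, line in enumerate(lines):
        if line.startswith("Caused by: "):
            message = line[len("Caused by: "):].strip()
            # The frame directly after a "Caused by" line is where it was thrown.
            frame = lines[i + 1].strip() if i + 1 < len(lines) else ""
            causes.append((message, frame))
    return causes


# Shortened copy of the trace posted above (escapes kept literal on purpose).
sample = (
    "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\\n"
    "\\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\\n"
    "Caused by: org.apache.kafka.connect.errors.ConnectException: Invalid field schema option provided\\n"
    "\\tat io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.convertFieldSchemaToTableSchema(DatabricksDeltaLakeSinkTask.java:368)\\n"
)

for message, frame in root_causes(sample):
    print(message)
    print("  thrown", frame)
```

For this trace, the cause is raised in convertFieldSchemaToTableSchema, which is called from autoCreateTable.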

3 REPLIES

Hubert-Dudek
Esteemed Contributor III

I understand that this config is on the Confluent side (Confluent Databricks AWS Delta Lake Sink Connector).

A common issue is an S3 problem (roles). Can you check whether anything is saved in the S3 staging bucket?

You can also contact Confluent support.
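The staging bucket check suggested above can be sketched in Python with boto3. This is only an illustration: the helper name is hypothetical, the bucket name and region are taken from the config in the question, and it assumes credentials that are allowed to list that bucket.

```python
def staging_objects(s3_client, bucket: str, prefix: str = "", limit: int = 10) -> list[str]:
    """Return up to `limit` object keys found in the staging bucket."""
    params = {"Bucket": bucket, "MaxKeys": limit}
    if prefix:
        # Only narrow the listing when a prefix was actually given.
        params["Prefix"] = prefix
    response = s3_client.list_objects_v2(**params)
    # "Contents" is absent from the response when no objects match.
    return [obj["Key"] for obj in response.get("Contents", [])]


# Usage (needs boto3 installed and AWS credentials with list access to the bucket):
#   import boto3
#   s3 = boto3.client("s3", region_name="eu-west-2")
#   keys = staging_objects(s3, "dl-test-bucket")
#   print(keys or "nothing staged yet")
```

An empty result only says that nothing has been staged so far; it does not by itself show whether roles are the cause.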

Hubert-Dudek
Esteemed Contributor III

Generally, you need to be sure that Confluent can connect to the S3/Databricks resources (IP/roles). This message from Confluent is not very helpful 🙂

Bency
New Contributor III

@Hubert Dudek, should I be configuring anything with respect to the schema in the connector config?

I ask because I did successfully stage some data from another topic in a different format (JSON_SR) into a Delta Lake table; it is only with the AVRO topic that I get this error.
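Since the exception in the trace is thrown from convertFieldSchemaToTableSchema during auto-create, one guess (not confirmed anywhere in this thread) is that some field type in the AVRO value schema could not be mapped to a table column. A minimal sketch for narrowing that down: list every top-level field whose type is not a plain Avro primitive, then compare that list with the connector documentation. The sample schema and function names below are made up for illustration; substitute the schema registered for dl_test_avro.

```python
import json

# The eight primitive type names defined by the Avro specification.
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def describe(avro_type) -> str:
    """Render an Avro type (as parsed JSON) as a short label."""
    if isinstance(avro_type, str):
        return avro_type
    if isinstance(avro_type, list):  # a JSON array is a union
        return "union[" + ", ".join(describe(t) for t in avro_type) + "]"
    label = describe(avro_type["type"])
    if "logicalType" in avro_type:
        label += f"({avro_type['logicalType']})"
    return label


def non_primitive_fields(schema: dict) -> dict[str, str]:
    """Map field name -> type label for each top-level field that is not a plain primitive."""
    return {
        field["name"]: describe(field["type"])
        for field in schema.get("fields", [])
        if not (isinstance(field["type"], str) and field["type"] in PRIMITIVES)
    }


# Hypothetical schema, NOT the real schema of the topic in this thread.
sample_schema = json.loads("""
{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "note", "type": ["null", "string"]},
    {"name": "tags", "type": {"type": "array", "items": "string"}},
    {"name": "created", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
""")

for name, label in non_primitive_fields(sample_schema).items():
    print(f"{name}: {label}")
```

This does not say which types the connector accepts; it only surfaces the fields worth checking first, which may explain why the JSON_SR topic worked while the AVRO one does not.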
