
Apply Avro defaults when writing to Confluent Kafka

AdamRink
New Contributor III

I have an Avro schema for my Kafka topic, and that schema defines default values. I would like to exclude the defaulted columns when writing from Databricks and just let them default to an empty array.

Sample Avro below. I'm trying not to provide UserFields, because I can't store an empty array in Delta (Parquet) in Databricks, and would rather let it default in as empty.

{
    "name": "UserFields",
    "type": {
        "type": "array",
        "items": {
            "name": "ud_record",
            "type": "record",
            "fields": [
                { "name": "userFieldName", "type": "string" },
                { "name": "userFieldValue", "type": ["null", "string"] }
            ]
        }
    },
    "default": []
}
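
One way to get the default applied is Avro schema resolution: if the message is written with a schema that simply omits UserFields, a consumer decoding against the full (reader) schema fills the field in from its default ([]). Below is a minimal sketch using Spark's built-in to_avro; the record name MyRecord and the id column are placeholder assumptions, not part of the original topic schema:

    import org.apache.spark.sql.avro.functions.to_avro
    import org.apache.spark.sql.functions.{col, struct}

    // Hypothetical writer schema: the registered schema minus UserFields.
    // Readers resolving against the full schema populate UserFields from
    // its default ([]).
    val writerSchema =
      """{
        |  "type": "record",
        |  "name": "MyRecord",
        |  "fields": [
        |    { "name": "id", "type": "string" }
        |  ]
        |}""".stripMargin

    // Serialize only the columns the writer schema declares; the result can
    // feed the "value" column of a Kafka sink.
    val payload = df.select(to_avro(struct(col("id")), writerSchema).as("value"))

Two caveats: to_avro emits plain Avro rather than Confluent's wire format (magic byte plus schema ID), so consumers using Confluent's Avro deserializer would need the Databricks Schema Registry integration for to_avro instead. And if the only blocker is storing the empty array in Delta, a typed cast usually works, e.g. array().cast("array<struct<userFieldName:string,userFieldValue:string>>"), since Parquet rejects the untyped array<null> of a bare empty literal rather than empty arrays as such.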


Kaniz
Community Manager

Hi @Adam Rink! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first; otherwise, I will get back to you soon. Thanks.

Kaniz
Community Manager

Hi @Adam Rink, please go through the following blog. Let me know if it helps.

XiaoJun
New Contributor II

Hi Kaniz, this is not working. Let me know if you have any other solution.

df.writeStream
  .format("kafka")
  // "kafka."-prefixed options are forwarded to the underlying Kafka producer
  .option("kafka.bootstrap.servers", kafka_server_url)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.keystore.location", kafka_ssl_keystore_location)
  .option("kafka.ssl.keystore.password", kafka_ssl_keystore_password)
  .option("kafka.ssl.key.password", kafka_ssl_key_password)
  .option("kafka.ssl.truststore.location", kafka_ssl_truststore_location)
  .option("kafka.ssl.truststore.password", kafka_ssl_truststore_password)
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  // Un-prefixed Kafka Connect / Schema Registry options like the ones below
  // are not forwarded to the Kafka client by the Spark Kafka sink
  .option("security.protocol", "SSL")
  .option("sasl.mechanism", "PLAIN")
  .option("schema.registry.url", schemaRegistryAddr)
  .option("schema.registry.ssl.keystore.location", "/dbfs/FileStore/kafka-ssl/connector01_worker_keystore.jks")
  .option("schema.registry.ssl.keystore.password", "xx")
  .option("schema.registry.ssl.key.password", "xx")
  .option("schema.registry.ssl.truststore.location", "/dbfs/FileStore/kafka-ssl/connector01_truststore.jks")
  .option("schema.registry.ssl.truststore.password", "xx")
  .option("value.converter", "io.confluent.connect.avro.AvroConverter")
  .option("value.converter.schema.registry.url", schemaRegistryAddr)
  // .option("ssl.client.auth", "true")
  // .option("ssl.enabled.protocols", "TLSv1.3,TLSv1.2,TLSv1.1,TLSv1")
  .option("topic", targetTopic)
  .option("partitions", 3)
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/test01/xx")
  .start()

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Databricks Runtime 9.1.x (Spark 3.1.2)

Thanks.
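
On the PKIX error: "unable to find valid certification path" means the JVM making the TLS connection (to the broker or, more likely here, the Schema Registry's HTTPS endpoint) doesn't trust the server's certificate chain. The kafka.ssl.truststore.* options configure only the Kafka client, not other HTTPS calls. A sketch of one common fix, assuming the truststore at the DBFS path above already contains the issuing CA (path and password are placeholders taken from the snippet):

    // Point the JVM's default truststore at the one holding the CA that signed
    // the Schema Registry / broker certificates. This is driver-side only; on a
    // multi-node cluster, set the same properties for executors via
    // spark.executor.extraJavaOptions or a cluster-scoped init script.
    System.setProperty("javax.net.ssl.trustStore", "/dbfs/FileStore/kafka-ssl/connector01_truststore.jks")
    System.setProperty("javax.net.ssl.trustStorePassword", "xx")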