
Apply Avro defaults when writing to Confluent Kafka

AdamRink
New Contributor III

I have an Avro schema for my Kafka topic, and some fields in that schema have defaults. I would like to leave the defaulted columns out of the data I write from Databricks and let them default to an empty array.

Sample Avro below. I'm trying not to provide UserFields, because I can't store an empty array in Delta (Parquet) in Databricks, and would rather let it default to empty.

{
    "name": "UserFields",
    "type": {
        "type": "array",
        "items": {
            "name": "ud_record",
            "type": "record",
            "fields": [{
                "name": "userFieldName",
                "type": "string"
            }, {
                "name": "userFieldValue",
                "type": ["null", "string"]
            }]
        }
    },
    "default": []
}
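
One point that may help here: per the Avro specification, a field's default is applied by the reader during schema resolution (when the written data lacks the field), not by the writer, so a record produced against this schema generally still has to carry a value for UserFields. Below is a minimal sketch of one workaround, assuming the rest of the row comes from a DataFrame and the array can be added as a typed empty literal just before serialization. The names `UdRecord` and `withEmptyUserFields` are hypothetical and not from the original post.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.typedLit

// Hypothetical case class mirroring the "ud_record" item type in the schema above.
case class UdRecord(userFieldName: String, userFieldValue: Option[String])

// Adds UserFields as an empty array<struct<userFieldName, userFieldValue>> column.
// The source table does not need to contain the column at all.
def withEmptyUserFields(df: DataFrame): DataFrame =
  df.withColumn("UserFields", typedLit(Seq.empty[UdRecord]))
```

With this approach the Delta table can omit UserFields entirely, and the empty array only exists in the DataFrame that is handed to the Kafka writer.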


Kaniz
Community Manager

Hi @Adam Rink! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer first; otherwise I will get back to you soon. Thanks.

Kaniz
Community Manager

Hi @Adam Rink, please go through the following blog and let me know if it helps.

XiaoJun
New Contributor II

Hi Kaniz, this is not working. Please let me know if there is any other solution.

df
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", kafka_server_url)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.keystore.location", kafka_ssl_keystore_location)
  .option("kafka.ssl.truststore.location", kafka_ssl_truststore_location)
  .option("kafka.ssl.truststore.password", kafka_ssl_truststore_password)
  .option("kafka.ssl.keystore.password", kafka_ssl_keystore_password)
  .option("kafka.ssl.key.password", kafka_ssl_key_password)
  .option("security.protocol", "SSL")
  .option("schema.registry.ssl.keystore.location", "/dbfs/FileStore/kafka-ssl/connector01_worker_keystore.jks")
  .option("schema.registry.ssl.truststore.location", "/dbfs/FileStore/kafka-ssl/connector01_truststore.jks")
  .option("schema.registry.ssl.truststore.password", "xx")
  .option("schema.registry.ssl.keystore.password", "xx")
  .option("schema.registry.ssl.key.password", "xx")
  .option("value.converter", "io.confluent.connect.avro.AvroConverter")
  .option("value.converter.schema.registry.url", schemaRegistryAddr)
  // .option("ssl.client.auth", "true")
  // .option("ssl.enabled.protocols", "TLSv1.3,TLSv1.2,TLSv1.1,TLSv1")
  .option("sasl.mechanism", "PLAIN")
  .option("schema.registry.url", schemaRegistryAddr)
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("topic", targetTopic)
  .option("partitions", 3)
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/test01/xx")
  .start()

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
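
A PKIX error like this generally means the JVM making the TLS connection could not build a chain from the server's certificate to any CA in the truststore it was actually using. As a first check, here is a minimal sketch that lists what a JKS truststore contains, using only `java.security.KeyStore`. The helper name `listTrustedCerts` is hypothetical, and the path and password are whatever applies to your own truststore.

```scala
import java.io.FileInputStream
import java.security.KeyStore
import java.security.cert.X509Certificate
import scala.collection.JavaConverters._

// Hypothetical helper: returns (alias, subject DN) for every X.509 entry in a JKS file.
def listTrustedCerts(path: String, password: String): List[(String, String)] = {
  val store = KeyStore.getInstance("JKS")
  val in = new FileInputStream(path)
  try store.load(in, password.toCharArray) finally in.close()
  store.aliases().asScala.toList.flatMap { alias =>
    store.getCertificate(alias) match {
      case cert: X509Certificate => Some(alias -> cert.getSubjectX500Principal.getName)
      case _                     => None // entry without an X.509 certificate
    }
  }
}
```

If the CA that signed the broker or Schema Registry certificate does not show up in the list for the truststore the failing client uses, that would be consistent with the error above.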

Runtime: 9.1.x (Spark 3.1.2)

Thanks.
