
Apply Avro defaults when writing to Confluent Kafka

AdamRink
New Contributor III

I have an Avro schema for my Kafka topic, and some fields in that schema have defaults. I would like to leave the defaulted columns out of what I write from Databricks and let them default to an empty array.

Sample Avro field below. I am trying not to provide UserFields, because I can't store an empty array in Delta (Parquet) in Databricks, and I want it to default to an empty array instead.

{
    "name": "UserFields",
    "type": {
        "type": "array",
        "items": {
            "name": "ud_record",
            "type": "record",
            "fields": [
                {
                    "name": "userFieldName",
                    "type": "string"
                },
                {
                    "name": "userFieldValue",
                    "type": ["null", "string"]
                }
            ]
        }
    },
    "default": []
}
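
One point that may help frame the question: per the Avro specification, a field's `default` is used by readers during schema resolution, when the written data lacks the field. A writer still has to supply a value for every field. A common workaround is therefore to fill the default in yourself just before serializing. Below is a minimal sketch of that idea in plain Python, using only the standard library rather than an Avro client. The wrapping record `Example`, the `id` field, and the helper `apply_defaults` are hypothetical names introduced for illustration; only the `UserFields` field comes from the post.

```python
import copy
import json

# Hypothetical wrapper record; only the "UserFields" field is taken from the post.
SCHEMA_JSON = """
{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "id", "type": "string"},
    {
      "name": "UserFields",
      "type": {
        "type": "array",
        "items": {
          "name": "ud_record",
          "type": "record",
          "fields": [
            {"name": "userFieldName", "type": "string"},
            {"name": "userFieldValue", "type": ["null", "string"]}
          ]
        }
      },
      "default": []
    }
  ]
}
"""


def apply_defaults(schema, row):
    """Return a copy of `row` with missing or None top-level fields
    replaced by the schema's declared default, when one exists."""
    filled = dict(row)
    for field in schema["fields"]:
        name = field["name"]
        if filled.get(name) is not None:
            continue  # the row already carries a real value
        if "default" in field:
            # Deep-copy so rows never share (and mutate) the schema's default.
            filled[name] = copy.deepcopy(field["default"])
        elif name not in filled:
            raise ValueError(f"field {name!r} has no value and no default")
    return filled


schema = json.loads(SCHEMA_JSON)
print(apply_defaults(schema, {"id": "42"}))  # {'id': '42', 'UserFields': []}
```

The sketch only handles top-level fields and treats `None` as "not provided", which matches the case where the column is stored as null in Delta. In a Spark job the same idea would be applied to the column itself (replacing null with an empty array) before the value is encoded as Avro.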

1 REPLY

XiaoJun
New Contributor II

Hi Kanzi, this is not working. Please let me know if there is any other solution.

df
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", kafka_server_url)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.keystore.location", kafka_ssl_keystore_location)
  .option("kafka.ssl.truststore.location", kafka_ssl_truststore_location)
  .option("kafka.ssl.truststore.password", kafka_ssl_truststore_password)
  .option("kafka.ssl.keystore.password", kafka_ssl_keystore_password)
  .option("kafka.ssl.key.password", kafka_ssl_key_password)
  .option("security.protocol", "SSL")
  .option("schema.registry.ssl.keystore.location", "/dbfs/FileStore/kafka-ssl/connector01_worker_keystore.jks")
  .option("schema.registry.ssl.truststore.location", "/dbfs/FileStore/kafka-ssl/connector01_truststore.jks")
  .option("schema.registry.ssl.truststore.password", "xx")
  .option("schema.registry.ssl.keystore.password", "xx")
  .option("schema.registry.ssl.key.password", "xx")
  .option("value.converter", "io.confluent.connect.avro.AvroConverter")
  .option("value.converter.schema.registry.url", schemaRegistryAddr)
  // .option("ssl.client.auth", "true")
  // .option("ssl.enabled.protocols", "TLSv1.3,TLSv1.2,TLSv1.1,TLSv1")
  .option("sasl.mechanism", "PLAIN")
  .option("schema.registry.url", schemaRegistryAddr)
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("topic", targetTopic)
  .option("partitions", 3)
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/test01/xx")
  .start()

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Runtime: 9.1.x (Spark 3.1.2)

Thanks.
