Apply Avro defaults when writing to Confluent Kafka

AdamRink
New Contributor III

I have an Avro schema for my Kafka topic, and some fields in that schema have defaults. I would like to leave the defaulted columns out of the data I write from Databricks and let them default to an empty array.

Sample Avro field below. I am trying not to provide UserFields, because I can't store an empty array in Delta (Parquet) in Databricks, and would rather let it default to an empty array.

{
    "name": "UserFields",
    "type": {
        "type": "array",
        "items": {
            "name": "ud_record",
            "type": "record",
            "fields": [{
                "name": "userFieldName",
                "type": "string"
            }, {
                "name": "userFieldValue",
                "type": ["null", "string"]
            }]
        }
    },
    "default": []
}
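For context, the Avro specification describes a field's default as something a reader applies during schema resolution when the written data lacks the field, so a writer generally still has to supply a value. One possible workaround, sketched here and not confirmed anywhere in this thread, is to add the column as an empty array just before serializing, rather than storing it in Delta. The helper name withEmptyUserFields and the Spark type string are assumptions made for illustration, and the sketch assumes the DataFrame has no UserFields column yet.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.typedLit

// Assumed Spark equivalent of the Avro "UserFields" array of ud_record.
val userFieldsType = "array<struct<userFieldName:string,userFieldValue:string>>"

// Hypothetical helper (not from the thread): adds UserFields as an empty,
// non-null array so the row carries a value for the field at write time.
def withEmptyUserFields(df: DataFrame): DataFrame = {
  // Build an empty array of 2-field structs, then cast to get the field names.
  val emptyUserFields = typedLit(Seq.empty[(String, String)]).cast(userFieldsType)
  df.withColumn("UserFields", emptyUserFields)
}
```

It would be called on the DataFrame read from Delta, for example val out = withEmptyUserFields(df), before the Avro serialization step. Whether the nullability of the struct fields matches what the serializer expects for this schema is not verified here.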

XiaoJun
New Contributor II

Hi Kanzi, this is not working. Please let me know if there is any other solution.

df
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", kafka_server_url)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.keystore.location", kafka_ssl_keystore_location)
  .option("kafka.ssl.truststore.location", kafka_ssl_truststore_location)
  .option("kafka.ssl.truststore.password", kafka_ssl_truststore_password)
  .option("kafka.ssl.keystore.password", kafka_ssl_keystore_password)
  .option("kafka.ssl.key.password", kafka_ssl_key_password)
  .option("security.protocol", "SSL")
  .option("schema.registry.ssl.keystore.location", "/dbfs/FileStore/kafka-ssl/connector01_worker_keystore.jks")
  .option("schema.registry.ssl.truststore.location", "/dbfs/FileStore/kafka-ssl/connector01_truststore.jks")
  .option("schema.registry.ssl.truststore.password", "xx")
  .option("schema.registry.ssl.keystore.password", "xx")
  .option("schema.registry.ssl.key.password", "xx")
  .option("value.converter", "io.confluent.connect.avro.AvroConverter")
  .option("value.converter.schema.registry.url", schemaRegistryAddr)
  // .option("ssl.client.auth","true")
  // .option("ssl.enabled.protocols","TLSv1.3,TLSv1.2,TLSv1.1,TLSv1")
  .option("sasl.mechanism", "PLAIN")
  .option("schema.registry.url", schemaRegistryAddr)
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("topic", targetTopic)
  .option("partitions", 3)
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/test01/xx")
  .start()

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Runtime: 9.1.x (Spark 3.1.2)

Thanks