Apply Avro defaults when writing to Confluent Kafka
02-23-2022 09:45 AM
I have an Avro schema for my Kafka topic, and that schema defines defaults. I would like to exclude the defaulted columns when writing from Databricks and let them default to an empty array.
Sample Avro below. I am trying not to provide UserFields, because I can't store an empty array in Delta (Parquet) in Databricks, and want it to default in as empty.
{
  "name": "UserFields",
  "type": {
    "type": "array",
    "items": {
      "type": "record",
      "name": "ud_record",
      "fields": [
        { "name": "userFieldName", "type": "string" },
        { "name": "userFieldValue", "type": ["null", "string"] }
      ]
    }
  },
  "default": []
}
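Two things worth noting about the schema as posted: the trailing comma after the `items` block makes it invalid JSON, and Avro defaults are applied by the *reader* during schema resolution when the writer's schema lacks the field — they do not let a writer skip a field declared in its own writer schema, so omitting `UserFields` requires a writer schema without that field. A quick stdlib check of the corrected field definition (mirrors the schema above):

```python
import json

# Corrected Avro field definition (trailing comma after "items" removed).
user_fields = json.loads("""
{
  "name": "UserFields",
  "type": {
    "type": "array",
    "items": {
      "type": "record",
      "name": "ud_record",
      "fields": [
        {"name": "userFieldName", "type": "string"},
        {"name": "userFieldValue", "type": ["null", "string"]}
      ]
    }
  },
  "default": []
}
""")

# The reader-side default that applies when the writer's schema omits UserFields.
print(user_fields["default"])  # -> []
```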
- Labels: Avro
04-20-2023 01:06 AM
Hi Kanzi, this is not working for me. Do you know of any other solution?
df.writeStream.format("kafka")
  .option("kafka.bootstrap.servers", kafka_server_url)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.keystore.location", kafka_ssl_keystore_location)
  .option("kafka.ssl.truststore.location", kafka_ssl_truststore_location)
  .option("kafka.ssl.truststore.password", kafka_ssl_truststore_password)
  .option("kafka.ssl.keystore.password", kafka_ssl_keystore_password)
  .option("kafka.ssl.key.password", kafka_ssl_key_password)
  .option("security.protocol", "SSL")
  .option("schema.registry.ssl.keystore.location", "/dbfs/FileStore/kafka-ssl/connector01_worker_keystore.jks")
  .option("schema.registry.ssl.truststore.location", "/dbfs/FileStore/kafka-ssl/connector01_truststore.jks")
  .option("schema.registry.ssl.truststore.password", "xx")
  .option("schema.registry.ssl.keystore.password", "xx")
  .option("schema.registry.ssl.key.password", "xx")
  .option("value.converter", "io.confluent.connect.avro.AvroConverter")
  .option("value.converter.schema.registry.url", schemaRegistryAddr)
  // .option("ssl.client.auth", "true")
  // .option("ssl.enabled.protocols", "TLSv1.3,TLSv1.2,TLSv1.1,TLSv1")
  .option("sasl.mechanism", "PLAIN")
  .option("schema.registry.url", schemaRegistryAddr)
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("topic", targetTopic)
  .option("partitions", 3)
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/test01/xx")
  .start()
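As an aside: options such as `value.converter` and `schema.registry.*` are Kafka Connect settings, not Spark ones — the Spark Kafka sink ignores them, and only options prefixed with `kafka.` are passed through to the Kafka client. Avro encoding has to happen before the sink, e.g. with `to_avro`. A minimal sketch under those assumptions, reusing `df` and the variables (`kafka_server_url`, `targetTopic`, `schemaRegistryAddr`) from the snippet above; the subject name is a placeholder, and the Schema-Registry-aware three-argument `to_avro` form is Databricks-specific:

```python
from pyspark.sql.functions import struct
from pyspark.sql.avro.functions import to_avro

# Serialize the payload to Avro *before* the Kafka sink; the sink only
# understands binary "key"/"value" columns plus "kafka."-prefixed options.
# to_avro(data, subject, schemaRegistryAddress) is the Databricks
# Schema Registry integration; OSS Spark's to_avro takes a JSON schema
# string instead of a subject/registry address.
avro_df = df.select(
    to_avro(
        struct("*"),            # whole row as the Avro record value
        "my-topic-value",       # hypothetical Schema Registry subject
        schemaRegistryAddr,
    ).alias("value")
)

(
    avro_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server_url)
    .option("kafka.security.protocol", "SSL")  # client options need the kafka. prefix
    .option("topic", targetTopic)
    .option("checkpointLocation", "/dbfs/test01/xx")
    .outputMode("append")
    .start()
)
```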
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
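The PKIX error means the JVM does not trust the certificate presented by the broker or Schema Registry — i.e. the issuing CA is not in the truststore the driver actually loads. A sketch of importing the endpoint's CA certificate into the truststore referenced above (host, port, alias, and password are placeholders for your environment):

```shell
# Fetch the certificate the endpoint actually serves
# (replace host:port with your Schema Registry or broker address).
openssl s_client -connect schema-registry.example.com:8081 -showcerts </dev/null \
  | openssl x509 -outform PEM > registry-ca.pem

# Import it into the truststore the job points at.
keytool -importcert -noprompt \
  -alias registry-ca \
  -file registry-ca.pem \
  -keystore /dbfs/FileStore/kafka-ssl/connector01_truststore.jks \
  -storepass xx
```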
Databricks Runtime 9.1.x (Spark 3.1.2).
Thanks.