Hello,
I ran a Spark Structured Streaming job that ingests data from Kafka into a Delta table in order to test Trigger.AvailableNow.
What environment the job ran in:
1: Databricks Runtime 10.3
2: Azure cloud
3: 1 driver node + 3 worker nodes (14 GB, 4 cores each)
import org.apache.spark.sql.streaming.Trigger

val maxOffsetsPerTrigger = "500"
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "auto")
...
// Read from Kafka with a rate limit of 500 offsets per micro-batch
val rdf = spark
  .readStream
  .format("kafka")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("kafka.sasl.jaas.config", "<>")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

// Write to a Delta table and stop once all currently available data has been processed
rdf.writeStream
  .format("delta")
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("checkpointLocation", ckpPath)
  .trigger(Trigger.AvailableNow)
  .start(tabPath)
  .awaitTermination()
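To make the per-batch behavior visible, here is a minimal sketch (the listener and its log format are mine, not part of the job above) of logging each micro-batch's input row count with a StreamingQueryListener; it would be registered before start():

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent}

// Log how many rows each micro-batch reads, so the effect of
// maxOffsetsPerTrigger shows up in the driver log (sketch).
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batchId=${p.batchId} numInputRows=${p.numInputRows}")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})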
What I expected to see:
1: The streaming job reads all of the data currently in Kafka and then stops on its own
2: maxOffsetsPerTrigger is applied to each micro-batch, so the data is written in many small batches (rough arithmetic below)
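For reference, my back-of-the-envelope expectation (a sketch only; the limit is split across partitions roughly in proportion to their lag, so the per-partition number is approximate):

// Sketch: maxOffsetsPerTrigger caps the total offsets per micro-batch
val maxOffsets = 500                                // maxOffsetsPerTrigger from the job above
val partitions = 4                                  // the topic has four partitions
val perPartitionPerBatch = maxOffsets / partitions  // ~125 records per partition per batch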
What I see:
The Kafka topic has four partitions, and the job ran for about 5 hours and produced just 4 huge data files, one per partition (listing from the Azure storage browser: file name, size, last modified):
part-00000-89afacf1-f2e6-4904-b313-080d48034859-c000.snappy.parquet   14.39 GiB   3/25/2022, 9:50:48 PM
part-00001-cf932ee2-8535-4dd6-9dab-e94b9292a438-c000.snappy.parquet   14.38 GiB   3/25/2022, 6:15:36 PM
part-00002-7d481793-10dc-4739-8c20-972cb6f18fd6-c000.snappy.parquet   14.41 GiB   3/25/2022, 6:15:22 PM
part-00003-17c88f26-f152-4b27-80cf-5ae372662950-c000.snappy.parquet   14.43 GiB   3/25/2022, 9:48:14 PM
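To see how many micro-batches actually committed, one check is the Delta table history, since each micro-batch commit shows up as a separate table version (a minimal sketch, using the same tabPath as in the job above):

// Each streaming micro-batch produces one commit in the Delta log, so the
// number of versions shows how many batches actually ran (sketch).
spark.sql(s"DESCRIBE HISTORY delta.`$tabPath`")
  .select("version", "timestamp", "operation", "operationMetrics")
  .show(truncate = false)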