Trigger.AvailableNow does not support maxOffsetsPerTrigger in Databricks runtime 10.3

SimonY

Hello,

I ran a Spark Structured Streaming job that ingests data from Kafka in order to test Trigger.AvailableNow.

What environment does the job run in?

1: Databricks Runtime 10.3

2: Azure cloud

3: 1 driver node + 3 worker nodes (14 GB, 4 cores)

val maxOffsetsPerTrigger = "500"

spark.conf.set("spark.databricks.delta.autoCompact.enabled", "auto")

...

val rdf = spark
  .readStream
  .format("kafka")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("kafka.sasl.jaas.config", "<>")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

rdf.writeStream
  .format("delta")
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("checkpointLocation", ckpPath)
  .trigger(Trigger.AvailableNow)
  .start(tabPath)
  .awaitTermination()

What I expected to see:

1: The Spark streaming job reads all available data from Kafka and then stops.

2: The streaming job applies maxOffsetsPerTrigger to each micro-batch.
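
To make expectation 2 concrete, here is a rough sketch of the arithmetic I had in mind. It is plain Scala with no Spark dependency. The helper name expectedBatches and the backlog of 2,000 offsets are made up for illustration, and the sketch assumes the limit caps the total number of offsets read per micro-batch across all partitions.

```scala
object ExpectedBatches {
  // Hypothetical helper: number of micro-batches needed to drain a backlog
  // when each batch reads at most maxOffsetsPerTrigger offsets in total.
  def expectedBatches(backlogOffsets: Long, maxOffsetsPerTrigger: Long): Long = {
    require(backlogOffsets >= 0, "backlogOffsets must not be negative")
    require(maxOffsetsPerTrigger > 0, "maxOffsetsPerTrigger must be positive")
    // Ceiling division: a partially filled last batch still counts as a batch.
    (backlogOffsets + maxOffsetsPerTrigger - 1) / maxOffsetsPerTrigger
  }

  def main(args: Array[String]): Unit = {
    val maxOffsetsPerTrigger = 500L // same value as in my job
    val backlog = 2000L             // made-up backlog, not my real topic size
    println(expectedBatches(backlog, maxOffsetsPerTrigger)) // prints 4
  }
}
```

In other words, with a limit of 500 I expected many small micro-batches, not one large one.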

What I see:

The Kafka topic has four partitions, and the job took 5 hours to generate 4 huge data files, one per partition, so maxOffsetsPerTrigger appears to have been ignored.

Name                                                                 Modified                Access tier     Blob type   Size       Lease state
part-00000-89afacf1-f2e6-4904-b313-080d48034859-c000.snappy.parquet  3/25/2022, 9:50:48 PM   Hot (Inferred)  Block blob  14.39 GiB  Available
part-00001-cf932ee2-8535-4dd6-9dab-e94b9292a438-c000.snappy.parquet  3/25/2022, 6:15:36 PM   Hot (Inferred)  Block blob  14.38 GiB  Available
part-00002-7d481793-10dc-4739-8c20-972cb6f18fd6-c000.snappy.parquet  3/25/2022, 6:15:22 PM   Hot (Inferred)  Block blob  14.41 GiB  Available
part-00003-17c88f26-f152-4b27-80cf-5ae372662950-c000.snappy.parquet  3/25/2022, 9:48:14 PM   Hot (Inferred)  Block blob  14.43 GiB  Available
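
One way to confirm whether the limit was applied would be to compare the rows read in each micro-batch with maxOffsetsPerTrigger. Below is a minimal sketch in plain Scala. The helper batchesOverLimit and the sample numbers are hypothetical, and the (batchId, numInputRows) pairs are assumed to be copied by hand from the streaming query's progress reports, which expose both fields. It also assumes one input row per Kafka offset.

```scala
object BatchSizeCheck {
  // Hypothetical helper: returns the ids of the micro-batches that read
  // more rows than the configured limit.
  def batchesOverLimit(progress: Seq[(Long, Long)], limit: Long): Seq[Long] =
    progress.collect {
      case (batchId, numInputRows) if numInputRows > limit => batchId
    }

  def main(args: Array[String]): Unit = {
    val limit = 500L // maxOffsetsPerTrigger from my job
    // Made-up (batchId, numInputRows) pairs, not taken from my real run.
    val sample = Seq((0L, 500L), (1L, 500L), (2L, 1200L))
    println(batchesOverLimit(sample, limit)) // prints List(2)
  }
}
```

If the limit were respected, this check would return an empty list for every run.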