cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Trigger.AvailableNow does not support maxOffsetsPerTrigger in Databricks runtime 10.3

SimonY
New Contributor III

Hello,

I ran a spark stream job to ingest data from kafka to test Trigger.AvailableNow.

What's environment the job run ?

1: Databricks runtime 10.3

2: Azure cloud

3: 1 Driver node + 3 work nodes( 14GB, 4core)

val maxOffsetsPerTrigger = "500"

spark.conf.set("spark.databricks.delta.autoCompact.enabled",      "auto")

...

val rdf = spark

 .readStream

 .format("kafka")

 .option("kafka.security.protocol", "SASL_PLAINTEXT")

 .option("kafka.sasl.mechanism",  "SCRAM-SHA-512")

 .option("kafka.sasl.jaas.config", "<>")

 .option("kafka.bootstrap.servers", servers)

 .option("subscribe",        topic)

 .option("startingOffsets",     "earliest") 

 .option("maxOffsetsPerTrigger",  maxOffsetsPerTrigger)

 .load()

rdf.writeStream

 .format("delta")

 .outputMode("append")

 .option("mergeSchema", "true")

 .option("checkpointLocation", ckpPath)

 .trigger(Trigger.AvailableNow)

 .start(tabPath)

 .awaitTermination()

What I expected to see:

1: The spark stream job can read all data from Kafka and then quit

2: The spark stream will apply maxOffsetsPerTrigger for each micro batch

What I see:

the Kafka topic has four partitions, it takes 5 hours to generate 4 huge data files.

part-00000-89afacf1-f2e6-4904-b313-080d48034859-c000.snappy.parquet

3/25/2022, 9:50:48 PM

Hot (Inferred)

Block blob

14.39 GiB

Available

part-00001-cf932ee2-8535-4dd6-9dab-e94b9292a438-c000.snappy.parquet

3/25/2022, 6:15:36 PM

Hot (Inferred)

Block blob

14.38 GiB

Available

part-00002-7d481793-10dc-4739-8c20-972cb6f18fd6-c000.snappy.parquet

3/25/2022, 6:15:22 PM

Hot (Inferred)

Block blob

14.41 GiB

Available

part-00003-17c88f26-f152-4b27-80cf-5ae372662950-c000.snappy.parquet

3/25/2022, 9:48:14 PM

Hot (Inferred)

Block blob

14.43 GiB

Available

1 ACCEPTED SOLUTION

Accepted Solutions

SimonY
New Contributor III

@Karli Watsicaโ€‹ , thanks for help. This issue has been fixed in databricks 10.4 and spark 3.3.

[SPARK-36649] [SQL] Support Trigger.AvailableNow on Kafka data source

https://docs.databricks.com/release-notes/runtime/10.4.html

View solution in original post

3 REPLIES 3

Eulaliasw
New Contributor II

Weโ€™re constantly working to improve our features based on feedback like this, so Iโ€™ll be sure to share your request to the API product team.

usps liteblue

SimonY
New Contributor III

@Karli Watsicaโ€‹ , thanks for help. This issue has been fixed in databricks 10.4 and spark 3.3.

[SPARK-36649] [SQL] Support Trigger.AvailableNow on Kafka data source

https://docs.databricks.com/release-notes/runtime/10.4.html

Anonymous
Not applicable

You'd be better off with 1 node with 12 cores than 3 nodes with 4 each. You're shuffles are going to be much better one 1 machine.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.