Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Lakeflow SDP partition error

IM_01
Contributor III

Hi,
I was trying to log an exception in Lakeflow SDP. In case of an exception, I write a log entry into an audit table and then return an empty streaming DataFrame, as shown below:

from pyspark.sql.functions import lit

try:
    raise Exception("testexception")
    return df
except Exception as e:
    # write the error message into the audit table
    df = spark.createDataFrame([{"error_msg": str(e)}], schema="error_msg string")
    df.write.insertInto("cat.sch.tbl_stg_tst_audit")

    # return an empty streaming DataFrame with the target table's schema
    df = spark.readStream.format("rate").load()
    df = df.select(*[lit(None).cast(coltype).alias(colname) for colname, coltype in tb_schema])
    df = df.where("1 == 0")
    return df

but I was getting the error below when writing to the original table:

 

Category: Error
Message: The number of partitions (0) used in previous microbatch is different from the current number of partitions (8). There could be two possible reasons:
1. Option "numPartitions" of the rate source gets changed during query restart.
2. The size of the cluster might change during query restart.
Explicitly set option "numPartitions" of the rate source to 0 to fix this issue.
Error class: STREAMING_RATE_SOURCE_V2_PARTITION_NUM_CHANGE_UNSUPPORTED

Could anyone please help with this issue?

3 REPLIES

amirabedhiafi
New Contributor III

Hi @IM_01!

I think that your issue is caused by using the rate source as a dummy empty stream.

The rate source stores its partition count in the streaming checkpoint. Because numPartitions was not explicitly set, the count can change between runs depending on cluster size or default parallelism, which triggers STREAMING_RATE_SOURCE_V2_PARTITION_NUM_CHANGE_UNSUPPORTED.

However, I would not recommend this pattern in Lakeflow SDP. A pipeline table function should only return a DataFrame; it should not write to an audit table with df.write.insertInto(), because dataset definitions may be analyzed or even retried multiple times, so the side effect can be duplicated or fail unexpectedly.

So let the pipeline fail normally and use the Lakeflow pipeline event log for monitoring. If needed, you can create a separate monitoring job, query, or event hook to copy error events into your audit table.
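For example, a separate monitoring query could look roughly like this (a sketch, not tested against your pipeline: the pipeline ID and audit table are placeholders, and the event-log columns I select are assumptions you should check against your event log schema):

# Sketch: copy ERROR-level events from the pipeline event log into the audit table.
# "<your-pipeline-id>", the audit table name, and the selected columns are placeholders.
error_events = spark.sql("""
    SELECT concat_ws(' | ', event_type, message, to_json(error)) AS error_msg
    FROM event_log("<your-pipeline-id>")
    WHERE level = 'ERROR'
""")

error_events.write.insertInto("cat.sch.tbl_stg_tst_audit")

You could schedule this as a small job after each pipeline update instead of writing from inside the dataset definition.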

If you still need a workaround for the empty stream, you can explicitly set numPartitions on the rate source and keep it fixed for the lifetime of the checkpoint:

from pyspark.sql import functions as F

df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", "1")
    .option("rowsPerSecond", "1")
    .load()
)

df = df.select(
    *[F.lit(None).cast(coltype).alias(colname) for colname, coltype in tb_schema]
).where("false")

return df

But if the existing checkpoint already stored numPartitions = 0, you should either set the option to the value shown in the error message or do a full refresh of the pipeline before changing it to a new stable value.

If this answer resolves your question, could you please mark it as “Accept as Solution”? It will help other users quickly find the correct fix.

Senior BI/Data Engineer | Microsoft MVP Data Platform | Microsoft MVP Power BI | Power BI Super User | C# Corner MVP

IM_01
Contributor III

Hi @amirabedhiafi 

Thanks for the response 🙂

I am already using an event hook to capture events of type flow_definition and flow_progress.
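For reference, the hook is along these lines (a simplified sketch, not the exact code; the dlt.on_event_hook signature, the event field names, and the idea of writing to the audit table from inside the hook are assumptions to illustrate the setup):

import dlt

# Sketch: forward ERROR-level events into the audit table.
# Event dictionary keys (level, event_type, message) should be verified
# against the actual event payloads in your workspace.
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def log_errors_to_audit(event):
    if event.get("level") == "ERROR":
        msg = f"{event.get('event_type')}: {event.get('message')}"
        spark.createDataFrame([{"error_msg": msg}], schema="error_msg string") \
            .write.insertInto("cat.sch.tbl_stg_tst_audit")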

However, I started wondering: if the exception is handled, will it still be captured in the audit table used by the event hook? To make sure I do not miss any exceptions, I was thinking of also maintaining the audit table for logging exceptions inside the dlt table decorator function.

Please let me know if this approach would work. Feel free to point out any loopholes in it, or to suggest a better approach, Amira 🙂.

amirabedhiafi
New Contributor III

Hi again!

Yes, if you handle the exception, the event hook may miss it as a pipeline failure. I think the better fix is to re-raise the exception and do the auditing from the event log, rather than writing manually inside the table decorator function.
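Something along these lines (a sketch only: the table name, source table, and transformation are placeholders, and I am assuming the classic import dlt decorator style):

import dlt

# Sketch: re-raise instead of swallowing the exception, so the failure lands
# in the pipeline event log where your event hook / monitoring query sees it.
@dlt.table(name="tbl_stg_tst")
def tbl_stg_tst():
    try:
        df = spark.readStream.table("cat.sch.source_tbl")  # placeholder source
        return df
    except Exception:
        # Optionally add local logging here, but always re-raise.
        raise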

If this answer resolves your question, could you please mark it as “Accept as Solution”? It will help other users quickly find the correct fix.

Senior BI/Data Engineer | Microsoft MVP Data Platform | Microsoft MVP Power BI | Power BI Super User | C# Corner MVP