foreachBatch doesn't work in structured streaming
08-27-2024 07:05 AM
I'm trying to print the number of rows in each batch, but it doesn't seem to work properly. I have a single-node, compute-optimized cluster and run this code in a notebook:
# Logging the row count using a streaming-friendly approach
def log_row_count(batch_df, batch_id):
    display(batch_df)
    row_count = 0
    if not batch_df.isEmpty():
        row_count = batch_df.count()
    print(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
    LOGGER.info(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
# Configure Auto Loader to ingest Parquet data to a Delta table
ptv = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "parquet") \
.option("cloudFiles.schemaLocation", checkpoint_path) \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.load(file_path)
ptv \
.writeStream \
.option("checkpointLocation", checkpoint_path) \
.outputMode("append") \
.trigger(availableNow=True) \
.foreachBatch(log_row_count) \
.toTable(FULL_TABLE_NAME) \
.awaitTermination()
The only output I get is:
What should I do to print the count in foreachBatch()?
1 REPLY
12-09-2024 12:56 AM
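Hi,
As far as I can tell, foreachBatch() and toTable() each set the sink for the query, and toTable() is what actually starts it, so the stream writes straight to the table and log_row_count is never called. Can you try running the logging as its own query, started with start():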
def log_row_count(batch_df, batch_id):
    row_count = batch_df.count()
    print(f"Batch ID {batch_id}: {row_count} rows have been processed")
    LOGGER.info(f"Batch ID {batch_id}: {row_count} rows have been processed")
ptv.writeStream \
.option("checkpointLocation", checkpoint_path) \
.outputMode("append") \
.trigger(availableNow=True) \
.foreachBatch(log_row_count) \
.start() \
.awaitTermination()
# Write to table separately if needed
ptv.writeStream \
.option("checkpointLocation", f"{checkpoint_path}_table") \
.outputMode("append") \
.trigger(availableNow=True) \
.toTable(FULL_TABLE_NAME) \
.awaitTermination()
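
The two queries above each keep their own checkpoint, so the source files are read twice. An alternative that keeps a single query and a single checkpoint is to do the table write inside the foreachBatch callback, so the count and the append happen on the same micro-batch. The following is a minimal sketch under assumptions, not something tested against this workspace: make_batch_writer and run_stream are hypothetical helper names, and it assumes the target table accepts plain batch appends via saveAsTable.

```python
import logging

LOGGER = logging.getLogger(__name__)


def make_batch_writer(table_name, logger=LOGGER):
    """Build a foreachBatch callback (hypothetical helper) that appends each
    micro-batch to `table_name` and reports how many rows it held."""

    def write_and_log(batch_df, batch_id):
        # Cache the micro-batch so count() and the write do not each
        # recompute it from the source files.
        batch_df.persist()
        try:
            row_count = batch_df.count()
            batch_df.write.mode("append").saveAsTable(table_name)
            message = f"Batch ID {batch_id}: {row_count} rows appended to {table_name}"
            print(message)
            logger.info(message)
        finally:
            batch_df.unpersist()

    return write_and_log


def run_stream(stream_df, checkpoint_path, table_name):
    """Start the query with foreachBatch as its only sink (no toTable call)."""
    return (
        stream_df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .trigger(availableNow=True)
        .foreachBatch(make_batch_writer(table_name))
        .start()
    )
```

In the notebook this would be called as run_stream(ptv, checkpoint_path, FULL_TABLE_NAME).awaitTermination(). Depending on the cluster access mode, print output from foreachBatch may end up in the driver logs rather than in the cell output, so the LOGGER line is the more reliable place to look.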

