Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-09-2024 12:56 AM
Hi,
Can you try this:
def log_row_count(batch_df, batch_id):
row_count = batch_df.count()
print(f"Batch ID {batch_id}: {row_count} rows have been processed")
LOGGER.info(f"Batch ID {batch_id}: {row_count} rows have been processed")
ptv.writeStream \
.option("checkpointLocation", checkpoint_path) \
.outputMode("append") \
.trigger(availableNow=True) \
.foreachBatch(log_row_count) \
.start() \
.awaitTermination()
# Write to table separately if needed
ptv.writeStream \
.option("checkpointLocation", f"{checkpoint_path}_table") \
.outputMode("append") \
.trigger(availableNow=True) \
.toTable(FULL_TABLE_NAME) \
.awaitTermination()