Sidhant07
Databricks Employee
Databricks Employee

Hi,

Can you try this:

def log_row_count(batch_df, batch_id):
    row_count = batch_df.count()
    print(f"Batch ID {batch_id}: {row_count} rows have been processed")
    LOGGER.info(f"Batch ID {batch_id}: {row_count} rows have been processed")

ptv.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .foreachBatch(log_row_count) \
    .start() \
    .awaitTermination()

# Write to table separately if needed
ptv.writeStream \
    .option("checkpointLocation", f"{checkpoint_path}_table") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable(FULL_TABLE_NAME) \
    .awaitTermination()