I' m trying to print out number of rows in the batch, but seems it doesn't work properly. I have 1 node compute optimized cluster and run in notebook this code:
# Logging the row count using a streaming-friendly approach
def log_row_count(batch_df, batch_id):
display(batch_df)
row_count = 0
if not batch_df.isEmpty():
row_count = batch_df.count()
print(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
LOGGER.info(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
# Configure Auto Loader to ingest JSON data to a Delta table
ptv = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "parquet") \
.option("cloudFiles.schemaLocation", checkpoint_path) \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.load(file_path)
ptv \
.writeStream \
.option("checkpointLocation", checkpoint_path) \
.outputMode("append") \
.trigger(availableNow=True) \
.foreachBatch(log_row_count) \
.toTable(FULL_TABLE_NAME) \
.awaitTermination()
The only output I get is:
What should I do to print count in foreachBatch() ?