foreachBatch doesn't work in structured streaming

drag7ter

I'm trying to print the number of rows in each batch, but it doesn't seem to work properly. I have a single-node, compute-optimized cluster and run this code in a notebook:

# Logging the row count using a streaming-friendly approach
def log_row_count(batch_df, batch_id):
  display(batch_df)
  row_count = 0
  if not batch_df.isEmpty():
    row_count = batch_df.count()
  print(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
  LOGGER.info(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")

# Configure Auto Loader to ingest Parquet data to a Delta table
ptv = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load(file_path)

ptv \
  .writeStream \
  .option("checkpointLocation", checkpoint_path) \
  .outputMode("append") \
  .trigger(availableNow=True) \
  .foreachBatch(log_row_count) \
  .toTable(FULL_TABLE_NAME) \
  .awaitTermination()

The only output I get is shown in the attached screenshot (Capture.PNG).

What should I do to print the count in foreachBatch()?
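One pattern that may be worth trying is sketched below. It rests on an assumption rather than anything confirmed in this thread: as far as I understand, toTable() starts the query with the table as its sink, so a foreachBatch function set on the same writer may never be called. The sketch therefore moves the table write inside the batch function and starts the query with start(). The names append_and_log and run_ingest, the logger name, and the table name are hypothetical placeholders; FULL_TABLE_NAME and LOGGER stand in for the variables used in the code above.

```python
import logging

LOGGER = logging.getLogger("autoloader_ingest")  # assumed logger name
FULL_TABLE_NAME = "main.default.ptv"  # hypothetical placeholder table name


def append_and_log(batch_df, batch_id):
    """Append one micro-batch to the table, then report its row count."""
    row_count = batch_df.count()
    if row_count > 0:
        # The write lives here because foreachBatch is the query's only sink.
        batch_df.write.format("delta").mode("append").saveAsTable(FULL_TABLE_NAME)
    message = (
        f"batch {batch_id}: {row_count} rows have been appended to {FULL_TABLE_NAME}"
    )
    print(message)
    LOGGER.info(message)
    return row_count  # Spark ignores this value; it only helps when testing


def run_ingest(stream_df, checkpoint_path):
    """Start the stream with foreachBatch as the sink and wait until it finishes."""
    query = (
        stream_df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(append_and_log)
        .start()  # start() instead of toTable(); append_and_log does the write
    )
    query.awaitTermination()
    return query
```

With the reader from the question this would be called as run_ingest(ptv, checkpoint_path). Depending on the cluster access mode, output from print() inside the batch function may show up in the driver logs rather than under the notebook cell, so the LOGGER line is probably the more reliable place to look.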