foreachBatch doesn't work in structured streaming

drag7ter
Contributor

I'm trying to print the number of rows in each batch, but it doesn't seem to work. I have a single-node compute-optimized cluster and run this code in a notebook:

# Logging the row count using a streaming-friendly approach
def log_row_count(batch_df, batch_id):
  display(batch_df)
  row_count = 0
  if not batch_df.isEmpty():
    row_count = batch_df.count()
  print(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
  LOGGER.info(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")

# Configure Auto Loader to ingest Parquet data into a Delta table
ptv = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load(file_path)

ptv \
  .writeStream \
  .option("checkpointLocation", checkpoint_path) \
  .outputMode("append") \
  .trigger(availableNow=True) \
  .foreachBatch(log_row_count) \
  .toTable(FULL_TABLE_NAME) \
  .awaitTermination()

The only output I get is this (screenshot: Capture.PNG):

What should I do to print the row count in foreachBatch()?

Sidhant07
Databricks Employee

Hi,

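Can you try running the foreachBatch query with .start() instead of .toTable(), and writing to the table in a separate query: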

def log_row_count(batch_df, batch_id):
    row_count = batch_df.count()
    print(f"Batch ID {batch_id}: {row_count} rows have been processed")
    LOGGER.info(f"Batch ID {batch_id}: {row_count} rows have been processed")

ptv.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .foreachBatch(log_row_count) \
    .start() \
    .awaitTermination()

# Write to table separately if needed
ptv.writeStream \
    .option("checkpointLocation", f"{checkpoint_path}_table") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable(FULL_TABLE_NAME) \
    .awaitTermination()
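If a single query is preferred over two, here is a minimal sketch of another option, assuming the table append can move into the foreachBatch function itself (batch_df.write.mode("append").saveAsTable(...)). Counting and writing then share one checkpoint and one pass over the new files. The helpers make_append_and_log and start_append_query and the logger name are hypothetical; ptv, FULL_TABLE_NAME and checkpoint_path are the variables from the question.

```python
import logging

# Hypothetical logger name; the question's LOGGER is defined elsewhere.
LOGGER = logging.getLogger("stream_row_counts")


def make_append_and_log(table_name, logger=LOGGER):
    """Build a foreachBatch function that appends each micro-batch to
    `table_name` and logs how many rows it contained."""

    def append_and_log(batch_df, batch_id):
        # count() and the write are separate actions on the same batch.
        row_count = batch_df.count()
        batch_df.write.mode("append").saveAsTable(table_name)
        logger.info("Batch %s: %d rows appended to %s", batch_id, row_count, table_name)
        # foreachBatch ignores the return value; it is returned only for testing.
        return row_count

    return append_and_log


def start_append_query(stream_df, table_name, checkpoint_path):
    """Start one availableNow query whose foreachBatch function performs the
    table append, so no separate .toTable() query is needed."""
    return (
        stream_df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(make_append_and_log(table_name))
        .start()
    )
```

With the question's variables this would be called as start_append_query(ptv, FULL_TABLE_NAME, checkpoint_path).awaitTermination(). Because count() and the write are two actions, each batch's source files may be read twice; on classic compute, persisting batch_df around the two actions can avoid that.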

saffovski
New Contributor II

Hi, I'm facing exactly the same issue. The function I call in foreachBatch is just a simple print statement that tests whether it gets called at all, and nothing is printed. Here's a code snippet:

def debug_batch(batch_df, batch_id):
    print(f"Batch {batch_id} started, row count = {batch_df.count()}")

query = (
    df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(debug_batch)
        .start()
)
query.awaitTermination()

szymon_dybczak
Esteemed Contributor III

Hi @saffovski,

This is expected behavior. By default, anything you print inside foreachBatch goes to the driver log, so check your driver logs 🙂
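If the print output is hard to find, here is a sketch of another way to see per-batch row counts in the notebook itself. Every micro-batch produces a progress report, and StreamingQuery.recentProgress returns the recent ones as dict-like objects with batchId and numInputRows keys. The helpers summarize_progress and print_batch_counts are hypothetical, and recentProgress keeps only a limited number of reports (spark.sql.streaming.numRecentProgressUpdates, 100 by default).

```python
def summarize_progress(progress_reports):
    """Turn StreamingQuery.recentProgress entries into (batchId, numInputRows)
    pairs, skipping any report that has no input-row count."""
    rows = []
    for report in progress_reports:
        if "numInputRows" in report:
            rows.append((report["batchId"], report["numInputRows"]))
    return rows


def print_batch_counts(query):
    # This runs in the notebook session after the query finishes, rather than
    # inside the foreachBatch function, so print goes to the cell output.
    for batch_id, num_rows in summarize_progress(query.recentProgress):
        print(f"Batch {batch_id}: {num_rows} input rows")
```

For example, keep the handle returned by .start(), call query.awaitTermination(), then print_batch_counts(query). For a simple pass-through query like the one above, numInputRows should match the number of rows handed to foreachBatch.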


Hi @Advika,

Could you mark the above reply as the answer to this thread? This question keeps coming up, and it would be good to have a solution for it. The behavior I mentioned above is described here:

Use foreachBatch to write to arbitrary data sinks | Databricks Documentation

Thanks for clarifying and sharing the doc, @szymon_dybczak!
I’ve marked your reply as a solution.

szymon_dybczak
Esteemed Contributor III

Thanks @Advika 👍

Hi Szymon, thank you for your answer. However, when I use foreachBatch for my actual work (processing each micro-batch while reading a stream of files), it's clear that the foreachBatch function is not executed. I read somewhere that this might be related to serverless compute, but I wasn't able to confirm it in the official Databricks documentation.
Is this the reason?

Malthe
Valued Contributor II

@szymon_dybczak, in my testing the print output does not appear anywhere: there is no trace of it in either the notebook or the driver logs.
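The thread doesn't settle whether serverless compute is the cause. Here is a sketch of a check that doesn't depend on where print output ends up. It wraps the batch function so that, after the function finishes, one record (batch id, row count, UTC timestamp) is appended to an audit Delta table you can query afterwards. The table name and make_audited_batch_fn are hypothetical, and batch_df.sparkSession assumes PySpark 3.3 or later.

```python
from datetime import datetime, timezone

AUDIT_TABLE = "main.default.foreachbatch_audit"  # hypothetical table name


def make_audited_batch_fn(process_batch, audit_table=AUDIT_TABLE):
    """Wrap a foreachBatch function so each completed batch leaves a row
    in `audit_table`, independent of print or logging output."""

    def audited(batch_df, batch_id):
        row_count = batch_df.count()
        process_batch(batch_df, batch_id)
        # Written only after process_batch returns, so a row means the batch finished.
        record = [(int(batch_id), int(row_count), datetime.now(timezone.utc))]
        (
            batch_df.sparkSession
            .createDataFrame(record, "batch_id LONG, row_count LONG, processed_at TIMESTAMP")
            .write.mode("append")
            .saveAsTable(audit_table)
        )

    return audited
```

It would be used as .foreachBatch(make_audited_batch_fn(debug_batch)), followed by spark.table(AUDIT_TABLE).show() once awaitTermination() returns. If no rows appear even though new files arrived, the function most likely never ran.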