Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

foreachBatch doesn't work in structured streaming

drag7ter
Contributor

I'm trying to print out the number of rows in each batch, but it doesn't seem to work properly. I have a single-node compute-optimized cluster and run this code in a notebook:

# Logging the row count using a streaming-friendly approach
def log_row_count(batch_df, batch_id):
  display(batch_df)
  row_count = 0
  if not batch_df.isEmpty():
    row_count = batch_df.count()
  print(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")
  LOGGER.info(f"{row_count} rows have been appended to {FULL_TABLE_NAME}")

# Configure Auto Loader to ingest parquet data to a Delta table
ptv = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load(file_path)

ptv \
  .writeStream \
  .option("checkpointLocation", checkpoint_path) \
  .outputMode("append") \
  .trigger(availableNow=True) \
  .foreachBatch(log_row_count) \
  .toTable(FULL_TABLE_NAME) \
  .awaitTermination()

The only output I get is:
[screenshot: Capture.PNG]

What should I do to print the count in foreachBatch()?

1 ACCEPTED SOLUTION


szymon_dybczak
Esteemed Contributor III

Hi @saffovski ,

This is expected behavior. By default, if you use print inside foreachBatch, the output goes to the driver log. So, check your driver logs 🙂
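
If you'd rather see the counts in the notebook output itself, the query's progress metrics work too. A minimal sketch, assuming you keep the query handle instead of chaining awaitTermination:

query = ptv.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .foreachBatch(log_row_count) \
    .start()

query.awaitTermination()

# Per micro-batch metrics are visible in the notebook,
# no need to open the driver logs. Note: with foreachBatch
# as the sink, the table write belongs inside the function.
for p in query.recentProgress:
    print(p["batchId"], p["numInputRows"])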


7 REPLIES

Sidhant07
Databricks Employee

Hi,

Can you try this:

def log_row_count(batch_df, batch_id):
    row_count = batch_df.count()
    print(f"Batch ID {batch_id}: {row_count} rows have been processed")
    LOGGER.info(f"Batch ID {batch_id}: {row_count} rows have been processed")

ptv.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .foreachBatch(log_row_count) \
    .start() \
    .awaitTermination()

# Write to table separately if needed
ptv.writeStream \
    .option("checkpointLocation", f"{checkpoint_path}_table") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable(FULL_TABLE_NAME) \
    .awaitTermination()
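
An alternative is to keep a single query and do the table write inside the foreachBatch function itself. A minimal sketch, reusing ptv, checkpoint_path, and FULL_TABLE_NAME from the original post:

def log_and_write(batch_df, batch_id):
    row_count = batch_df.count()
    # print/LOGGER output still lands in the driver logs
    print(f"Batch ID {batch_id}: {row_count} rows have been appended to {FULL_TABLE_NAME}")
    # foreachBatch writes are at-least-once, so a plain append like
    # this can duplicate rows if a batch is retried
    batch_df.write.mode("append").saveAsTable(FULL_TABLE_NAME)

ptv.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .foreachBatch(log_and_write) \
    .start() \
    .awaitTermination()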

saffovski
New Contributor II

Hi, I am facing the exact same error. The method I'm calling in foreachBatch is just a very simple print statement that tests whether the method is called or not, and the print is not printed out. Here's a code snippet:

def debug_batch(batch_df, batch_id):
    print(f"Batch {batch_id} started, row count = {batch_df.count()}")

query = (
    df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(debug_batch)
        .start()
)
query.awaitTermination()

szymon_dybczak
Esteemed Contributor III

Hi @saffovski ,

This is expected behavior. By default, if you use print inside foreachBatch, the output goes to the driver log. So, check your driver logs 🙂

Hi @Advika ,

Could you mark the reply above as the answer to this thread? This question keeps popping up and it would be good to have a solution for it. The behavior I mentioned above is described here:

Use foreachBatch to write to arbitrary data sinks | Databricks Documentation

Advika

Thanks for clarifying and sharing the doc, @szymon_dybczak!
I've marked your reply as a solution.

szymon_dybczak
Esteemed Contributor III

Thanks @Advika 👍

saffovski
New Contributor II

Hi Szymon, thank you for your answer. However, when I use foreachBatch for my actual work (processing each micro-batch from a stream of files), it is clear that the foreachBatch part is not executed. I read that this might be related to serverless compute, but I wasn't able to confirm it in the official Databricks documentation.
Is this the reason?
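
If print output is not surfaced on your compute (which reportedly can happen on serverless, where stdout is routed differently), one way to confirm the function actually runs is to persist a marker from inside the batch. A minimal sketch, with stream_debug_log as a hypothetical scratch table:

def debug_batch(batch_df, batch_id):
    # Append one row of batch metadata to a table, so the evidence
    # survives even when print output is not shown in the notebook
    batch_df.sparkSession.createDataFrame(
        [(int(batch_id), batch_df.count())],
        "batch_id LONG, row_count LONG",
    ).write.mode("append").saveAsTable("stream_debug_log")  # hypothetical table name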