Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark streaming autoloader slow second batch - checkpoint issues?

drewster
New Contributor III

I am running a massive history of about 250 GB (~6 million) of phone call transcriptions (JSON read in as raw text) through a raw -> bronze pipeline in Azure Databricks using PySpark.

The source is mounted storage that continuously receives new files; we do not delete or archive at the source.

I am using Auto Loader with the availableNow trigger (see code below).

This scales well, and I am able to process all of the files with the current configuration in under 2 hours.

The availableNow trigger breaks the massive history up into batches that work well for my cluster size.

I start running into issues when I kick off the stream again AFTER the history has completed.

The microbatch execution log shows that the latestOffset phase of my trigger takes approximately 4,140,000 ms, or about 69 minutes, just to compute the offsets.

Once that enormous offset computation finishes, the addBatch phase only takes a couple of seconds to append to the destination.

Based on my cluster and configuration, I can process around 1,300 records/sec until the history (~6 million files) is complete, but as soon as I start the second run the stream gets stuck reading the latest offset and throughput drops to less than 1 record/sec.

I have tried a multitude of configurations to see if any of them solve the problem, but to no avail, and there does not seem to be anyone else with this issue, so my last resort is posting here.

One thing to note: based on the data, I do not have a well-balanced column to partition on, and I don't need one for downstream transformations, so a solution with or without partitioning will work for me.

Here is the current configuration of the read and write streams....

# Stream Read Function

from pyspark.sql import SparkSession, DataFrame


def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path.

  Parameters
  ----------
  spark : SparkSession
      A Spark session
  rawPath : str
      Path to the directory of files

  Returns
  -------
  DataFrame
      A dataframe with one column "value" (string) containing the raw text of each input file
  """
  kafka_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")                     # one row per file
    .option("cloudFiles.maxBytesPerTrigger", "10g")  # cap bytes admitted per microbatch
    .schema(kafka_schema)
    .load(rawPath)
  )


rawDF = read_stream_raw(spark, rawPath=landingPath)

# Transformation

from pyspark.sql.functions import current_timestamp, input_file_name, lit


def transform_raw_to_bronze(spark: SparkSession, bronze: DataFrame) -> DataFrame:
  """Tag a raw stream with ingestion metadata.

  Parameters
  ----------
  spark : SparkSession
      A Spark session
  bronze : DataFrame
      The raw dataframe to enrich

  Returns
  -------
  DataFrame
      The input dataframe with added metadata columns (datasource, ingesttime, ingestdate, input_filename)
  """
  df = (
    bronze
    .select(
      lit("/my/cloud/storage").alias("datasource"),
      current_timestamp().alias("ingesttime"),
      "value",
      current_timestamp().cast("date").alias("ingestdate")
    )
    .withColumn("input_filename", input_file_name())
  )
  return df


bronzeDF = transform_raw_to_bronze(spark, rawDF)

# Stream Writer

from typing import Optional

from pyspark.sql.streaming import DataStreamWriter


def create_stream_writer(
    dataframe: DataFrame,
    checkpoint: str,
    name: str,
    partition_column: Optional[str] = None,
    mode: str = "append",
) -> DataStreamWriter:
  """Build a Delta stream writer for the given dataframe.

  Parameters
  ----------
  dataframe: DataFrame
    A Spark dataframe
  checkpoint: str
    Unique checkpoint location
  name: str
    Unique identifying name of the stream
  partition_column: Optional[str]
    Column to partition the stream by
  mode: str
    Output mode of the stream

  Returns
  -------
  DataStreamWriter
      A configured stream writer (not yet started)
  """
  stream_writer = (
    dataframe
    .writeStream
    .format("delta")
    .outputMode(mode)
    .option("checkpointLocation", checkpoint)
    .queryName(name)
    .trigger(availableNow=True)
  )

  if partition_column is not None:
      return stream_writer.partitionBy(partition_column)

  return stream_writer


rawToBronzeWriter = create_stream_writer(
  dataframe=bronzeDF,
  checkpoint=bronzeCheckpoint,
  mode="append",
  name="rawToBronze"
)

stream = rawToBronzeWriter.start(bronzePath)
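
For reference, the latestOffset / addBatch timings quoted above come from the streaming progress metrics, so they can be pulled straight from the query handle instead of the microbatch execution log. A minimal sketch, assuming stream is the active query started above and that the durationMs key names match this Spark/DBR version:

# Inspect per-microbatch durations - a sketch for diagnosis, not part of the pipeline.
# recentProgress returns the most recent progress updates as dictionaries;
# .get() is used because key names can vary across Spark/DBR versions.
for progress in stream.recentProgress:
  durations = progress.get("durationMs", {})
  print(
    "batchId:", progress.get("batchId"),
    "| latestOffset ms:", durations.get("latestOffset"),
    "| addBatch ms:", durations.get("addBatch"),
    "| numInputRows:", progress.get("numInputRows"),
  )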
 

17 REPLIES

I'm having a very similar issue. The backfill option from the documentation checks the file paths against the metadata in the checkpoint to see whether everything has been read; by default, backfills appear to be disabled.

The problem I'm having is that incremental directory listing just hangs, or takes forever to run after the initial write. Even if you use event notifications, which seem to be really fast for updates, they don't provide 100% delivery guarantees, meaning that backfills need to be run often... but then we are back to the initial problem of having to run a directory listing against a potentially massive directory for the backfills.
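
For what it's worth, here is a minimal sketch of the combination described above: file notification mode for fast incremental discovery, plus a periodic backfill to catch anything the notifications miss. cloudFiles.useNotifications and cloudFiles.backfillInterval are documented Auto Loader options, but the interval value is a placeholder, rawPath stands in for the same source directory as in the question, and notification mode needs extra cloud-side setup and permissions, so treat this as an assumption to verify on your workspace rather than a confirmed fix:

# Auto Loader in file notification mode with a scheduled backfill (sketch).
# Notification mode avoids full directory listings for new-file discovery;
# the backfill interval periodically re-lists the directory to guarantee
# completeness, since event notifications are not 100% delivery-guaranteed.
df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "text")
  .option("wholetext", "true")
  .option("cloudFiles.useNotifications", "true")   # event-driven discovery; requires cloud setup/permissions
  .option("cloudFiles.backfillInterval", "1 day")  # placeholder interval for the periodic full listing
  .schema("value STRING")
  .load(rawPath)                                   # same source directory as the original stream
)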

Hi @Drew Ringo, I was checking back to see if your problem is resolved. If you have found a solution, please share it with the community, as it can be helpful to others.

Brooksjit
New Contributor III

Thank you for the explanation.
