
Spark streaming autoloader slow second batch - checkpoint issues?

drewster
New Contributor III

I am running a massive history of about 250 GB (~6 million phone call transcriptions, JSON read in as raw text) through a raw -> bronze pipeline in Azure Databricks using PySpark.

The source is mounted storage that continuously has files added, and we do not delete or archive at the source.

I am using Auto Loader and the Available Now trigger (see code below).

This is scaling well and I am able to process all the files given the current configuration in under 2 hours.

The trigger Available Now breaks up the massive history batch well for my cluster size.

I start running into issues when I kick off the stream again AFTER the history has completed.

The microbatch execution log states that the latestOffset part of my trigger takes approximately 4,140,000 ms, or 69 minutes, just to get the offsets.

Once that insane offset time is finished, the addBatch only takes a couple of seconds to append to the destination.
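
For anyone who wants to compare the two phases side by side, here is a minimal sketch that converts the per-phase timings of a progress report into minutes. It assumes the report is shaped like what `stream.lastProgress` returns in PySpark (a dict with a `durationMs` entry). The helper name `summarize_durations` is made up for illustration, and the `addBatch` value of 2,000 ms is an assumed stand-in for "a couple of seconds"; only the `latestOffset` figure comes from the log above.

```python
from typing import Dict, Optional


def summarize_durations(progress: Optional[Dict]) -> Dict[str, float]:
    """Convert each durationMs entry of a progress report to minutes."""
    if not progress:
        # lastProgress can be empty before the first micro-batch completes.
        return {}
    durations = progress.get("durationMs", {})
    return {phase: ms / 60_000 for phase, ms in durations.items()}


# Hypothetical report: latestOffset is the figure from the post,
# addBatch is an assumed ~2 seconds.
sample_progress = {"durationMs": {"latestOffset": 4_140_000, "addBatch": 2_000}}

minutes = summarize_durations(sample_progress)
slowest = max(minutes, key=minutes.get)
print(f"slowest phase: {slowest} ({minutes[slowest]:.0f} min)")
```

On a running query the same helper could be called as `summarize_durations(stream.lastProgress)`.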

Based on my cluster and configuration, I can process around 1300 rec/sec until the history (~6 million files) is completed, but as soon as I start the second batch the stream gets caught up reading the latest offset and I process at less than 1 rec/sec.

I have tried a multitude of configurations aimlessly to see if any of them solves the problem, but to no avail, and there do not seem to be others with this issue, so my last resort is posting here.

One thing to note: based on the data, I do not have a well-balanced column to partition on and don't need one for downstream transformation, so a solution with or without one will work for me.

Here is the current configuration of the read and write streams:

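# Stream Read Function

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit
from pyspark.sql.streaming import DataStreamWriter
 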
def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  rawPath : str
      path to directory of files
 
  Returns
  -------
  DataFrame
      a dataframe with one column "value" type str that contains the full raw text of one file per row
  """
  # Single string column; with wholetext enabled each file becomes one row.
  raw_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .schema(raw_schema)
    .load(rawPath)
  )
 
 
rawDF = read_stream_raw(spark, rawPath=landingPath)
# Transformation
 
def transform_raw_to_bronze(spark: SparkSession, bronze: DataFrame) -> DataFrame:
  """Tag a streaming DataFrame with ingestion metadata
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  bronze : DataFrame
      A spark df
 
  Returns
  -------
  DataFrame
      a dataframe with extra columns tagging more information
  """
  df = (
    bronze
    .select(
      lit("/my/cloud/storage").alias("datasource"),
      current_timestamp().alias("ingesttime"),
      "value",
      current_timestamp().cast("date").alias("ingestdate")
    )
    .withColumn("input_filename", input_file_name())
  )
  return df
 
 
bronzeDF = transform_raw_to_bronze(spark, rawDF)

# Stream Write Function

def create_stream_writer(
    dataframe: DataFrame,
    checkpoint: str,
    name: str,
    partition_column: str = None,
    mode: str = "append",
) -> DataStreamWriter:
  """Write stream to specified path
 
  Parameters
  ----------
  dataframe: DataFrame
    A spark dataframe
  checkpoint: str
    unique checkpoint location
  name: str
    unique identifying name of the stream
  partition_column: str = None
    column to partition the stream by
  mode: str = "append"
    output mode of the stream
 
  Returns
  -------
  DataStreamWriter
      a configured stream writer that has not been started yet
  """
  stream_writer = (
    dataframe
    .writeStream
    .format("delta")
    .outputMode(mode)
    .option("checkpointLocation", checkpoint)
    .queryName(name)
    .trigger(availableNow=True)
  )
 
  if partition_column is not None:
    return stream_writer.partitionBy(partition_column)
 
  return stream_writer
 
 
rawToBronzeWriter = create_stream_writer(
  dataframe=bronzeDF,
  checkpoint=bronzeCheckpoint, 
  mode="append", 
  name="rawToBronze"
)
 
stream = rawToBronzeWriter.start(bronzePath)
 

1 ACCEPTED SOLUTION

Accepted Solutions

Dan_Z
Honored Contributor

@Drew Ringo, what's happening here is that the directory is so large that Auto Loader has to do a full scan on that second batch, which takes time. That scan should be parallelized in DBR 9.1+. I think what you need is incremental listing on your directory. If you have not seen it, this part of our docs should help: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html#incremental-listing-1

You could try setting cloudFiles.useIncrementalListing to "true" and then manually specifying a backfill interval, like "1 day".
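
As a rough sketch of what that suggestion might look like in code, the helper below just builds the two options as a dict. The function name `incremental_listing_options` is made up for illustration, and `cloudFiles.backfillInterval` is my assumption for the option behind "backfill interval", so please check both option names against the Auto Loader docs for your runtime.

```python
from typing import Dict


def incremental_listing_options(backfill_interval: str = "1 day") -> Dict[str, str]:
    """Build the Auto Loader options suggested above (names are assumptions)."""
    return {
        # Ask Auto Loader to list incrementally instead of rescanning everything.
        "cloudFiles.useIncrementalListing": "true",
        # Assumed option name for the periodic full backfill.
        "cloudFiles.backfillInterval": backfill_interval,
    }


opts = incremental_listing_options()
for key, value in opts.items():
    print(f"{key} = {value}")
```

On Databricks these could be passed to the existing reader with `.options(**opts)` before `.load(rawPath)`, leaving the rest of `read_stream_raw` unchanged.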


17 REPLIES

Kaniz_Fatma
Community Manager

Hi @Drew Ringo! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first. Or else I will get back to you soon. Thanks.

RKNutalapati
Valued Contributor

Hi @Drew Ringo, based on your input, I am assuming that your file directory is huge. In this case, I think you can try option("cloudFiles.useNotifications", "true").

Your assumption is correct. The file directory is huge and will only get bigger. To use option("cloudFiles.useNotifications", "true"), I will have to configure Azure Event Grid and Queue storage based on the documentation located here, correct?

Load files from Azure Data Lake Storage Gen2 (ADLS Gen2) using Auto Loader - Azure Databricks | Micr...

Hi @Drew Ringo, I am not that familiar with Azure, but yes, judging by the nice article below, I think some configurations need to be set.

https://chinnychukwudozie.com/2020/09/30/incrementally-process-data-lake-files-using-azure-databrick...

Maybe people from Azure can comment on this better.

Thanks,

RK

drewster
New Contributor III

Thank you for this. I am working with others now to make sure I have the correct permissions to configure this based on the article and the Azure documentation. Once implemented and tested I will respond.

Anonymous
Not applicable

I agree with drewster, it does appear to be a listing problem. Notifications will perform better at scale. Without notifications, it has to get the full list of files and compare it to the already processed list, and that's what's taking a long time.
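
To make the difference concrete, here is a toy model of the two approaches. It is not Auto Loader's actual implementation, just a sketch of why a full listing gets slower as the directory grows while a notification queue does not. The function name `find_new_files` and the file names are made up for illustration.

```python
from typing import Iterable, List, Set, Tuple


def find_new_files(candidates: Iterable[str], processed: Set[str]) -> Tuple[List[str], int]:
    """Return the unprocessed paths and how many paths had to be examined."""
    examined = 0
    new_files = []
    for path in candidates:
        examined += 1
        if path not in processed:
            new_files.append(path)
    return new_files, examined


# Six files stand in for the ~6 million in the real directory.
listing = [f"call_{i}.json" for i in range(6)]
processed = set(listing[:5])

# Directory listing mode: every path is examined to find the one new file.
new_by_listing, listing_cost = find_new_files(listing, processed)

# Notification mode: only the paths announced by events are examined.
events = ["call_5.json"]
new_by_events, events_cost = find_new_files(events, processed)

print(new_by_listing, listing_cost)
print(new_by_events, events_cost)
```

Both modes find the same new file, but the listing cost scales with the size of the directory and the event cost scales with the number of new files.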

drewster
New Contributor III

Hello again @Joseph Kambourakis,

I've been working with my Cloud Engineer, and the service principal and permissions are all set up.

My new configuration looks like this....

def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  rawPath : str
      path to directory of files
 
  Returns
  -------
  DataFrame
      a dataframe with one column "value" type str that contains raw data for each row in raw file
  """
  # Single string column; with wholetext enabled each file becomes one row.
  raw_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .option("cloudFiles.validateOptions", "true")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.clientId", "id")
    .option("cloudFiles.clientSecret", "secret")
    .option("cloudFiles.resourceGroup", "rg")
    .option("cloudFiles.subscriptionId", "subId")
    .option("cloudFiles.tenantId", "tenId")
    .schema(raw_schema)
    .load(rawPath)
  )
 

The rest of the configuration is the same as above.

Note: I am not using a connection string because I am on Databricks runtime 10.3 and, per the documentation, connection strings are not necessary after 8.1.

We are getting this error...

ERROR MicroBatchExecution: Query rawToBronze [id = 93e1fcd7-6529-4ba0-a694-be9e67239ae1, runId = c91c00b3-0c53-43cd-8742-0a7f99c5e832] terminated with error
com.microsoft.azure.storage.StorageException: 
....
Caused by: java.net.UnknownHostException: exampleAccount.queue.core.windows.net
 

Our assumption was that the queue would automatically get set up without a connection string, given the correct service principal configuration and a Databricks runtime > 8.1.
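
Since the root cause in the trace is an UnknownHostException, one quick check is whether the queue endpoint resolves at all from the machine running the stream. Below is a minimal sketch using only the Python standard library. The helper names `queue_endpoint` and `resolves` are made up for illustration, and the hostname pattern is simply copied from the error message above.

```python
import socket


def queue_endpoint(storage_account: str) -> str:
    """Build the queue hostname in the form shown in the error message."""
    return f"{storage_account}.queue.core.windows.net"


def resolves(hostname: str) -> bool:
    """Return True if DNS can resolve the hostname, False otherwise."""
    try:
        socket.getaddrinfo(hostname, 443)
        return True
    except OSError:
        # socket.gaierror (a subclass of OSError) is raised for unknown hosts.
        return False


host = queue_endpoint("exampleAccount")
print(host, "resolves" if resolves(host) else "does not resolve")
```

If the name does not resolve, the problem sits with the storage account or network setup rather than with the Auto Loader options.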

Any thoughts? I am continuing to troubleshoot and will test a more primitive version on a Databricks runtime < 8.1 with a connection string.

Regardless of the above outcome, I need to be on runtime > 10.2 for the AvailableNow trigger.

Please, any thoughts/directions would be incredibly beneficial.

Thank you!

UPDATE: I got the same error on Databricks runtime 7.3, where the connection string is required to configure Auto Loader. The only change I needed to make in the setup besides the runtime was the trigger type, because Once and AvailableNow didn't exist in runtime 7.3.

drewster
New Contributor III

UPDATE @Joseph Kambourakis

It seems that we have found that ADLS Gen2 Premium storage does not support Queue storage.

Therefore the Autoloader fails.

My Cloud Engineer stood up a standard tier storage account in ADLS Gen2, and I was able to connect to it and run Auto Loader with the exact same configuration that initially caused it to fail.

Not sure who to ask/inform about this finding as it is not in any documentation to my knowledge.

Hopefully we can get this info out there and figure out some next steps if our assumptions are correct.

Thank you.

Hi @Drew Ringo, the Premium tier offers significantly lower storage latencies compared to other tiers, as well as cost savings for workloads that are transaction heavy.

This is valuable for a number of scenarios and workload types as follows:

  • Scenarios that require real-time access and random read/write access to large volumes of data
  • Workloads that have small-sized read and write transactions
  • Read heavy workloads
  • Workloads that have a high number of transactions or high transactions per GB ratio

For more information, please refer to ADLS Gen2 Premium tier documentation.

Source

Kaniz_Fatma
Community Manager

Hi @Drew Ringo, just a friendly follow-up. Do you still need help, or did @Dan Zafar's response help you find the solution? Please let us know.

drewster
New Contributor III

@Dan Zafar @Kaniz Fatma I will be trying the recommendation by @Dan Zafar today.

Hi @Drew Ringo,

Just a friendly follow-up. Did you have time to try Dan's recommendations? Do you still need help, or did this recommendation help? Please let us know.

I just tested it out and my stream initialization times seem to have gone down. Can someone explain the backfill interval?

Based on the documentation located here, it sounds like the backfill is almost like a full directory listing at a given interval to make sure that all files have been processed.

Is this true?

If it is true, then that would imply that the option is not used unless Auto Loader is configured for event notification services, right?

EDIT:

After I let it run for a little while after restarting the stream, recs/sec tanked. I went from ~180 rec/sec to ~30 rec/sec.

I have auto compaction and auto optimization turned on as well because I don't have a partitioning column that lets me create balanced partitions.
