
Spark streaming autoloader slow second batch - checkpoint issues?

drewster
New Contributor III

I am running a large history of about 250 GB (~6 million phone call transcriptions, JSON read in as raw text) through a raw -> bronze pipeline in Azure Databricks using PySpark.

The source is mounted storage that continuously has files added, and we do not delete or archive at the source.

I am using Autoloader with the Available Now trigger (see code below).

This is scaling well and I am able to process all the files given the current configuration in under 2 hours.

The trigger Available Now breaks up the massive history batch well for my cluster size.

I start running into issues when I kick off the stream again AFTER the history has completed.

The microbatch execution log shows that the latestOffset phase of the trigger takes approximately 4,140,000 ms, or 69 minutes, just to get the offsets.

Once that offset phase finishes, addBatch takes only a couple of seconds to append to the destination.
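
For anyone who wants to see the same breakdown on their own stream, the per-phase timings are exposed on the query's progress object. The sketch below assumes `stream` is the StreamingQuery returned by `.start()` and that its `lastProgress["durationMs"]` dictionary maps phase names to milliseconds. The helper name `summarize_durations` is hypothetical, and the sample values only mirror the numbers quoted above (the addBatch and triggerExecution figures are illustrative).

```python
def summarize_durations(durations: dict) -> dict:
    """Convert a durationMs mapping (phase -> milliseconds) to minutes
    and report which phase dominates the trigger."""
    # triggerExecution is the total for the whole trigger, so leave it out
    phases = {k: v for k, v in durations.items() if k != "triggerExecution"}
    slowest = max(phases, key=phases.get)
    minutes = {k: round(v / 60_000, 2) for k, v in phases.items()}
    return {"slowest_phase": slowest, "minutes": minutes}


# Illustrative sample; only the latestOffset value comes from the post.
sample = {"latestOffset": 4_140_000, "addBatch": 3_000, "triggerExecution": 4_143_500}
summary = summarize_durations(sample)
print(summary["slowest_phase"], summary["minutes"]["latestOffset"])

# On a live query (assumption: `stream` is the StreamingQuery from .start()):
# summarize_durations(stream.lastProgress["durationMs"])
```

If latestOffset dominates like this, the time is going into discovering files rather than processing them.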

Based on my cluster and configuration I can process around 1300 rec/sec until the history (~6 million files) is completed, but as soon as I start the second batch the stream gets stuck reading the latest offset and I process less than 1 rec/sec.

I have tried a multitude of configurations to see if any of them solves the problem, but to no avail, and there do not seem to be others with this issue, so my last resort is posting here.

One thing to note is that I do not have a well-balanced column to partition on and don't need one for downstream transformations, so a solution with or without one will work for me.

Here is the current configuration of the read and write streams:

# Stream Read Function

from pyspark.sql import DataFrame, SparkSession


def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  rawPath : str
      path to directory of files
 
  Returns
  -------
  DataFrame
      a dataframe with one column "value" type str that contains raw data for each row in raw file
  """
  # Each file is read whole into a single string column
  raw_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .schema(raw_schema)
    .load(rawPath)
  )
 
 
rawDF = read_stream_raw(spark, rawPath=landingPath)
# Transformation

from pyspark.sql.functions import current_timestamp, input_file_name, lit


def transform_raw_to_bronze(spark: SparkSession, bronze: DataFrame) -> DataFrame:
  """Tag a raw stream with ingestion metadata
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  bronze : DataFrame
      A spark df
 
  Returns
  -------
  DataFrame
      a dataframe with extra columns tagging more information
  """
  df = (
    bronze
    .select(
      lit("/my/cloud/storage").alias("datasource"),
      current_timestamp().alias("ingesttime"),
      "value",
      current_timestamp().cast("date").alias("ingestdate")
    )
    .withColumn("input_filename", input_file_name())
  )
  return df
 
 
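bronzeDF = transform_raw_to_bronze(spark, rawDF)
# Stream Write Function

from typing import Optional

from pyspark.sql.streaming import DataStreamWriter


def create_stream_writer(
    dataframe: DataFrame,
    checkpoint: str,
    name: str,
    partition_column: Optional[str] = None,
    mode: str = "append",
) -> DataStreamWriter: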
  """Write stream to specified path
 
  Parameters
  ----------
  dataframe: DataFrame
    A spark dataframe
  checkpoint: str
    unique checkpoint location
  name: str
    unique identifying name of the stream
  partition_column: str = None
    column to partition the stream by
  mode: str = "append"
    output mode of the stream
 
  Returns
  -------
  DataStreamWriter
      a configured stream writer; call .start() on it to begin the stream
  """
  stream_writer = (
    dataframe
    .writeStream
    .format("delta")
    .outputMode(mode)
    .option("checkpointLocation", checkpoint)
    .queryName(name)
    .trigger(availableNow=True)
  )
 
  if partition_column is not None:
      return stream_writer.partitionBy(partition_column)
 
  return stream_writer
 
 
rawToBronzeWriter = create_stream_writer(
  dataframe=bronzeDF,
  checkpoint=bronzeCheckpoint, 
  mode="append", 
  name="rawToBronze"
)
 
stream = rawToBronzeWriter.start(bronzePath)
 

1 ACCEPTED SOLUTION

Accepted Solutions

Dan_Z
Honored Contributor

@Drew Ringo, What's happening here is that the directory is so large that the second batch has to do a full scan of it, which takes time. That scan should be parallelized in DBR 9.1+. I think what you need is incremental listing on your directory. If you have not seen it, this part of our docs should help: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html#incremental-listing-1

You could try setting cloudFiles.useIncrementalListing to "true" and then manually specifying a backfill interval, like "1 day".
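
As a rough sketch of what that suggestion could look like on the reader from the question: the option names `cloudFiles.useIncrementalListing` and `cloudFiles.backfillInterval` are the ones referred to in this reply, while the helper names `incremental_listing_options` and `read_stream_incremental` are hypothetical. Check the Auto Loader docs for your runtime before relying on these options, since their availability and defaults depend on the DBR version.

```python
def incremental_listing_options(backfill_interval: str = "1 day") -> dict:
    """Auto Loader options for incremental listing plus a periodic backfill."""
    return {
        "cloudFiles.format": "text",
        "cloudFiles.useIncrementalListing": "true",
        "cloudFiles.backfillInterval": backfill_interval,
    }


def read_stream_incremental(spark, raw_path: str):
    """Same reader as in the question, with the listing options added.

    `spark` is assumed to be an active SparkSession on Databricks.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**incremental_listing_options())
        .option("wholetext", "true")
        .schema("value STRING")
        .load(raw_path)
    )
```

The backfill interval is there as a safety net: incremental listing may miss files in some directory layouts, so an occasional full listing catches anything that slipped through.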


17 REPLIES

Kaniz
Community Manager

Hi @Drew Ringo​ ! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first. Or else I will get back to you soon. Thanks.

RKNutalapati
Valued Contributor

Hi @Drew Ringo, based on your input I am assuming that your file directory is huge. In that case, I think you can try option("cloudFiles.useNotifications", "true").

Your assumption is correct. The file directory is huge and will only get bigger. To use option("cloudFiles.useNotifications", "true"), I will have to configure Azure Event Grid and Queue storage based on the documentation located here, correct?

Load files from Azure Data Lake Storage Gen2 (ADLS Gen2) using Auto Loader - Azure Databricks | Micr...

Hi @Drew Ringo, I am not that familiar with Azure, but yes, judging by the article below, I think some configuration needs to be set up.

https://chinnychukwudozie.com/2020/09/30/incrementally-process-data-lake-files-using-azure-databrick...

Maybe people more familiar with Azure can comment on this.

Thanks,

RK

drewster
New Contributor III

Thank you for this. I am working with others now to make sure I have the correct permissions to configure this based on the article and the Azure documentation. Once implemented and tested I will respond.

Anonymous
Not applicable

I agree with drewster; it does appear to be a listing problem. Notifications will perform better at scale. Without notifications, Autoloader has to get the full list of files and compare it to the already processed list, and that is what takes so long.
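
To make the difference concrete, here is a toy illustration in plain Python. It is not how Auto Loader is implemented; the function names and the in-memory queue are assumptions made purely for the sketch. The point is that the listing approach touches every file on every trigger, while the notification approach only touches what arrived since the last one.

```python
from collections import deque


def new_files_by_listing(all_files: list, processed: set) -> list:
    """Directory-listing style: every trigger examines every file."""
    return [f for f in all_files if f not in processed]


def new_files_by_notification(queue: deque) -> list:
    """Notification style: only drain events for newly arrived files."""
    arrived = []
    while queue:
        arrived.append(queue.popleft())
    return arrived


history = [f"call_{i}.json" for i in range(1000)]
processed = set(history)
fresh = ["call_1000.json", "call_1001.json"]

# Listing examines all 1002 entries to find the 2 new files.
listed = new_files_by_listing(history + fresh, processed)
# The queue only ever contains the 2 new entries.
notified = new_files_by_notification(deque(fresh))
```

Both approaches return the same two files; the cost of the first grows with the size of the directory, which matches the slow latestOffset phase described in the question.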

drewster
New Contributor III

Hello again @Joseph Kambourakis​ ,

I've been working with my Cloud Engineer and the service principal and permissions are all set up.

My new configuration looks like this:

def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  rawPath : str
      path to directory of files
 
  Returns
  -------
  DataFrame
      a dataframe with one column "value" type str that contains raw data for each row in raw file
  """
  # Each file is read whole into a single string column
  raw_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .option("cloudFiles.validateOptions", "true")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.clientId", "id")
    .option("cloudFiles.clientSecret", "secret")
    .option("cloudFiles.resourceGroup", "rg")
    .option("cloudFiles.subscriptionId", "subId")
    .option("cloudFiles.tenantId", "tenId")
    .schema(raw_schema)
    .load(rawPath)
  )
 

The rest of the configuration is the same as above.

Note: I am not using a connection string because, per the documentation, connection strings are no longer necessary after Databricks runtime 8.1, and I am on 10.3.

We are getting this error...

ERROR MicroBatchExecution: Query rawToBronze [id = 93e1fcd7-6529-4ba0-a694-be9e67239ae1, runId = c91c00b3-0c53-43cd-8742-0a7f99c5e832] terminated with error
com.microsoft.azure.storage.StorageException: 
....
Caused by: java.net.UnknownHostException: exampleAccount.queue.core.windows.net
 

Our assumption was that the queue would be set up automatically without a connection string, given the correct service principal configuration and a Databricks runtime > 8.1.
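
Since the root cause in the stack trace is an UnknownHostException, one quick thing to check is whether the storage account's queue endpoint resolves in DNS at all. A minimal sketch, assuming the standard `<account>.queue.core.windows.net` naming seen in the error; the function name and the injectable `resolver` parameter are hypothetical and exist only to keep the check easy to test.

```python
import socket


def queue_endpoint_resolves(account: str, resolver=socket.gethostbyname) -> bool:
    """Return True if <account>.queue.core.windows.net resolves in DNS."""
    host = f"{account}.queue.core.windows.net"
    try:
        resolver(host)
        return True
    except OSError:
        # socket.gaierror (a subclass of OSError) is raised when lookup fails
        return False


# Run from a notebook cell on the cluster (performs a real DNS lookup):
# print(queue_endpoint_resolves("exampleAccount"))
```

If this returns False from the cluster, the failure is in name resolution or in the queue endpoint not existing for that account, rather than in permissions.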

Any thoughts? I am continuing to troubleshoot and will test a more primitive version on a Databricks runtime < 8.1 with a connection string.

Regardless of the above outcome, I need to be on runtime > 10.2 for the AvailableNow trigger.

Any thoughts or directions would be greatly appreciated.

Thank you!

UPDATE: I got the same error on Databricks runtime 7.3, where the connection string is required to configure Autoloader. The only change I needed to make in the setup besides the runtime was the trigger type, because Once & AvailableNow didn't exist in runtime 7.3.

drewster
New Contributor III

UPDATE @Joseph Kambourakis​ 

We seem to have found that ADLS Gen2 Premium storage does not support Queue storage.

Therefore Autoloader fails.

My Cloud Engineer stood up standard-tier ADLS Gen2 storage, and I was able to connect to it and run Autoloader with exactly the same configuration that initially caused it to fail.

Not sure who to ask/inform about this finding as it is not in any documentation to my knowledge.

Hopefully we can get this info out there and figure out some next steps if our assumptions are correct.

Thank you.

Kaniz
Community Manager

Hi @Drew Ringo​ , Premium tier offers significantly lower storage latencies as compared to other tiers and cost savings for workloads that are transaction heavy.

This is valuable for a number of scenarios and workload types as follows:

  • Scenarios that require real-time access and random read/write access to large volumes of data
  • Workloads that have small-sized read and write transactions
  • Read heavy workloads
  • Workloads that have a high number of transactions or high transactions per GB ratio

For more information, please refer to ADLS Gen2 Premium tier documentation.

Source

Kaniz
Community Manager

Hi @Drew Ringo, Just a friendly follow-up. Do you still need help, or did @Dan Zafar's response help you find the solution? Please let us know.

drewster
New Contributor III

@Dan Zafar​ @Kaniz Fatma​ I will be trying the recommendation by @Dan Zafar​ today.

Hi @Drew Ringo​ ,

Just a friendly follow-up. Did you have time to try Dan's recommendations? Do you still need help, or did this recommendation help? Please let us know.

I just tested it out and my stream initialization times seem to have gone down. Can someone explain the backfill interval?

Based on the documentation located here, it sounds like the backfill is essentially a full directory listing at a given interval to make sure that all files have been processed.

Is this true?

If it is true, that would imply that the option is not used unless Autoloader is configured for event notification services, right?

EDIT:

After I let it run for a little while after restarting the stream, recs/sec tanked. I went from ~180 rec/sec to ~30 rec/sec.

I have auto compaction and auto optimization turned on as well because I don't have a partitioning column that lets me create balanced partitions.
