โ02-22-2022 06:39 PM
I am running a massive history of about 250gb ~6mil phone call transcriptions (json read in as raw text) from a raw -> bronze pipeline in Azure Databricks using pyspark.
The source is mounted storage and is continuously having files added and we do not delete/archive at the source.
I am using Autoloader and Trigger Available Now. (see code below)
This is scaling well and I am able to process all the files given the current configuration in under 2 hours.
The trigger Available Now breaks up the massive history batch well for my cluster size.
I start running into issues when I kick off the stream again AFTER the history has completed.
The microbatch execution log states that my latestOffset part of my trigger takes approximately 4,140,000 or 69 minutes just to get the offsets.
Once that insane offset time is finished the addBatch only takes a couple seconds to append to the destination.
Based on my cluster and configuration I can process around 1300 rec/sec until history (~6 mil files) is completed but as soon as I start the second batch the stream gets caught up reading the latest offset and I process at less than 1 rec/sec.
I have tried a multitude of configurations aimlessly to see if it solves the problem but no avail and there does not seem to be others with this issue so my last resort is posting here.
One thing to note is that based on the data I do not have a well balanced column to partition on and don't need one for down stream transformation so a solution with or without one will work for me.
Here is the current configuration of the read and write streams....
# Stream Read Function
 
def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  rawPath : str
      path to directory of files
 
  Returns
  -------
  DataFrame
      a dataframe with one column "value" type str that contains raw data for each row in raw file
  """
  kafka_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .schema("value STRING")
    .load(rawPath)
  )
 
 
rawDF = read_stream_raw(spark, rawPath=landingPath)# Transformation
 
def transform_raw_to_bronze(spark: SparkSession, bronze: DataFrame) -> DataFrame:
  """Read a stream from a specified path and tag some metadata
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  bronze : DataFrame
      A spark df
 
  Returns
  -------
  DataFrame
      a dataframe with extra columns tagging more information
  """
  df = (
    bronze
    .select(
      lit("/my/cloud/storage").alias("datasource"),
      current_timestamp().alias("ingesttime"),
      "value",
      current_timestamp().cast("date").alias("ingestdate")
    )
    .withColumn("input_filename", input_file_name())
  )
  return df
 
 
bronzeDF = transform_raw_to_bronze(spark, rawDF)def create_stream_writer(
    dataframe: DataFrame,
    checkpoint: str,
    name: str,
    partition_column: str = None,
    mode: str = "append",
) -> DataStreamWriter:
  """Write stream to specified path
 
  Parameters
  ----------
  dataframe: DataFrame
    A spark dataframe
  checkpoint: str
    unique checkpoint location
  name: str
    unique identifying name of the stream
  partition_column: str = None
    column to partition the stream by
  mode: str = "append"
    outout mode of the stream
 
  Returns
  -------
  StreamWriter
      an active stream
  """
  stream_writer = (
    dataframe
    .writeStream
    .format("delta")
    .outputMode(mode)
    .option("checkpointLocation", checkpoint)
    .queryName(name)
    .trigger(availableNow=True)
  )
 
  if partition_column is not None:
      return stream_writer.partitionBy(partition_column)
 
  return stream_writer
 
 
rawToBronzeWriter = create_stream_writer(
  dataframe=bronzeDF,
  checkpoint=bronzeCheckpoint, 
  mode="append", 
  name="rawToBronze"
)
 
stream = rawToBronzeWriter.start(bronzePath)
 โ05-04-2022 09:39 AM
@Drew Ringoโ , What's happening here is that the directory is so large and it's having to do a full scan on that second batch which takes time, which should be parallelized in DBR 9.1+. I think what you need is IncrementalListing in your directory. If you have not seen it, this part of our docs should help: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html#incremental-listing-1
I you could try cloudFiles.useIncrementalListing "true", and then manually specifying a backfill interval, like "1 day".
โ02-23-2022 02:48 AM
Hi @Drew Ringoโ , Based on your input, I am assuming that your file directory is huge, in this case I think you can try with option("cloudFiles.useNotifications" , "true")
โ02-23-2022 05:41 AM
Your assumption is correct. The file directory is huge and will only get bigger. Using option("cloudFiles.useNotifications" , "true") I will have to configure the Azure Event Grid and Queue storage based on the documentation located here correct?
โ02-23-2022 07:17 AM
Hi @Drew Ringoโ , I am not that familiar with Azure. But yes by looking into the below nice article. I think some configurations need to be set.
May be people from Azure better comment on this
Thanks,
RK
โ02-23-2022 07:47 AM
Thank you for this. I am working with others now to make sure I have the correct permissions to configure this based on the article and the Azure documentation. Once implemented and tested I will respond.
โ02-23-2022 04:17 PM
I agree with drewster, it does appear to be a listing problem. Notifications will perform better at scale. Without the notification, it has to get the full list of files and compare it to the already processed list and that's what's taking a long time.
โ02-24-2022 07:44 AM
Hello again @Joseph Kambourakisโ ,
I've been working with my Cloud Engineer and the service principal and permissions are all set up.
My new configuration looks like this....
def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  """Read a stream from a specified path
 
  Parameters
  ----------
  spark : SparkSession
      A spark session 
  rawPath : str
      path to directory of files
 
  Returns
  -------
  DataFrame
      a dataframe with one column "value" type str that contains raw data for each row in raw file
  """
  kafka_schema = "value STRING"
  return (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("wholetext", "true")
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .option("cloudFiles.validateOptions", "true")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.clientId", "id")
    .option("cloudFiles.clientSecret", "secret)
    .option("cloudFiles.resourceGroup", "rg")
    .option("cloudFiles.subscriptionId", "subId")
    .option("cloudFiles.tenantId", "tenId")
    .schema("value STRING")
    .load(rawPath)
  )
 The rest of the configuration is the same as above.
Note: I am not using a connection string per the documentation because I am on Databricks runtime 10.3 where connection strings are not necessary after 8.1.
We are getting this error...
ERROR MicroBatchExecution: Query rawToBronze [id = 93e1fcd7-6529-4ba0-a694-be9e67239ae1, runId = c91c00b3-0c53-43cd-8742-0a7f99c5e832] terminated with error
com.microsoft.azure.storage.StorageException: 
....
Caused by: java.net.UnknownHostException: exampleAccount.queue.core.windows.net
 Our assumptions were that the Queue would automatically get set up without a connection string given the correct Service Principal configuration and a Databricks runtime > 8.1.
Any thoughts? I am continuing to trouble shoot and will test a more primitive version on a databricks runtime < 8.1 with a connection string.
Regardless of the above outcome I need to be on runtime >10.2 for the AvailableNow Trigger.
Please any thoughts/directions would be incredibly beneficial.
Thank you!
UPDATE: I got the same error on Databricks runtime 7.3 where the connection string is required to configure autoloader. The only change I needed to make in the setup besides the runtime is change the trigger type because Once & AvailableNow didn't exist in runtime 7.3
โ02-24-2022 02:22 PM
UPDATE @Joseph Kambourakisโ
It seems that we have found that ADLS Gen2 Premium storage does not support Queue storage.
Therefore the Autoloader fails.
My Cloud Engineer stood up a standard tier storage in ADLS Gen2 and I was able to connect to it and run Autloader with the same exact configurations that initially caused it to fail.
Not sure who to ask/inform about this finding as it is not in any documentation to my knowledge.
Hopefully we can get this info out there and figure out some next steps if our assumptions are correct.
Thank you.
โ05-04-2022 09:39 AM
@Drew Ringoโ , What's happening here is that the directory is so large and it's having to do a full scan on that second batch which takes time, which should be parallelized in DBR 9.1+. I think what you need is IncrementalListing in your directory. If you have not seen it, this part of our docs should help: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html#incremental-listing-1
I you could try cloudFiles.useIncrementalListing "true", and then manually specifying a backfill interval, like "1 day".
โ05-18-2022 06:04 AM
@Dan Zafarโ @Kaniz Fatmaโ I will be trying the recommendation by @Dan Zafarโ today.
โ06-07-2022 09:33 AM
Hi @Drew Ringoโ ,
Just a friendly follow-up. Did you have time to try Dan's recommendations? do you still need help or this recommendation helped? please let us know.
โ06-08-2022 11:56 AM
I just tested it out and my stream initialization times seem to have gone down. Can someone explain the backfill interval?
Based on the documentation located here its sounds like the backfill is almost like a full directory list at a given interval to make sure that all files have been processed.
Is this true?
If it is true then that would imply that the option is not used unless the auto loader is configured for event notification services right?
EDIT:
After I let it run for a little while after re starting the stream recs/sec tanked. I went from ~180 rec/esc to ~30 rec/sec.
I have auto compaction and auto optimization turned on as well because I dont have a partitioning column that lets me create balanced partitions.
โ06-14-2022 03:13 AM
I'm having a very similar issue, the backfill option from the documentation will check the file paths vs the meta data in checkpoints to see if everything has been read. By default backfills seem to be disabled.
The problem i'm having is that incremental directory listing just hangs, or takes forever to run post the initial write, and even if you use event notifications which seem to be really fast for updates, they don't provide 100% delivery guarantees, meaning that backfills will need to be ran often... But then we come back to the initial problem of having to do directory listing against potentially a massive directory for the backfills
โ06-25-2022 05:29 AM
Thank you for the explanation.
 
					
				
				
			
		
 
					
				
				
			
		
Passionate about hosting events and connecting people? Help us grow a vibrant local communityโsign up today to get started!
Sign Up Now