I am processing a massive history of about 250 GB (~6 million files) of phone call transcriptions (JSON read in as raw text) through a raw -> bronze pipeline in Azure Databricks using PySpark.
The source is mounted storage that continuously receives new files, and we do not delete or archive at the source.
I am using Auto Loader with the Available Now trigger (see code below).
This scales well: with the current configuration I can process the entire history in under 2 hours.
The Available Now trigger breaks the massive history batch into microbatches that suit my cluster size.
I start running into issues when I kick off the stream again AFTER the history has completed.
The microbatch execution log shows that the latestOffset phase of my trigger takes approximately 4,140,000 ms (about 69 minutes) just to get the offsets.
Once that insane offset resolution finally finishes, the addBatch only takes a couple of seconds to append to the destination.
Based on my cluster and configuration I can process around 1300 rec/sec until the history (~6 million files) is completed, but as soon as I start the second run the stream gets caught up reading the latest offset and I process at less than 1 rec/sec.
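For reference, those timings come from the streaming query progress metrics; a minimal sketch of how I read them (assuming stream is the handle returned by .start() in the code below, and noting the exact durationMs keys can differ by runtime version):

# Timing breakdown of the last completed microbatch (stream = handle from .start() below)
progress = stream.lastProgress
if progress is not None:
    durations = progress.get("durationMs", {})
    print("latestOffset ms:", durations.get("latestOffset"))   # ~4,140,000 ms (~69 min) on the second run
    print("addBatch ms:", durations.get("addBatch"))            # only a couple of seconds
    print("rows/sec:", progress.get("processedRowsPerSecond"))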
I have tried a multitude of configurations, somewhat aimlessly, to see if anything solves the problem, but to no avail, and there don't seem to be others reporting this issue, so posting here is my last resort.
One thing to note: based on the data I do not have a well-balanced column to partition on and don't need one for downstream transformations, so a solution with or without partitioning will work for me.
Here is the current configuration of the read and write streams:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit
from pyspark.sql.streaming import DataStreamWriter

# Stream Read Function
def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
    """Read a stream from a specified path
    Parameters
    ----------
    spark : SparkSession
        A spark session
    rawPath : str
        path to the directory of files
    Returns
    -------
    DataFrame
        a dataframe with one string column "value" holding the entire raw contents of each file
    """
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "text")
        .option("wholetext", "true")
        .option("cloudFiles.maxBytesPerTrigger", "10g")
        .schema("value STRING")
        .load(rawPath)
    )
rawDF = read_stream_raw(spark, rawPath=landingPath)
# Transformation
def transform_raw_to_bronze(spark: SparkSession, bronze: DataFrame) -> DataFrame:
    """Tag the raw stream with ingestion metadata
    Parameters
    ----------
    spark : SparkSession
        A spark session
    bronze : DataFrame
        the raw streaming dataframe to enrich
    Returns
    -------
    DataFrame
        the input dataframe with extra columns tagging ingestion metadata
    """
    df = (
        bronze
        .select(
            lit("/my/cloud/storage").alias("datasource"),
            current_timestamp().alias("ingesttime"),
            "value",
            current_timestamp().cast("date").alias("ingestdate")
        )
        .withColumn("input_filename", input_file_name())
    )
    return df
bronzeDF = transform_raw_to_bronze(spark, rawDF)
def create_stream_writer(
    dataframe: DataFrame,
    checkpoint: str,
    name: str,
    partition_column: str = None,
    mode: str = "append",
) -> DataStreamWriter:
    """Configure a stream writer for the given dataframe
    Parameters
    ----------
    dataframe: DataFrame
        A spark dataframe
    checkpoint: str
        unique checkpoint location
    name: str
        unique identifying name of the stream
    partition_column: str = None
        column to partition the stream by
    mode: str = "append"
        output mode of the stream
    Returns
    -------
    DataStreamWriter
        a configured stream writer (started separately with .start())
    """
    stream_writer = (
        dataframe
        .writeStream
        .format("delta")
        .outputMode(mode)
        .option("checkpointLocation", checkpoint)
        .queryName(name)
        .trigger(availableNow=True)
    )
    if partition_column is not None:
        return stream_writer.partitionBy(partition_column)
    return stream_writer
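If a usable partition column ever did turn up, the same helper would take it; a hypothetical call just to illustrate (ingestdate is only an example column, and the real writer below does not partition):

# Hypothetical partitioned variant -- not what I actually run
partitionedWriter = create_stream_writer(
    dataframe=bronzeDF,
    checkpoint=bronzeCheckpoint,  # a separate checkpoint location would be needed in practice
    name="rawToBronzePartitioned",
    partition_column="ingestdate",
)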
rawToBronzeWriter = create_stream_writer(
    dataframe=bronzeDF,
    checkpoint=bronzeCheckpoint,
    mode="append",
    name="rawToBronze"
)
stream = rawToBronzeWriter.start(bronzePath)
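For completeness, since this runs as a job with the Available Now trigger, I just block until the batch drains; a minimal sketch (assuming the stream handle above):

# With availableNow=True the query stops on its own once everything it discovered is processed
stream.awaitTermination()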