<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Spark custom data sources - SQS streaming reader [DLT] in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-custom-data-sources-sqs-streaming-reader-dlt/m-p/120611#M46205</link>
    <description>Re: Spark custom data sources - SQS streaming reader [DLT] in Data Engineering</description>
    <pubDate>Fri, 30 May 2025 13:21:51 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-05-30T13:21:51Z</dc:date>
    <item>
      <title>Spark custom data sources - SQS streaming reader [DLT]</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-custom-data-sources-sqs-streaming-reader-dlt/m-p/120603#M46200</link>
      <description>&lt;DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Hey,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;I’m working on pulling data from AWS SQS into Databricks using Spark custom data sources and DLT (see &lt;A href="https://docs.databricks.com/aws/en/pyspark/datasources" target="_blank" rel="noopener"&gt;https://docs.databricks.com/aws/en/pyspark/datasources&lt;/A&gt;). I started with a batch reader/writer based on this example: &lt;A href="https://medium.com/@zcking/spark-4-tour-of-custom-data-sources-with-sqs-e2151fe30a22" target="_blank" rel="noopener"&gt;https://medium.com/@zcking/spark-4-tour-of-custom-data-sources-with-sqs-e2151fe30a22&lt;/A&gt;. In batch mode, I delete messages in the writer after they’ve been read, but in streaming &lt;U&gt;&lt;STRONG&gt;with DLT&lt;/STRONG&gt;&lt;/U&gt; we only get the reader.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;With DLT we can’t use a writer, only the DataSourceStreamReader. I tried to implement commit() to delete messages in bulk after each micro-batch using multiple partitions, but I ran into a problem: I can’t collect the handles that read() sees on the executors back on the driver.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Here’s a sketch of what I’ve got:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;CLASS _SQSPartition (extends InputPartition):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; ATTRIBUTES:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; pid &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # partition ID&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; receipt_handles # list to collect handles in read()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;CLASS SQSStreamReader (implements DataSourceStreamReader):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD &lt;/SPAN&gt;&lt;SPAN&gt;__init__&lt;/SPAN&gt;&lt;SPAN&gt;(schema, options):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Save schema and options&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Extract queue_url, AWS creds/region&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Extract tuning knobs: max_messages, wait_time_seconds, visibility_timeout, num_partitions&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD initialOffset() → int:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Return -1&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD latestOffset() → int:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Increment and return internal batch counter&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD partitions(startOffset, endOffset) → List[&lt;/SPAN&gt;&lt;SPAN&gt;_SQSPartition&lt;/SPAN&gt;&lt;SPAN&gt;]:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Create num_partitions new _SQSPartition objects&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Store them in self._current_partitions&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Return that list&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD read(partition: _SQSPartition) → Iterator[&lt;/SPAN&gt;&lt;SPAN&gt;Tuple&lt;/SPAN&gt;&lt;SPAN&gt;]:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Runs on each 
executor&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Call SQS.receive_message(...)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • For each message:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; – partition.receipt_handles.append(ReceiptHandle)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; – yield (MessageId, ReceiptHandle, MD5OfBody, Body)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD commit(endOffset: int):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Runs on the driver&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Cannot access partition.receipt_handles because those lists live in executor memory&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • Therefore you cannot gather handles in commit() to delete them&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; METHOD stop():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; • No-op&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Why this fails&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Each executor JVM mutates its own _SQSPartition.receipt_handles. The drivers commit() runs in a separate JVM and has no way to see those in-memory lists. Even a class-level (static) dict won’t synchronize across executors, because each Spark task runs in its own process.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Any suggestions on how best to pass message handles from executors back to the driver within DataSourceStreamReader?&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Fri, 30 May 2025 10:03:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-custom-data-sources-sqs-streaming-reader-dlt/m-p/120603#M46200</guid>
      <dc:creator>Pat</dc:creator>
      <dc:date>2025-05-30T10:03:25Z</dc:date>
    </item>
    <item>
      <title>Re: Spark custom data sources - SQS streaming reader [DLT]</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-custom-data-sources-sqs-streaming-reader-dlt/m-p/120611#M46205</link>
      <description>&lt;P&gt;For your consideration:&lt;/P&gt;
&lt;DIV class="paragraph"&gt;To address the challenge of passing message handles from executors back to the driver within the &lt;CODE&gt;DataSourceStreamReader&lt;/CODE&gt;, consider the following approaches:&lt;/DIV&gt;
&lt;HR /&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Challenges in Spark Architecture&lt;/STRONG&gt; 1. &lt;STRONG&gt;Executor Memory Isolation&lt;/STRONG&gt;: Executors operate independently, and any in-memory data structures on the executors (e.g., &lt;CODE&gt;_SQSPartition.receipt_handles&lt;/CODE&gt;) are not directly accessible by the driver.&lt;/DIV&gt;
&lt;OL start="2"&gt;
&lt;LI&gt;&lt;STRONG&gt;Serialization Limitations&lt;/STRONG&gt;: Global or static variables won’t synchronize across JVM processes as each Spark task runs in isolation.&lt;/LI&gt;
&lt;/OL&gt;
&lt;HR /&gt;
&lt;H3&gt;&lt;STRONG&gt;Possible Solutions&lt;/STRONG&gt;&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;1. &lt;STRONG&gt;Use TaskCompletionListener to Report Data to Driver&lt;/STRONG&gt; Spark's &lt;CODE&gt;TaskCompletionListener&lt;/CODE&gt; allows capturing data generated by an executor during task completion and reporting it back to the driver. Executors can collect receipt handles during processing and then send them to the driver before task finalization.&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Implementation&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Modify the &lt;CODE&gt;read()&lt;/CODE&gt; method to register a &lt;CODE&gt;TaskCompletionListener&lt;/CODE&gt;.&lt;/LI&gt;
&lt;LI&gt;Upon task completion, receipt handles are serialized and sent to the driver.&lt;/LI&gt;
&lt;LI&gt;The driver aggregates these handles for bulk message deletions after each micro-batch.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;from pyspark import TaskContext

def read(partition: _SQSPartition) -&amp;gt; Iterator[Tuple]:
  context = TaskContext.get()
  receipt_handles = []
  for message in SQS.receive_message(...):
      receipt_handles.append(message['ReceiptHandle'])
      yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
  
  def on_task_completed():
      driver_data = context.taskMetrics().addCustomData("receipt_handles", receipt_handles)
      # driver_data accessible at Driver upon completion
  
  context.addTaskCompletionListener(on_task_completed)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Advantages&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Built-in Spark mechanism.&lt;/LI&gt;
&lt;LI&gt;Receipt handles can be serialized efficiently.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Limitations&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;The driver must still aggregate the reported handles itself before acting on them in the micro-batch &lt;CODE&gt;commit()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;LI&gt;The listener/metrics hooks shown are JVM-side; the public PySpark &lt;CODE&gt;TaskContext&lt;/CODE&gt; does not expose them.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;DIV class="paragraph"&gt;2. &lt;STRONG&gt;Broadcast Variables for Two-Way Communication&lt;/STRONG&gt; Broadcast variables can be used to share data between the driver and the executors. While typically used to send data &lt;STRONG&gt;from driver to executors&lt;/STRONG&gt;, you can adapt them for sending data &lt;STRONG&gt;from executors to the driver&lt;/STRONG&gt;.&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Implementation&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Define a list-valued accumulator (via a custom &lt;CODE&gt;AccumulatorParam&lt;/CODE&gt;) that executors add receipt handles to during &lt;CODE&gt;read()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;LI&gt;On the driver side, read the aggregated handles from the accumulator’s &lt;CODE&gt;value&lt;/CODE&gt; in &lt;CODE&gt;commit()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;from pyspark import SparkContext
sc = SparkContext.getOrCreate()

receipt_accumulator = sc.accumulator([], list)

def read(partition: _SQSPartition) -&amp;gt; Iterator[Tuple]:
  for message in SQS.receive_message(...):
      receipt_accumulator.add([message['ReceiptHandle']])
      yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])

def commit(endOffset):
  receipt_handles = receipt_accumulator.value
  delete_messages_in_bulk(receipt_handles)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Advantages&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Easily aggregates receipt handles across partitions.&lt;/LI&gt;
&lt;LI&gt;Works well for micro-batch logic.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Limitations&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Accumulators only grow and are not reset between micro-batches, so the handles must be scoped per batch (see the sketch after this list); task retries can also add the same handle twice.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
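&lt;DIV class="paragraph"&gt;A minimal sketch of per-batch scoping inside the reader class, assuming the reader can obtain the active &lt;CODE&gt;SparkContext&lt;/CODE&gt; (Python data source readers may run in an isolated worker where this does not hold) and reusing the &lt;CODE&gt;ListParam&lt;/CODE&gt; accumulator defined above; names are illustrative:&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;from pyspark import SparkContext
from pyspark.sql.datasource import DataSourceStreamReader

class SQSStreamReader(DataSourceStreamReader):
    # initialOffset()/latestOffset() omitted for brevity.

    def partitions(self, startOffset, endOffset):
        # Fresh accumulator per micro-batch so commit() only sees this batch's handles.
        sc = SparkContext.getOrCreate()
        self._handles_acc = sc.accumulator([], ListParam())
        return [_SQSPartition(pid=i, receipt_handles=[]) for i in range(self.num_partitions)]

    def read(self, partition):
        for message in SQS.receive_message(...):
            self._handles_acc.add([message['ReceiptHandle']])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])

    def commit(self, endOffset):
        # Driver side: delete everything this micro-batch consumed.
        delete_messages_in_bulk(self._handles_acc.value)&lt;/CODE&gt;&lt;/PRE&gt;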
&lt;HR /&gt;
&lt;DIV class="paragraph"&gt;3. &lt;STRONG&gt;External Storage Solutions&lt;/STRONG&gt; Use external storage to pass message handles between executors and the driver. Executors write receipt handles to a temporary storage system (e.g., cloud storage like S3, Redis, or an in-memory database), and the driver retrieves and aggregates these handles upon micro-batch completion.&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Implementation&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Each executor writes the handles to an external location (e.g., an S3 bucket or Redis).&lt;/LI&gt;
&lt;LI&gt;The driver retrieves these handles during &lt;CODE&gt;commit()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;def read(partition: _SQSPartition) -&amp;gt; Iterator[Tuple]:
    cache_key = f"partition_{partition.pid}_receipt_handles"
    receipt_handles = []
    for message in SQS.receive_message(...):
        receipt_handles.append(message['ReceiptHandle'])
        yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
    
    save_receipt_handles_to_redis(cache_key, receipt_handles)

def commit(endOffset: int):
    all_handles = collect_receipt_handles_from_redis()
    delete_messages_in_bulk(all_handles)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Advantages&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Reliable transfer mechanism independent of Spark.&lt;/LI&gt;
&lt;LI&gt;Avoids serialization limitations.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Limitations&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Adds a dependency on an external system.&lt;/LI&gt;
&lt;LI&gt;Latency introduced by storage access.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
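&lt;DIV class="paragraph"&gt;A minimal sketch of the placeholder Redis helpers used above, assuming the &lt;CODE&gt;redis&lt;/CODE&gt; Python client and a Redis endpoint reachable from both the executors and the driver (host and key names are illustrative):&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;import redis  # assumes the redis-py client is installed on driver and executors

# Illustrative endpoint and key names (assumptions, not from the thread).
REDIS_HOST = "my-redis-host"
KEY_SET = "sqs_receipt_handles:keys"

def save_receipt_handles_to_redis(cache_key, receipt_handles):
    # Executor side: store this partition's handles and remember the key.
    client = redis.Redis(host=REDIS_HOST, port=6379)
    if receipt_handles:
        client.rpush(cache_key, *receipt_handles)
        client.sadd(KEY_SET, cache_key)

def collect_receipt_handles_from_redis():
    # Driver side: drain every partition's list, then clean up.
    client = redis.Redis(host=REDIS_HOST, port=6379)
    handles = []
    for key in client.smembers(KEY_SET):
        handles.extend(h.decode() for h in client.lrange(key, 0, -1))
        client.delete(key)
    client.delete(KEY_SET)
    return handles&lt;/CODE&gt;&lt;/PRE&gt;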
&lt;HR /&gt;
&lt;DIV class="paragraph"&gt;4. &lt;STRONG&gt;Intermediate RDD-Based Aggregation&lt;/STRONG&gt; Transform the receipt handles into an RDD during streaming processing and use Spark actions (e.g., &lt;CODE&gt;collect()&lt;/CODE&gt;) to retrieve data on the driver.&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Implementation&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Materialize the handles as an RDD on the driver (executors cannot create RDDs themselves).&lt;/LI&gt;
&lt;LI&gt;Driver invokes &lt;CODE&gt;collect()&lt;/CODE&gt; on the RDD to retrieve and use the handles in &lt;CODE&gt;commit()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;def read(partition: _SQSPartition) -&amp;gt; Iterator[Tuple]:
    buffer = []
    for message in SQS.receive_message(...):
        buffer.append((message['ReceiptHandle'],))
        yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
    
    return sc.parallelize(buffer)

def commit(endOffset: int):
    aggregated_handles = receipt_rdd.collect()
    delete_messages_in_bulk(aggregated_handles)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Advantages&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Leverages Spark’s distributed capabilities.&lt;/LI&gt;
&lt;LI&gt;Works naturally within Spark pipelines.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Limitations&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Potential performance overhead due to &lt;CODE&gt;collect()&lt;/CODE&gt; operation.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;DIV class="paragraph"&gt;5. &lt;STRONG&gt;Custom CommitMessage Mechanism&lt;/STRONG&gt; Some implementations use &lt;CODE&gt;CommitMessage&lt;/CODE&gt; objects in custom data sources to pass metadata back to the driver. Within &lt;CODE&gt;read()&lt;/CODE&gt;, generate these commit messages for receipt handles and aggregate them in &lt;CODE&gt;commit()&lt;/CODE&gt;.&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Implementation&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Define a &lt;CODE&gt;CommitMessage&lt;/CODE&gt; class for receipt handles.&lt;/LI&gt;
&lt;LI&gt;Executors return &lt;CODE&gt;CommitMessage&lt;/CODE&gt; to the driver upon task completion.&lt;/LI&gt;
&lt;LI&gt;Driver acts on these messages in &lt;CODE&gt;commit()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;class ReceiptCommitMessage:
    def __init__(self, receipt_handles):
        self.receipt_handles = receipt_handles

def read(partition: _SQSPartition) -&amp;gt; Iterator[Tuple]:
    handles = []
    for message in SQS.receive_message(...):
        handles.append(message['ReceiptHandle'])
        yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
    
    return ReceiptCommitMessage(handles)

def commit(endOffset: int):
    messages = retrieve_commit_messages()  # Aggregate all ReceiptCommitMessages
    receipt_handles = [msg.receipt_handles for msg in messages]
    delete_messages_in_bulk(receipt_handles)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Advantages&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Provides clear lifecycle management for receipt handles.&lt;/LI&gt;
&lt;LI&gt;Directly integrates with Spark’s &lt;CODE&gt;commit()&lt;/CODE&gt; logic.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Limitations&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Adds engineering complexity, and the Python &lt;CODE&gt;DataSourceStreamReader&lt;/CODE&gt; API provides no built-in channel for returning such messages from &lt;CODE&gt;read()&lt;/CODE&gt;.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
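&lt;DIV class="paragraph"&gt;For reference, the place this pattern actually exists in the PySpark custom data source API is the writer path, which DLT’s reader-only flow does not use. A minimal sketch, assuming the Spark 4 &lt;CODE&gt;pyspark.sql.datasource&lt;/CODE&gt; streaming writer interface (class and helper names are illustrative):&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;from dataclasses import dataclass
from typing import List

from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

@dataclass
class ReceiptCommitMessage(WriterCommitMessage):
    receipt_handles: List[str]

class SQSDeleteStreamWriter(DataSourceStreamWriter):
    def write(self, iterator):
        # Runs on executors: pull the receipt handle out of each row.
        handles = [row.ReceiptHandle for row in iterator]
        return ReceiptCommitMessage(receipt_handles=handles)

    def commit(self, messages, batchId):
        # Runs on the driver once every task in the micro-batch has succeeded.
        all_handles = [h for msg in messages for h in msg.receipt_handles]
        delete_messages_in_bulk(all_handles)  # placeholder from the thread

    def abort(self, messages, batchId):
        # Nothing to undo; unconsumed SQS messages simply become visible again.
        pass&lt;/CODE&gt;&lt;/PRE&gt;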
&lt;HR /&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Recommendation&lt;/STRONG&gt; For simplicity and reliability: - &lt;STRONG&gt;Small-scale environments&lt;/STRONG&gt;: Use &lt;CODE&gt;TaskCompletionListener&lt;/CODE&gt; or RDD-based aggregation (&lt;CODE&gt;collect()&lt;/CODE&gt;). - &lt;STRONG&gt;Large-scale environments&lt;/STRONG&gt;: Opt for external storage (e.g., Redis or S3) to ensure scalability and robustness.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Cheers, Lou.&lt;/DIV&gt;</description>
      <pubDate>Fri, 30 May 2025 13:21:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-custom-data-sources-sqs-streaming-reader-dlt/m-p/120611#M46205</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-05-30T13:21:51Z</dc:date>
    </item>
  </channel>
</rss>

