Spark custom data sources - SQS streaming reader [DLT]

Pat
Esteemed Contributor

Hey,

I’m working on pulling data from AWS SQS into Databricks using Spark custom data sources and DLT (see https://docs.databricks.com/aws/en/pyspark/datasources). I started with a batch reader/writer based on this example: https://medium.com/@zcking/spark-4-tour-of-custom-data-sources-with-sqs-e2151fe30a22. In batch mode, I delete messages in the writer after they’ve been read, but in streaming with DLT we only get the reader.

With DLT we can’t use a writer, only the DataSourceStreamReader. I tried to implement commit() to delete messages in bulk after each micro-batch using multiple partitions, but I ran into a problem: I can’t collect the handles that read() sees on the executors back on the driver.

Here’s a sketch of what I’ve got:

from typing import Iterator, List, Tuple

import boto3
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class _SQSPartition(InputPartition):
    def __init__(self, pid: int):
        self.pid = pid                 # partition ID
        self.receipt_handles = []      # list to collect handles in read()


class SQSStreamReader(DataSourceStreamReader):

    def __init__(self, schema, options):
        # Save schema and options
        self.schema = schema
        self.options = options
        # Queue URL and AWS region (credentials come from the environment/instance profile)
        self.queue_url = options["queue_url"]
        self.region = options.get("region")
        # Tuning knobs
        self.max_messages = int(options.get("max_messages", "10"))
        self.wait_time_seconds = int(options.get("wait_time_seconds", "0"))
        self.visibility_timeout = int(options.get("visibility_timeout", "30"))
        self.num_partitions = int(options.get("num_partitions", "1"))
        self._batch = -1
        self._current_partitions: List[_SQSPartition] = []

    def initialOffset(self) -> dict:
        return {"batch": -1}

    def latestOffset(self) -> dict:
        self._batch += 1               # internal batch counter
        return {"batch": self._batch}

    def partitions(self, start: dict, end: dict) -> List[_SQSPartition]:
        self._current_partitions = [_SQSPartition(i) for i in range(self.num_partitions)]
        return self._current_partitions

    def read(self, partition: _SQSPartition) -> Iterator[Tuple]:
        # Runs on each executor
        sqs = boto3.client("sqs", region_name=self.region)
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=self.max_messages,
            WaitTimeSeconds=self.wait_time_seconds,
            VisibilityTimeout=self.visibility_timeout,
        )
        for message in response.get("Messages", []):
            partition.receipt_handles.append(message["ReceiptHandle"])
            yield (message["MessageId"], message["ReceiptHandle"],
                   message["MD5OfBody"], message["Body"])

    def commit(self, end: dict) -> None:
        # Runs on the driver. The receipt_handles lists populated in read() live
        # in executor memory, so there is nothing to gather here for bulk deletion.
        pass

    def stop(self) -> None:
        pass

Why this fails
Each executor mutates its own copy of _SQSPartition.receipt_handles (the partition objects are serialized out to the executors). The driver's commit() runs in a separate process and has no way to see those in-memory lists. Even a class-level (static) dict won't synchronize across executors, because each Spark task runs in its own process.

Any suggestions on how best to pass message handles from executors back to the driver within DataSourceStreamReader?
1 REPLY

BigRoux
Databricks Employee

For your consideration:

To address the challenge of passing message handles from executors back to the driver within the DataSourceStreamReader, consider the following approaches:

Challenges in Spark Architecture
  1. Executor Memory Isolation: Executors operate independently, and any in-memory data structures on the executors (e.g., _SQSPartition.receipt_handles) are not directly accessible by the driver.
  2. Serialization Limitations: Global or static variables won't synchronize across JVM processes, as each Spark task runs in isolation.

Possible Solutions

1. Use a TaskCompletionListener to Report Data to the Driver
Spark's TaskCompletionListener runs executor-side code when a task finishes, which is a natural place to hand off the receipt handles collected during read(). The listener itself does not ship data to the driver, though; you still need a channel (an accumulator, an external store, or custom metrics) to move the handles, and the driver then aggregates them for bulk deletion after each micro-batch.
  • Implementation:
    • Register a task-completion callback in read().
    • When the task completes, serialize the receipt handles and push them onto a driver-visible channel.
    • The driver aggregates the handles and deletes the messages in commit().
    from pyspark import TaskContext
    
    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        context = TaskContext.get()
        receipt_handles = []
    
        def on_task_completed(ctx):
            # Illustrative only: PySpark's TaskContext has no completion-listener
            # or custom-metrics hook; the equivalent API lives on the JVM-side
            # TaskContext. report_handles_to_driver stands in for whatever
            # channel (accumulator, external store, ...) you choose.
            report_handles_to_driver(receipt_handles)
    
        context.addTaskCompletionListener(on_task_completed)  # JVM API, shown for illustration
    
        for message in SQS.receive_message(...):
            receipt_handles.append(message['ReceiptHandle'])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
  • Advantages:
    • Built-in Spark task-lifecycle mechanism (on the JVM side).
    • Receipt handles can be serialized efficiently.
  • Limitations:
    • The Python TaskContext does not expose addTaskCompletionListener or custom task metrics, so from Python this route needs a JVM (Scala/Java) data source or one of the channels below.
    • The driver must still aggregate the reported handles for the micro-batch commit() action.

2. Accumulators for Executor-to-Driver Aggregation
Broadcast variables are read-only and only flow from the driver to the executors, so they cannot carry data back. Accumulators are Spark's built-in mechanism for the opposite direction: executors add to them inside tasks and the driver reads the aggregated value.
  • Implementation:
    • Define a list accumulator that executors push receipt handles into during read().
    • On the driver side, read the aggregated handles from the accumulator in commit().
    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam
    
    class ListAccumulatorParam(AccumulatorParam):
        # Accumulates Python lists by concatenation.
        def zero(self, value):
            return []
        def addInPlace(self, acc1, acc2):
            acc1.extend(acc2)
            return acc1
    
    sc = SparkContext.getOrCreate()
    receipt_accumulator = sc.accumulator([], ListAccumulatorParam())
    
    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        for message in SQS.receive_message(...):
            receipt_accumulator.add([message['ReceiptHandle']])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
    
    def commit(endOffset):
        receipt_handles = receipt_accumulator.value
        delete_messages_in_bulk(receipt_handles)  # sketched right after this option
  • Advantages:
    • Easily aggregates receipt handles across partitions.
    • Works well for micro-batch logic.
  • Limitations:
    • Accumulator updates are only guaranteed exactly once for actions; retried tasks can add duplicate handles.
    • Assumes the accumulator can be pickled into read(), which runs in a separate Python worker process.
    • Requires careful synchronization to avoid performance bottlenecks.
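
The delete_messages_in_bulk call above (and in the remaining options) is a placeholder. A minimal driver-side sketch, assuming boto3 and SQS's limit of 10 entries per delete_message_batch call; the queue URL is whatever the reader was configured with:

    import boto3
    
    def delete_messages_in_bulk(receipt_handles, queue_url):
        # Delete already-processed messages in chunks of 10 (the SQS batch limit).
        sqs = boto3.client("sqs")
        for start in range(0, len(receipt_handles), 10):
            chunk = receipt_handles[start:start + 10]
            entries = [{"Id": str(i), "ReceiptHandle": handle}
                       for i, handle in enumerate(chunk)]
            response = sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)
            # Anything reported under 'Failed' stays in the queue and becomes
            # visible again after the visibility timeout, so it will be re-read.
            for failure in response.get("Failed", []):
                print(f"Could not delete entry {failure['Id']}: {failure.get('Message')}")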

3. External Storage Solutions
Use external storage to pass message handles between the executors and the driver. Executors write receipt handles to a temporary store (e.g., cloud storage like S3, Redis, or an in-memory database), and the driver retrieves and aggregates them when the micro-batch completes.
  • Implementation:
    • Each executor writes its handles to an external location (e.g., an S3 bucket or Redis); a sketch of these helpers follows this option.
    • The driver retrieves the handles during commit().
    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        cache_key = f"partition_{partition.pid}_receipt_handles"
        receipt_handles = []
        for message in SQS.receive_message(...):
            receipt_handles.append(message['ReceiptHandle'])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
        
        save_receipt_handles_to_redis(cache_key, receipt_handles)   # placeholder helper
    
    def commit(endOffset: int):
        all_handles = collect_receipt_handles_from_redis()          # placeholder helper
        delete_messages_in_bulk(all_handles)
  • Advantages:
    • Reliable transfer mechanism independent of Spark.
    • Avoids serialization limitations.
  • Limitations:
    • Adds a dependency on an external system.
    • Latency introduced by storage access.
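
A minimal sketch of those two helpers, assuming the redis-py client and a Redis endpoint reachable from both the executors and the driver; the hostname and key pattern are placeholders for the example:

    import redis
    
    REDIS_HOST = "redis.internal.example"   # placeholder endpoint
    
    def save_receipt_handles_to_redis(cache_key, receipt_handles):
        # Executor side: append this partition's handles under its own key.
        if receipt_handles:
            r = redis.Redis(host=REDIS_HOST, port=6379)
            r.rpush(cache_key, *receipt_handles)
    
    def collect_receipt_handles_from_redis():
        # Driver side: gather and clear every partition key for the finished micro-batch.
        r = redis.Redis(host=REDIS_HOST, port=6379)
        handles = []
        for key in r.scan_iter("partition_*_receipt_handles"):
            handles.extend(h.decode() for h in r.lrange(key, 0, -1))
            r.delete(key)
        return handles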

4. Intermediate RDD/DataFrame-Based Aggregation
Use Spark actions (e.g., collect()) to bring the handles back to the driver. Note that SparkContext, and therefore parallelize()/collect(), is only available on the driver, so an RDD cannot be created inside read(); the reader does, however, already emit ReceiptHandle as a column, so the driver can collect that column from each micro-batch.
  • Implementation:
    • read() keeps yielding ReceiptHandle as part of every row (no executor-side buffering needed).
    • On the driver, collect the ReceiptHandle column of each micro-batch and delete those messages.
    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        for message in SQS.receive_message(...):
            # ReceiptHandle already travels with the row
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
    
    def process_batch(batch_df, batch_id):
        # Driver side, per micro-batch (e.g., a foreachBatch-style hook outside DLT)
        handles = [row['ReceiptHandle'] for row in batch_df.select('ReceiptHandle').collect()]
        delete_messages_in_bulk(handles)
  • Advantages:
    • Leverages Spark's distributed capabilities.
    • Works naturally within Spark pipelines.
  • Limitations:
    • Potential performance overhead due to the collect() operation.
    • Within DLT you do not control the sink, so this collect-and-delete step would have to run outside the pipeline or downstream of it.

5. Custom CommitMessage Mechanism
Custom data sources can pass per-partition metadata back to the driver as commit messages; this is how the writer API's WriterCommitMessage works (a sketch of that writer-side pattern follows this option). The idea here is to generate equivalent messages for the receipt handles in read() and aggregate them in commit().
  • Implementation:
    • Define a CommitMessage-style class for receipt handles.
    • Executors hand the message back to the driver when the task completes.
    • The driver acts on these messages in commit().
    class ReceiptCommitMessage:
        def __init__(self, receipt_handles):
            self.receipt_handles = receipt_handles
    
    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        handles = []
        for message in SQS.receive_message(...):
            handles.append(message['ReceiptHandle'])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
        
        emit_commit_message(ReceiptCommitMessage(handles))  # placeholder: the reader API has no built-in channel for this
    
    def commit(endOffset: int):
        messages = retrieve_commit_messages()  # placeholder: aggregate all ReceiptCommitMessages
        receipt_handles = [h for msg in messages for h in msg.receipt_handles]
        delete_messages_in_bulk(receipt_handles)
  • Advantages:
    • Provides clear lifecycle management for receipt handles.
    • Directly integrates with Spark's commit() logic.
  • Limitations:
    • DataSourceStreamReader.commit() only receives the end offset, not per-partition messages, so the transport for these messages has to be built yourself (e.g., via option 3's external store).
    • Adds engineering complexity.
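
For reference, the writer side of the Python data source API is where this pattern is natively supported; as the original post notes, DLT only gives you the reader, but outside DLT a DataSourceStreamWriter can collect handles per partition and delete them once the whole micro-batch commits. A minimal sketch, assuming pyspark.sql.datasource's DataSourceStreamWriter/WriterCommitMessage and reusing the delete_messages_in_bulk sketch above:

    from dataclasses import dataclass
    from typing import List
    
    from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
    
    @dataclass
    class ReceiptCommitMessage(WriterCommitMessage):
        receipt_handles: List[str]
    
    class SQSDeletingStreamWriter(DataSourceStreamWriter):
        def __init__(self, queue_url: str):
            self.queue_url = queue_url
    
        def write(self, iterator) -> ReceiptCommitMessage:
            # Runs on the executors: collect the handles seen in this partition.
            handles = [row['ReceiptHandle'] for row in iterator]
            return ReceiptCommitMessage(receipt_handles=handles)
    
        def commit(self, messages: List[ReceiptCommitMessage], batchId: int) -> None:
            # Runs on the driver once every partition of the micro-batch succeeded.
            all_handles = [h for msg in messages for h in msg.receipt_handles]
            delete_messages_in_bulk(all_handles, self.queue_url)
    
        def abort(self, messages: List[ReceiptCommitMessage], batchId: int) -> None:
            # Leave the messages alone; they become visible again after the
            # visibility timeout and will be re-read.
            pass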

Recommendation
For simplicity and reliability:
  • Small-scale environments: use TaskCompletionListener or RDD-based aggregation (collect()).
  • Large-scale environments: opt for external storage (e.g., Redis or S3) to ensure scalability and robustness.
 
Cheers, Lou.
