Hey,
With DLT we can’t use a writer, only a DataSourceStreamReader. I tried to implement commit() to delete messages in bulk after each micro-batch (reading across multiple partitions), but I ran into a problem: I can’t get the receipt handles that read() collects on the executors back to the driver.
Here’s a sketch of what I’ve got:
CLASS _SQSPartition (extends InputPartition):
ATTRIBUTES:
pid # partition ID
receipt_handles # list to collect handles in read()
CLASS SQSStreamReader (implements DataSourceStreamReader):
METHOD __init__(schema, options):
• Save schema and options
• Extract queue_url, AWS creds/region
• Extract tuning knobs: max_messages, wait_time_seconds, visibility_timeout, num_partitions
METHOD initialOffset() → int:
• Return -1
METHOD latestOffset() → int:
• Increment and return internal batch counter
METHOD partitions(startOffset, endOffset) → List[_SQSPartition]:
• Create num_partitions new _SQSPartition objects
• Store them in self._current_partitions
• Return that list
METHOD read(partition: _SQSPartition) → Iterator[Tuple]:
• Runs on each executor
• Call SQS.receive_message(...)
• For each message:
– partition.receipt_handles.append(ReceiptHandle)
– yield (MessageId, ReceiptHandle, MD5OfBody, Body)
METHOD commit(endOffset: int):
• Runs on the driver
• Cannot access partition.receipt_handles because those lists live in executor memory
• Therefore you cannot gather handles in commit() to delete them
METHOD stop():
• No-op
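For reference, here’s roughly what that sketch looks like as actual Python. This is trimmed down, not my production code: the option keys, the region default, and the dict-shaped offsets that the pyspark.sql.datasource API expects are all simplified here, and the boto3 wiring assumes default credentials.

```python
from typing import Iterator, List, Tuple

import boto3
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class _SQSPartition(InputPartition):
    def __init__(self, pid: int):
        self.pid = pid
        self.receipt_handles: List[str] = []  # only ever filled on the executor's copy


class SQSStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self._schema = schema
        # Option keys below are illustrative, not my exact names.
        self._queue_url = options["queueUrl"]
        self._region = options.get("region", "eu-west-1")
        self._max_messages = int(options.get("maxMessages", 10))
        self._wait_time_seconds = int(options.get("waitTimeSeconds", 1))
        self._visibility_timeout = int(options.get("visibilityTimeout", 300))
        self._num_partitions = int(options.get("numPartitions", 4))
        self._batch = 0
        self._current_partitions: List[_SQSPartition] = []

    def initialOffset(self) -> dict:
        return {"batch": -1}

    def latestOffset(self) -> dict:
        self._batch += 1
        return {"batch": self._batch}

    def partitions(self, start: dict, end: dict) -> List[_SQSPartition]:
        # Driver side: these objects are serialized and shipped to the executors.
        self._current_partitions = [_SQSPartition(i) for i in range(self._num_partitions)]
        return self._current_partitions

    def read(self, partition: _SQSPartition) -> Iterator[Tuple]:
        # Executor side: `partition` is a deserialized copy of the driver's object.
        sqs = boto3.client("sqs", region_name=self._region)
        resp = sqs.receive_message(
            QueueUrl=self._queue_url,
            MaxNumberOfMessages=self._max_messages,
            WaitTimeSeconds=self._wait_time_seconds,
            VisibilityTimeout=self._visibility_timeout,
        )
        for msg in resp.get("Messages", []):
            partition.receipt_handles.append(msg["ReceiptHandle"])  # mutates the copy only
            yield (msg["MessageId"], msg["ReceiptHandle"], msg["MD5OfBody"], msg["Body"])

    def commit(self, end: dict) -> None:
        # Driver side: this is where I'd like to call
        # sqs.delete_message_batch(QueueUrl=..., Entries=[...]) with the handles from
        # the finished micro-batch, but the driver-side partition objects were never
        # mutated, so there is nothing here to delete.
        for p in self._current_partitions:
            assert not p.receipt_handles  # always empty on the driver

    def stop(self) -> None:
        pass
```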
Why this fails
Each executor task mutates its own deserialized copy of _SQSPartition.receipt_handles. The driver’s commit() runs in a separate process and has no way to see those in-memory lists. Even a class-level (static) dict won’t synchronize across executors, because each Spark task runs in its own process.
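You can see the mechanics in miniature with plain pickle, which is essentially what happens to the partition objects returned by partitions() on the driver before read() runs on them:

```python
import pickle

class _SQSPartition:
    def __init__(self, pid):
        self.pid = pid
        self.receipt_handles = []

driver_partition = _SQSPartition(0)

# Roughly what happens per task: the executor works on a deserialized copy.
executor_copy = pickle.loads(pickle.dumps(driver_partition))
executor_copy.receipt_handles.append("some-receipt-handle")

print(executor_copy.receipt_handles)     # ['some-receipt-handle']
print(driver_partition.receipt_handles)  # [] -- commit() on the driver only ever sees this
```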
Any suggestions on how best to pass receipt handles from the executors back to the driver within a DataSourceStreamReader?