Hey,
With DLT we can't use a writer, only the DataSourceStreamReader. I tried to implement commit() to delete messages in bulk after each micro-batch using multiple partitions, but I ran into a problem: I can't collect the handles that read() sees on the executors back on the driver.
Here's a sketch of what I've got:
CLASS _SQSPartition (extends InputPartition):
ATTRIBUTES:
pid # partition ID
receipt_handles # list to collect handles in read()
CLASS SQSStreamReader (implements DataSourceStreamReader):
METHOD __init__(schema, options):
• Save schema and options
• Extract queue_url, AWS creds/region
• Extract tuning knobs: max_messages, wait_time_seconds, visibility_timeout, num_partitions
METHOD initialOffset() → int:
• Return -1
METHOD latestOffset() → int:
• Increment and return internal batch counter
METHOD partitions(startOffset, endOffset) → List[_SQSPartition]:
• Create num_partitions new _SQSPartition objects
• Store them in self._current_partitions
• Return that list
METHOD read(partition: _SQSPartition) → Iterator[Tuple]:
• Runs on each executor
• Call SQS.receive_message(...)
• For each message:
  – partition.receipt_handles.append(ReceiptHandle)
  – yield (MessageId, ReceiptHandle, MD5OfBody, Body)
METHOD commit(endOffset: int):
• Runs on the driver
• Cannot access partition.receipt_handles because those lists live in executor memory
• Therefore you cannot gather handles in commit() to delete them
METHOD stop():
• No-op
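To make the sketch concrete, here's roughly what that looks like as real code. It's trimmed and untested: the option names and defaults are just how I've wired it up, I'm relying on the default boto3 credential chain instead of passing creds explicitly, and I return dict-shaped offsets (rather than the raw int above) since that's what the Python data source API serializes.

```python
from typing import Iterator, List, Tuple

import boto3
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class _SQSPartition(InputPartition):
    def __init__(self, pid: int):
        self.pid = pid
        self.receipt_handles: List[str] = []  # only ever filled on an executor


class SQSStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.queue_url = options["queue_url"]
        self.region = options.get("region", "eu-west-1")
        self.max_messages = int(options.get("max_messages", 10))
        self.wait_time_seconds = int(options.get("wait_time_seconds", 1))
        self.visibility_timeout = int(options.get("visibility_timeout", 300))
        self.num_partitions = int(options.get("num_partitions", 4))
        self._batch = 0
        self._current_partitions: List[_SQSPartition] = []

    def initialOffset(self) -> dict:
        return {"batch": -1}

    def latestOffset(self) -> dict:
        self._batch += 1
        return {"batch": self._batch}

    def partitions(self, start: dict, end: dict) -> List[_SQSPartition]:
        self._current_partitions = [_SQSPartition(i) for i in range(self.num_partitions)]
        return self._current_partitions

    def read(self, partition: _SQSPartition) -> Iterator[Tuple]:
        # Runs on an executor, against a pickled *copy* of the partition object.
        sqs = boto3.client("sqs", region_name=self.region)
        resp = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=self.max_messages,
            WaitTimeSeconds=self.wait_time_seconds,
            VisibilityTimeout=self.visibility_timeout,
        )
        for msg in resp.get("Messages", []):
            partition.receipt_handles.append(msg["ReceiptHandle"])
            yield (msg["MessageId"], msg["ReceiptHandle"], msg.get("MD5OfBody"), msg["Body"])

    def commit(self, end: dict) -> None:
        # Runs on the driver. self._current_partitions still holds the driver-side
        # objects, but their receipt_handles lists stay empty -- read() only mutated
        # the copies that were shipped to the executors.
        pass

    def stop(self) -> None:
        pass
```

That commit() body is exactly where I'd like to call sqs.delete_message_batch(...) with the handles from the batch, but the handles just aren't there on the driver.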
Why this fails
Each executor mutates its own copy of _SQSPartition.receipt_handles inside its worker process. The driver's commit() runs in a separate process and has no way to see those in-memory lists. Even a class-level (static) dict won't synchronize across executors, because each Spark task runs in its own process.
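A quick way to convince yourself of that isolation (throwaway example, nothing SQS-specific):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

class HandleStore:
    handles = []  # class-level list; each worker process gets its own copy

def fake_read(pid):
    HandleStore.handles.append(f"handle-{pid}")
    return len(HandleStore.handles)

print(spark.sparkContext.parallelize(range(4), 4).map(fake_read).collect())
print(HandleStore.handles)  # [] on the driver -- the appends never came back
```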
Any suggestions on how best to pass message handles from executors back to the driver within DataSourceStreamReader?