05-30-2025 03:03 AM
05-30-2025 06:21 AM
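For your consideration:

When implementing a DataSourceStreamReader, consider the following approaches. The underlying problem is that values set on the executors (such as _SQSPartition.receipt_handles) are not directly accessible by the driver.

TaskCompletionListener
The idea is to register a TaskCompletionListener that captures data generated by an executor during task completion and reports it back to the driver. Executors can collect receipt handles during processing and then send them to the driver before task finalization. Modify the read() method to register a TaskCompletionListener.

    from typing import Iterator, Tuple
    from pyspark import TaskContext

    # Caution: PySpark's TaskContext does not expose addTaskCompletionListener()
    # or taskMetrics(), so treat this block as pseudocode for the idea.
    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        context = TaskContext.get()
        receipt_handles = []

        def on_task_completed():
            # Intended to make the collected handles available to the driver
            driver_data = context.taskMetrics().addCustomData("receipt_handles", receipt_handles)

        # Register the callback before iterating so it is in place when the task ends
        context.addTaskCompletionListener(on_task_completed)

        # Assuming SQS is a boto3 client: the messages sit under the 'Messages' key
        for message in SQS.receive_message(...).get('Messages', []):
            receipt_handles.append(message['ReceiptHandle'])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])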
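
Accumulators
Executors add receipt handles to an accumulator, and the driver reads its value in commit().

    from typing import Iterator, Tuple
    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam

    class ListParam(AccumulatorParam):
        # sc.accumulator() needs an AccumulatorParam for non-numeric values
        def zero(self, value):
            return []
        def addInPlace(self, v1, v2):
            return v1 + v2

    sc = SparkContext.getOrCreate()
    receipt_accumulator = sc.accumulator([], ListParam())

    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        # Assuming SQS is a boto3 client: the messages sit under the 'Messages' key
        for message in SQS.receive_message(...).get('Messages', []):
            receipt_accumulator.add([message['ReceiptHandle']])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])

    def commit(endOffset):
        receipt_handles = receipt_accumulator.value
        delete_messages_in_bulk(receipt_handles)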
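
The delete_messages_in_bulk helper is not shown in this thread. A minimal sketch of one possible shape follows, assuming a boto3 SQS client; passing the client and queue URL as parameters is my assumption, not part of the original snippets. SQS accepts at most 10 entries per DeleteMessageBatch request, so the handles are sent in chunks.

```python
from typing import Any, Dict, Iterable, Iterator, List

SQS_MAX_BATCH = 10  # DeleteMessageBatch accepts at most 10 entries per request


def chunked(items: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def delete_messages_in_bulk(sqs_client: Any, queue_url: str,
                            receipt_handles: Iterable[str]) -> List[Dict]:
    """Delete messages in batches and return the entries SQS reported as failed.

    `sqs_client` and `queue_url` are assumed parameters for this sketch.
    """
    # Drop duplicate handles while keeping their original order
    unique_handles = list(dict.fromkeys(receipt_handles))
    failed: List[Dict] = []
    for batch in chunked(unique_handles, SQS_MAX_BATCH):
        # Ids only need to be unique within a single batch request
        entries = [{"Id": str(i), "ReceiptHandle": h} for i, h in enumerate(batch)]
        response = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
        failed.extend(response.get("Failed", []))
    return failed
```

Returning the failed entries lets commit() decide whether to retry or log them rather than silently dropping them.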
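
External storage (e.g., Redis)
Each task saves its receipt handles to an external store under a per-partition key, and the driver reads them back in commit().

    from typing import Iterator, Tuple

    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        cache_key = f"partition_{partition.pid}_receipt_handles"
        receipt_handles = []
        # Assuming SQS is a boto3 client: the messages sit under the 'Messages' key
        for message in SQS.receive_message(...).get('Messages', []):
            receipt_handles.append(message['ReceiptHandle'])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
        # Runs once the loop is exhausted, i.e. after every row has been yielded
        save_receipt_handles_to_redis(cache_key, receipt_handles)

    def commit(endOffset: int):
        all_handles = collect_receipt_handles_from_redis()
        delete_messages_in_bulk(all_handles)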
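
The two storage helpers above are left undefined. The sketch below shows one way they could behave, using an in-memory class as a stand-in for Redis or S3 so that it runs without external services. The InMemoryStore class, the function names, and the batch_id argument (used to scope keys so that commit() only picks up handles from its own micro-batch) are all assumptions for illustration.

```python
import json
from typing import Dict, List


class InMemoryStore:
    """Stand-in for Redis or S3 with only the operations this sketch needs."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> str:
        return self._data[key]

    def keys(self, prefix: str) -> List[str]:
        return sorted(k for k in self._data if k.startswith(prefix))

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


def save_receipt_handles(store: InMemoryStore, batch_id: int, pid: int,
                         handles: List[str]) -> None:
    """Executor side: write one partition's handles under a batch-scoped key."""
    store.set(f"batch_{batch_id}/partition_{pid}", json.dumps(handles))


def collect_receipt_handles(store: InMemoryStore, batch_id: int) -> List[str]:
    """Driver side: read and then remove every partition's handles for one batch."""
    collected: List[str] = []
    for key in store.keys(f"batch_{batch_id}/"):
        collected.extend(json.loads(store.get(key)))
        store.delete(key)  # avoid deleting the same messages again on a later commit
    return collected
```

With a real Redis or S3 backend the same two functions would wrap the client calls; the key layout is the part that carries over.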
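
RDD-based aggregation
Use an RDD action (collect()) to retrieve data on the driver. Build an RDD of receipt handles, then call collect() on the RDD to retrieve and use the handles in commit().

    from typing import Iterator, Tuple

    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        buffer = []
        # Assuming SQS is a boto3 client: the messages sit under the 'Messages' key
        for message in SQS.receive_message(...).get('Messages', []):
            buffer.append((message['ReceiptHandle'],))
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
        # Caution: SparkContext exists only on the driver, so sc.parallelize()
        # cannot run inside read() on an executor.
        return sc.parallelize(buffer)

    def commit(endOffset: int):
        # receipt_rdd holds (receipt_handle,) tuples; unwrap them after collect()
        aggregated_handles = [row[0] for row in receipt_rdd.collect()]
        delete_messages_in_bulk(aggregated_handles)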
This approach relies on the collect() operation.

CommitMessage
Custom data sources can use CommitMessage objects to pass metadata back to the driver. Within read(), generate these commit messages for receipt handles and aggregate them in commit().
- Define a CommitMessage class for receipt handles.
- Return the CommitMessage to the driver upon task completion.
- Aggregate the messages in commit().

    from typing import Iterator, Tuple

    class ReceiptCommitMessage:
        def __init__(self, receipt_handles):
            self.receipt_handles = receipt_handles

    def read(partition: _SQSPartition) -> Iterator[Tuple]:
        handles = []
        # Assuming SQS is a boto3 client: the messages sit under the 'Messages' key
        for message in SQS.receive_message(...).get('Messages', []):
            handles.append(message['ReceiptHandle'])
            yield (message['MessageId'], message['ReceiptHandle'], message['MD5OfBody'], message['Body'])
        # Caution: a generator's return value travels on StopIteration; it is not yielded as a row
        return ReceiptCommitMessage(handles)

    def commit(endOffset: int):
        messages = retrieve_commit_messages()  # Aggregate all ReceiptCommitMessages
        # Flatten the per-partition lists into a single list of handles
        receipt_handles = [h for msg in messages for h in msg.receipt_handles]
        delete_messages_in_bulk(receipt_handles)
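
The aggregation step in commit() can be tried on its own, without Spark. This is a minimal sketch, assuming each partition produces one ReceiptCommitMessage; the dataclass form and the flatten_receipt_handles name are illustrative choices, and how the messages actually reach the driver is left open here.

```python
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class ReceiptCommitMessage:
    """Receipt handles gathered by one partition."""
    receipt_handles: List[str] = field(default_factory=list)


def flatten_receipt_handles(messages: Iterable[ReceiptCommitMessage]) -> List[str]:
    """Merge per-partition messages into one de-duplicated list of handles."""
    seen = set()
    merged: List[str] = []
    for message in messages:
        for handle in message.receipt_handles:
            if handle not in seen:  # keep only the first occurrence of each handle
                seen.add(handle)
                merged.append(handle)
    return merged
```

Flattening matters because a list comprehension over msg.receipt_handles alone yields a list of lists, which a bulk-delete helper expecting plain handles would not accept.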

Delete the aggregated handles in your commit() logic.

- Small-scale environments: Use TaskCompletionListener or RDD-based aggregation (collect()).
- Large-scale environments: Opt for external storage (e.g., Redis or S3) to ensure scalability and robustness.