Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-13-2024 07:17 AM
Was getting errors trying to include the code. Here is my eighth attempt:
for_each_batch_partial = partial(
for_each_batch,
spark=spark,
environment=config.environment,
kinesis_options=config.kinesis_options,
mongo_options=config.mongo_options,
mock_target=config.mock_target,
collection_schemas=create_collection_schema(spark, EVENT_TO_COLLECTION),
log_level = config.log_level
)
query = (
spark.readStream.format("kinesis")
.options(**config.kinesis_options)
.load()
.writeStream.queryName("datapipe")
.option("checkpointLocation", config.checkpoint_path)
.foreachBatch(for_each_batch_partial)
.start()
)