seans
New Contributor III

I was getting errors when trying to include the code. Here is my eighth attempt:

for_each_batch_partial = partial(
    for_each_batch,
    spark=spark,
    environment=config.environment,
    kinesis_options=config.kinesis_options,
    mongo_options=config.mongo_options,
    mock_target=config.mock_target,
    collection_schemas=create_collection_schema(spark, EVENT_TO_COLLECTION),
    log_level = config.log_level
)
query = (
    spark.readStream.format("kinesis")
    .options(**config.kinesis_options)
    .load()
    .writeStream.queryName("datapipe")
    .option("checkpointLocation", config.checkpoint_path)
    .foreachBatch(for_each_batch_partial)
    .start()
)