I want to create a streaming job that reads messages from TXT files in a folder, parses each one, does some processing, and appends the result to one of three possible Delta tables depending on the parse result: a parse_errors table, an unknown_msgs table, and a parsed_msgs table.
Reading is done with
sdf = spark.readStream.text(path=path_input, lineSep="\n\n", pathGlobFilter="*.txt", recursiveFileLookup=True)
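Each blank-line-separated block becomes one row, so the streaming DataFrame has a single string column named value:

sdf.printSchema()
# root
#  |-- value: string (nullable = true)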
and writing with
x = sdf.writeStream.foreachBatch(process_microbatch).start()
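In case the way I start the query matters, a slightly fuller version is below; the checkpointLocation option and the awaitTermination() call are added here only for completeness, and path_checkpoint is a placeholder, not something from my actual job:

x = (
    sdf.writeStream
    .foreachBatch(process_microbatch)
    .option("checkpointLocation", path_checkpoint)  # placeholder path, assumed
    .start()
)
x.awaitTermination()  # keeps the driver alive when run as a script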
where process_microbatch is
def process_microbatch(batch_df: DataFrame, batch_id: int) -> None:
    """Process newly arrived messages: replicate each message if needed, then run parse_msg_proxy on every copy."""
    batch_df.rdd.flatMap(lambda msg: replicate_msg(msg)).map(lambda msg: parse_msg_proxy(msg))
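Here replicate_msg is my own helper that returns one or more copies of a message. A simplified stand-in, just so the snippet is self-contained (the real logic is more involved):

def replicate_msg(msg):
    # stand-in: return the message once; the real helper may return
    # several copies depending on the message content
    return [msg]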
and where parse_msg_proxy is
def parse_msg_proxy(msg: str) -> None:
    try:
        parsed_msg = parse_message(msg, element_mapping)
        # do some processing
        # create df_msg dataframe from parsed_msg
        df_msg.write.format("delta").mode("append").save(path_parsed_msgs)
    except ParseException as e:
        spark.createDataFrame([{'msg': msg, 'error': str(e)}]).write.format("delta").mode("append").save(path_parse_errors)
        raise Exception("Parse error occurred.")
    except UnknownMsgTypeException:
        spark.createDataFrame([{'msg': msg}]).write.format("delta").mode("append").save(path_unknown_msgs)
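ParseException and UnknownMsgTypeException are my own exception classes, roughly:

class ParseException(Exception):
    # raised by parse_message when a message cannot be parsed
    pass

class UnknownMsgTypeException(Exception):
    # raised by parse_message when the message type is not recognized
    pass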
The streaming job starts without any error message, but the Delta tables are never created. What's wrong? Is an action missing, such as collect()? If so, where should I put it?