Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Sink is not written into delta table in Spark structured streaming

I want to create a streaming job, that reads messages from a folder within TXT files, does the parsing, some processing, and appends the result into one of 3 possible delta tables depending on the parse result. There is a parse_failed table, an unknwon_msgs table, and a parsed_msgs table.

Reading is done with


sdf = spark.readStream.text(path=path_input, lineSep="\n\n", pathGlobFilter="*.txt", recursiveFileLookup=True)


and writing with


x = sdf.writeStream.foreachBatch(process_microbatch).start()


where process_microbatch is


def process_microbatch(self, batch_df: DataFrame, batch_id: int) -> None:
    """Processing of newly arrived messages. For each message replicate it if needed, and execute the parse_msg_proxy on each."""
    batch_df.rdd.flatMap(lambda msg: replicate_msg(msg)).map(lambda msg: parse_msg_proxy(msg))


and where parse_msg_proxy is


def parse_msg_proxy(self, msg: str) -> None:
            parsed_msg = parse_message(msg, element_mapping)
            # do some processing
            # create df_msg dataframe from parsed_msg
        except ParseException as e:
            spark.createDataFrame([{'msg': parsed_msg, 'error': str(e)}]).write.format("delta").mode("append").save(path_parse_errors)
            raise Exception("Parse error occured.")
        except UnknownMsgTypeException:
            spark.createDataFrame([{'msg': parsed_msg}]).write.format("delta").mode("append").save(path_unknown_msgs)


The streaming job starts without error message, but the delta tables are not created. Whats wrong? Maybe an action operations are missing, like collect()? Where should I put it?


There doesn't seem to any issue with code. But log needs to be analysed to get a clue of what is the issue. Could you please create a support ticket.

