
Sink is not written into delta table in Spark structured streaming

bayerb
New Contributor

I want to create a streaming job that reads messages from TXT files in a folder, parses them, does some processing, and appends the result to one of three possible Delta tables depending on the parse result: a parse_failed table, an unknown_msgs table, and a parsed_msgs table.

Reading is done with

sdf = spark.readStream.text(path=path_input, lineSep="\n\n", pathGlobFilter="*.txt", recursiveFileLookup=True)

and writing with

x = sdf.writeStream.foreachBatch(process_microbatch).start()

where process_microbatch is

def process_microbatch(self, batch_df: DataFrame, batch_id: int) -> None:
    """Processing of newly arrived messages. For each message replicate it if needed, and execute the parse_msg_proxy on each."""
    batch_df.rdd.flatMap(lambda msg: replicate_msg(msg)).map(lambda msg: parse_msg_proxy(msg))

and where parse_msg_proxy is

def parse_msg_proxy(self, msg: str) -> None:
    try:
        parsed_msg = parse_message(msg, element_mapping)
        # do some processing
        # create df_msg dataframe from parsed_msg
        df_msg.write.format("delta").mode("append").save(path_parsed_msgs)
    except ParseException as e:
        spark.createDataFrame([{'msg': parsed_msg, 'error': str(e)}]).write.format("delta").mode("append").save(path_parse_errors)
        raise Exception("Parse error occurred.")
    except UnknownMsgTypeException:
        spark.createDataFrame([{'msg': parsed_msg}]).write.format("delta").mode("append").save(path_unknown_msgs)

The streaming job starts without an error message, but the Delta tables are not created. What's wrong? Maybe an action operation is missing, like collect()? Where should I put it?
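
A possible reading of the code above, offered as a sketch rather than a confirmed diagnosis: flatMap and map are lazy RDD transformations, so without an action the lambdas never run. Adding collect() alone would probably not be enough either, because parse_msg_proxy uses spark and DataFrame writes, and those are only available on the driver, not inside functions that run on executors. One way to restructure this is to keep the per-message work a pure function that returns a tagged row, and to do the three Delta writes on the driver inside foreachBatch. In the sketch below, classify_msg, the paths argument, the stubbed parse_message, and the two exception classes are hypothetical stand-ins for code not shown in the post, and the replicate_msg step is left out.

```python
from typing import Dict, Tuple


class ParseException(Exception):
    """Stand-in for the poster's ParseException (assumed, not shown in the post)."""


class UnknownMsgTypeException(Exception):
    """Stand-in for the poster's UnknownMsgTypeException (assumed)."""


def parse_message(msg: str) -> str:
    """Hypothetical parser stub; the real parse_message is not shown."""
    if not msg.strip():
        raise ParseException("empty message")
    if not msg.startswith("MSG"):
        raise UnknownMsgTypeException(msg)
    return msg.upper()


def classify_msg(msg: str) -> Tuple[str, str, str]:
    """Pure function, safe on executors: returns (kind, payload, error)."""
    try:
        return ("parsed", parse_message(msg), "")
    except ParseException as e:
        return ("failed", msg, str(e))
    except UnknownMsgTypeException:
        return ("unknown", msg, "")


def process_microbatch(batch_df, batch_id: int, paths: Dict[str, str]) -> None:
    """Driver-side handler: tag every row, then append each kind to its table.

    `paths` is an assumed mapping such as
    {"parsed": ..., "failed": ..., "unknown": ...}.
    """
    # Imported here so the pure-Python part above runs without PySpark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType, StructField, StructType

    schema = StructType(
        [StructField(name, StringType()) for name in ("kind", "payload", "error")]
    )
    classify_udf = F.udf(classify_msg, schema)

    # spark.readStream.text produces a single string column named "value".
    tagged = batch_df.select(classify_udf("value").alias("r")).select("r.*").persist()
    for kind, path in paths.items():
        # write...save() is an action, so this is where the work actually runs.
        tagged.filter(F.col("kind") == kind).write.format("delta").mode("append").save(path)
    tagged.unpersist()
```

Since foreachBatch calls its function with two arguments, the extra paths argument could be bound with a lambda, for example sdf.writeStream.foreachBatch(lambda df, batch_id: process_microbatch(df, batch_id, paths)).start(). Setting a checkpointLocation option on the writer is also worth considering so the job can resume after a restart.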

2 REPLIES

Lakshay
Esteemed Contributor

There doesn't seem to be any issue with the code, but the logs need to be analysed to find out what the issue is. Could you please create a support ticket?
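
Before opening a ticket, some information can be read straight from the query handle returned by start(). The sketch below assumes a StreamingQuery-like object whose lastProgress is dict-like; describe_query and fake_query are hypothetical names used only for illustration, and the stand-in object exists only so the example runs without a cluster.

```python
from types import SimpleNamespace
from typing import Any, Dict


def describe_query(query: Any) -> Dict[str, Any]:
    """Summarise a StreamingQuery-like object without blocking on it."""
    error = query.exception()      # None while no failure has been recorded
    progress = query.lastProgress  # None until the first micro-batch completes
    return {
        "is_active": query.isActive,
        "status": query.status,
        "rows_in_last_batch": progress["numInputRows"] if progress else None,
        "error": str(error) if error is not None else None,
    }


# Stand-in for a real query handle (assumption, for demonstration only).
fake_query = SimpleNamespace(
    isActive=True,
    status={"message": "Waiting for data to arrive", "isDataAvailable": False},
    lastProgress={"numInputRows": 3},
    exception=lambda: None,
)
summary = describe_query(fake_query)
```

With the code from the question this would be called as describe_query(x). A non-zero rows_in_last_batch with no error and no table output would suggest that the input is being read but the batch function is not writing anything.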

Kaniz
Community Manager

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
 
