<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Sink is not written into delta table in Spark structured streaming in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/sink-is-not-written-into-delta-table-in-spark-structured/m-p/58078#M31013</link>
    <description>&lt;P&gt;I want to create a streaming job that reads messages from TXT files in a folder, parses and processes them, and appends each result to one of three Delta tables depending on the parse result: a parse_failed table, an unknown_msgs table, and a parsed_msgs table.&lt;/P&gt;&lt;P&gt;Reading is done with&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;sdf = spark.readStream.text(path=path_input, lineSep="\n\n", pathGlobFilter="*.txt", recursiveFileLookup=True)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;and writing with&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;x = sdf.writeStream.foreachBatch(process_microbatch).start()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;where process_microbatch is&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def process_microbatch(self, batch_df: DataFrame, batch_id: int) -&amp;gt; None:
    """Process newly arrived messages: replicate each message if needed, then run parse_msg_proxy on each one."""
    batch_df.rdd.flatMap(lambda msg: replicate_msg(msg)).map(lambda msg: parse_msg_proxy(msg))&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;and where parse_msg_proxy is&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def parse_msg_proxy(self, msg: str) -&amp;gt; None:
        try:
            parsed_msg = parse_message(msg, element_mapping)
            # do some processing
            # create df_msg dataframe from parsed_msg
            df_msg.write.format("delta").mode("append").save(path_parsed_msgs)
        except ParseException as e:
            spark.createDataFrame([{'msg': parsed_msg, 'error': str(e)}]).write.format("delta").mode("append").save(path_parse_errors)
            raise Exception("Parse error occurred.")
        except UnknownMsgTypeException:
            spark.createDataFrame([{'msg': parsed_msg}]).write.format("delta").mode("append").save(path_unknown_msgs)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;The streaming job starts without any error message, but the Delta tables are not created. What's wrong? Maybe an action such as collect() is missing? If so, where should I put it?&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 22 Jan 2024 08:59:54 GMT</pubDate>
    <dc:creator>bayerb</dc:creator>
    <dc:date>2024-01-22T08:59:54Z</dc:date>
    <item>
      <title>Sink is not written into delta table in Spark structured streaming</title>
      <link>https://community.databricks.com/t5/data-engineering/sink-is-not-written-into-delta-table-in-spark-structured/m-p/58078#M31013</link>
      <description>&lt;P&gt;I want to create a streaming job that reads messages from TXT files in a folder, parses and processes them, and appends each result to one of three Delta tables depending on the parse result: a parse_failed table, an unknown_msgs table, and a parsed_msgs table.&lt;/P&gt;&lt;P&gt;Reading is done with&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;sdf = spark.readStream.text(path=path_input, lineSep="\n\n", pathGlobFilter="*.txt", recursiveFileLookup=True)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;and writing with&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;x = sdf.writeStream.foreachBatch(process_microbatch).start()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;where process_microbatch is&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def process_microbatch(self, batch_df: DataFrame, batch_id: int) -&amp;gt; None:
    """Process newly arrived messages: replicate each message if needed, then run parse_msg_proxy on each one."""
    batch_df.rdd.flatMap(lambda msg: replicate_msg(msg)).map(lambda msg: parse_msg_proxy(msg))&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;and where parse_msg_proxy is&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def parse_msg_proxy(self, msg: str) -&amp;gt; None:
        try:
            parsed_msg = parse_message(msg, element_mapping)
            # do some processing
            # create df_msg dataframe from parsed_msg
            df_msg.write.format("delta").mode("append").save(path_parsed_msgs)
        except ParseException as e:
            spark.createDataFrame([{'msg': parsed_msg, 'error': str(e)}]).write.format("delta").mode("append").save(path_parse_errors)
            raise Exception("Parse error occurred.")
        except UnknownMsgTypeException:
            spark.createDataFrame([{'msg': parsed_msg}]).write.format("delta").mode("append").save(path_unknown_msgs)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;The streaming job starts without any error message, but the Delta tables are not created. What's wrong? Maybe an action such as collect() is missing? If so, where should I put it?&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 22 Jan 2024 08:59:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sink-is-not-written-into-delta-table-in-spark-structured/m-p/58078#M31013</guid>
      <dc:creator>bayerb</dc:creator>
      <dc:date>2024-01-22T08:59:54Z</dc:date>
    </item>
    <item>
      <title>Re: Sink is not written into delta table in Spark structured streaming</title>
      <link>https://community.databricks.com/t5/data-engineering/sink-is-not-written-into-delta-table-in-spark-structured/m-p/58145#M31024</link>
      <description>&lt;P&gt;There doesn't seem to be any issue with the code, but the logs need to be analysed to find out what the problem is. Could you please create a support ticket?&lt;/P&gt;</description>
      <pubDate>Mon, 22 Jan 2024 13:47:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sink-is-not-written-into-delta-table-in-spark-structured/m-p/58145#M31024</guid>
      <dc:creator>Lakshay</dc:creator>
      <dc:date>2024-01-22T13:47:35Z</dc:date>
    </item>
  </channel>
</rss>

