<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: DLT fails with Queries with streaming sources must be executed with writeStream.start(); in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/dlt-fails-with-queries-with-streaming-sources-must-be-executed/m-p/75583#M34992</link>
    <description>&lt;P&gt;UPDATE: tried adding writeStream.start() like error suggested + as per other posts and ended up with following error/code:&lt;/P&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"ETD_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;temporary&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;Bronze&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;(spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_raw_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;SPAN&gt;# function that flattens json&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;process_raw_data&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;df&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batchId) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; json_schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;json&lt;/SPAN&gt;&lt;SPAN&gt;(df.rdd.&lt;/SPAN&gt;&lt;SPAN&gt;map&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;lambda&lt;/SPAN&gt; &lt;SPAN&gt;row&lt;/SPAN&gt;&lt;SPAN&gt;: row.JsonString)).schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; kafka_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; &amp;nbsp;df.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"JsonStruct"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;from_json&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"JsonString"&lt;/SPAN&gt;&lt;SPAN&gt;), json_schema))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; fj &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;FlattenJson&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; kafka_flattened_json &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; fj.&lt;/SPAN&gt;&lt;SPAN&gt;flatten_json&lt;/SPAN&gt;&lt;SPAN&gt;(kafka_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; kafka_flattened_json&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"ETD_Flattened_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;spark_conf&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.delta.schema.autoMerge.enabled"&lt;/SPAN&gt;&lt;SPAN&gt; : &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;},&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;temporary&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;Bronze_Flattend&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; stream &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"live.ETD_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .writeStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"json"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;outputMode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;foreachBatch&lt;/SPAN&gt;&lt;SPAN&gt;(process_raw_data)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_bz_tmp_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;start&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;awaitTermination&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_bz_tmp_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;getting following error:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;"py4j.protocol.Py4JJavaError: An error occurred while calling o687.toTable. : org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find data source: foreachBatch. Please find packages at `&lt;A href="https://spark.apache.org/third-party-projects.html" target="_blank" rel="noopener"&gt;https://spark.apache.org/third-party-projects.html&lt;/A&gt;`."&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
    <pubDate>Mon, 24 Jun 2024 11:57:17 GMT</pubDate>
    <dc:creator>Nastia</dc:creator>
    <dc:date>2024-06-24T11:57:17Z</dc:date>
    <item>
      <title>DLT fails with Queries with streaming sources must be executed with writeStream.start();</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-fails-with-queries-with-streaming-sources-must-be-executed/m-p/75555#M34984</link>
      <description>&lt;P&gt;Hi guys!&lt;/P&gt;&lt;P&gt;I am having an issue with passing the "streaming flow" between layers of the DLT.&lt;/P&gt;&lt;P&gt;first layer "ETD_Bz" is passing through, but then "&lt;SPAN&gt;ETD_Flattened_Bz" is failing with "pyspark.errors.exceptions.captured.AnalysisException: Queries with streaming sources must be executed with writeStream.start();" error.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Code:&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# works successfully&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"ETD_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;temporary&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;Bronze&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_raw_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;SPAN&gt;# function that flattens json&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;process_raw_data&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;raw_tbl_name) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&amp;nbsp;# &lt;STRONG&gt;&amp;lt;- starts working once I am changing from readStream to read, but then it obviously stops processing incrementally&lt;/STRONG&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(raw_tbl_name)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; json_schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;json&lt;/SPAN&gt;&lt;SPAN&gt;(df.rdd.&lt;/SPAN&gt;&lt;SPAN&gt;map&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;lambda&lt;/SPAN&gt; &lt;SPAN&gt;row&lt;/SPAN&gt;&lt;SPAN&gt;: row.JsonString)).schema&amp;nbsp;&amp;nbsp;# &lt;STRONG&gt;&amp;lt;- fails here&lt;/STRONG&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; kafka_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; &amp;nbsp;df.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"JsonStruct"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;from_json&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"JsonString"&lt;/SPAN&gt;&lt;SPAN&gt;), json_schema))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; fj &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;FlattenJson&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; kafka_flattened_json &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; fj.&lt;/SPAN&gt;&lt;SPAN&gt;flatten_json&lt;/SPAN&gt;&lt;SPAN&gt;(kafka_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; kafka_flattened_json&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;STRONG&gt;# failing layer&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"ETD_Flattened_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;spark_conf&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.delta.schema.autoMerge.enabled"&lt;/SPAN&gt;&lt;SPAN&gt; : &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;},&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;temporary&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;Bronze_Flattend&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt; &lt;SPAN&gt;process_raw_data&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"live.ETD_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;any help appreciated! Thank you very much in advance&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Mon, 24 Jun 2024 09:10:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-fails-with-queries-with-streaming-sources-must-be-executed/m-p/75555#M34984</guid>
      <dc:creator>Nastia</dc:creator>
      <dc:date>2024-06-24T09:10:05Z</dc:date>
    </item>
    <item>
      <title>Re: DLT fails with Queries with streaming sources must be executed with writeStream.start();</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-fails-with-queries-with-streaming-sources-must-be-executed/m-p/75583#M34992</link>
      <description>&lt;P&gt;UPDATE: tried adding writeStream.start() like error suggested + as per other posts and ended up with following error/code:&lt;/P&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"ETD_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;temporary&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;Bronze&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;(spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_raw_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;SPAN&gt;# function that flattens json&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;process_raw_data&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;df&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batchId) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; json_schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;json&lt;/SPAN&gt;&lt;SPAN&gt;(df.rdd.&lt;/SPAN&gt;&lt;SPAN&gt;map&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;lambda&lt;/SPAN&gt; &lt;SPAN&gt;row&lt;/SPAN&gt;&lt;SPAN&gt;: row.JsonString)).schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; kafka_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; &amp;nbsp;df.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"JsonStruct"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;from_json&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"JsonString"&lt;/SPAN&gt;&lt;SPAN&gt;), json_schema))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; fj &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;FlattenJson&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; kafka_flattened_json &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; fj.&lt;/SPAN&gt;&lt;SPAN&gt;flatten_json&lt;/SPAN&gt;&lt;SPAN&gt;(kafka_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; kafka_flattened_json&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"ETD_Flattened_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;spark_conf&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.delta.schema.autoMerge.enabled"&lt;/SPAN&gt;&lt;SPAN&gt; : &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;},&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;temporary&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;Bronze_Flattend&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; stream &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"live.ETD_Bz"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .writeStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"json"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;outputMode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;foreachBatch&lt;/SPAN&gt;&lt;SPAN&gt;(process_raw_data)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_bz_tmp_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;start&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;awaitTermination&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"default.tbl_bz_tmp_etd_data"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;getting following error:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;"py4j.protocol.Py4JJavaError: An error occurred while calling o687.toTable. : org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find data source: foreachBatch. Please find packages at `&lt;A href="https://spark.apache.org/third-party-projects.html" target="_blank" rel="noopener"&gt;https://spark.apache.org/third-party-projects.html&lt;/A&gt;`."&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Mon, 24 Jun 2024 11:57:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-fails-with-queries-with-streaming-sources-must-be-executed/m-p/75583#M34992</guid>
      <dc:creator>Nastia</dc:creator>
      <dc:date>2024-06-24T11:57:17Z</dc:date>
    </item>
  </channel>
</rss>

