<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Pipeline workflow dude in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3020#M219</link>
    <description>&lt;P&gt;I've had issues trying to ingest with Auto Loader as a single batch process into a dataframe; it's mainly meant for writing directly to a table or for streaming. I've concluded the best approach is to auto-load into bronze, then do a spark.read into a dataframe to transform, and then write/upsert to tables with spark.sql.&lt;/P&gt;</description>
    <pubDate>Mon, 19 Jun 2023 13:39:58 GMT</pubDate>
    <dc:creator>etsyal1e2r3</dc:creator>
    <dc:date>2023-06-19T13:39:58Z</dc:date>
    <item>
      <title>Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3012#M211</link>
      <description>&lt;P&gt;Hi! I have a problem. I'm using an autoloader to ingest data from raw to a Delta Lake, but when my pipeline starts, I want to apply the pipeline only to the new data. The autoloader ingests data into the Delta Lake, but now, how can I distinguish the new data from the old?&lt;/P&gt;</description>
      <pubDate>Thu, 15 Jun 2023 11:49:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3012#M211</guid>
      <dc:creator>apiury</dc:creator>
      <dc:date>2023-06-15T11:49:13Z</dc:date>
    </item>
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3013#M212</link>
      <description>&lt;P&gt;You can add a column stamped with the date of the run to the newly added data using the selectExpr() function in Auto Loader. It'd look something like this...&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;df = (
  spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # The schema location directory keeps track of your data schema over time
    .option("cloudFiles.schemaLocation", "&amp;lt;path-to-checkpoint&amp;gt;")
    .load("&amp;lt;source-data-with-nested-json&amp;gt;")
    # current_timestamp() is the built-in SQL function, so no import is needed
    .selectExpr(
      "*",
      "current_timestamp() as `Date_Pulled`",
    )
)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 16 Jun 2023 02:16:41 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3013#M212</guid>
      <dc:creator>etsyal1e2r3</dc:creator>
      <dc:date>2023-06-16T02:16:41Z</dc:date>
    </item>
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3014#M213</link>
      <description>&lt;P&gt;Hi @Alejandro Piury Pinzón&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;We haven't heard from you since the last response from @Tyler Retzlaff&amp;nbsp;, and I was checking back to see if their suggestions helped you.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;If you have found a solution, please share it with the community, as it can be helpful to others.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 16 Jun 2023 03:35:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3014#M213</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2023-06-16T03:35:13Z</dc:date>
    </item>
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3015#M214</link>
      <description>&lt;P&gt;But why add a date column? I thought Auto Loader keeps track of new files. My problem is: how can I process only the new files? My data is binary, and I have to apply a transformation, but I don't want to apply it to all the data.&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 09:06:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3015#M214</guid>
      <dc:creator>apiury</dc:creator>
      <dc:date>2023-06-19T09:06:33Z</dc:date>
    </item>
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3016#M215</link>
      <description>&lt;P&gt;Auto Loader keeps track of files, yeah, so that it only reads them once to prevent duplicates. If you do a count before and after Auto Loader each time, you'll see that it only adds new data. Now, do you have a timestamp column? I'm not sure what your logic looks like in the pipeline, but if you have a timestamp or date_pulled column, you can filter the pipeline query to grab only the data that doesn't exist yet in the next table, by checking against the last timestamp/date_pulled value. But if you just grab all the data into a dataframe, you can do an upsert to the next table, which will update existing records (if you want) and insert new ones. I can only speculate what your logic looks like without more info, though &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 12:27:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3016#M215</guid>
      <dc:creator>etsyal1e2r3</dc:creator>
      <dc:date>2023-06-19T12:27:32Z</dc:date>
    </item>
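The "filter the pipeline query by the last timestamp" idea above can be sketched as plain SQL construction. A minimal sketch, assuming a hypothetical `Date_Pulled` column and hypothetical `bronze_table`/`silver_table` names (none of these come from the thread):

```python
# Sketch: build the query that grabs only bronze rows newer than anything
# silver has already seen. Table and column names are hypothetical.
def build_incremental_query(bronze_table: str, silver_table: str,
                            ts_col: str = "Date_Pulled") -> str:
    """Return SQL selecting bronze rows newer than the max timestamp in silver."""
    return (
        f"SELECT * FROM {bronze_table} "
        f"WHERE {ts_col} > (SELECT COALESCE(MAX({ts_col}), '1970-01-01') "
        f"FROM {silver_table})"
    )

# In a notebook, the resulting string would be handed to spark.sql(...).
query = build_incremental_query("bronze_table", "silver_table")
```

The `COALESCE` makes the first run (empty silver table) fall back to an epoch date so it picks up everything.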
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3017#M216</link>
      <description>&lt;P&gt;Isn't checking for the data that doesn't exist yet in the next table and applying the transformation to it the same as using Auto Loader again after the first ingest? For example, I have binary data (pcap file format) in the bronze layer. I want to transform the pcap into CSV format and ingest it into the silver layer, but I don't want to process the whole dataset each time, only the new files that arrive.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 13:10:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3017#M216</guid>
      <dc:creator>apiury</dc:creator>
      <dc:date>2023-06-19T13:10:26Z</dc:date>
    </item>
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3018#M217</link>
      <description>&lt;P&gt;Yeah, well, you either have to do an upsert keyed on a checksum generated over all the data, or only grab data after a certain datetime.&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 13:20:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3018#M217</guid>
      <dc:creator>etsyal1e2r3</dc:creator>
      <dc:date>2023-06-19T13:20:44Z</dc:date>
    </item>
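The "generated checksum" idea mentioned above can be sketched in pure Python; in Spark you would typically do the same with the built-in `sha2`/`concat_ws` SQL functions over the columns. A minimal sketch with hypothetical column names:

```python
import hashlib

def row_checksum(row: dict) -> str:
    """Deterministic checksum over a row's values, keyed by sorted column name."""
    payload = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Identical rows produce identical checksums regardless of column order,
# so an upsert keyed on the checksum skips rows that were already loaded.
a = row_checksum({"src_ip": "10.0.0.1", "bytes": 120})
b = row_checksum({"bytes": 120, "src_ip": "10.0.0.1"})
```

The Spark SQL equivalent would be along the lines of `sha2(concat_ws('|', col1, col2, ...), 256)` used as the merge key.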
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3019#M218</link>
      <description>&lt;P&gt;Okay. Then, after ingesting data with Auto Loader, I check for the new data using, for example, a date_pulled column. My last doubt is: why use Auto Loader only for ingestion into the bronze layer, and not also for ingesting bronze data into silver?&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 13:25:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3019#M218</guid>
      <dc:creator>apiury</dc:creator>
      <dc:date>2023-06-19T13:25:25Z</dc:date>
    </item>
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3020#M219</link>
      <description>&lt;P&gt;I've had issues trying to ingest with Auto Loader as a single batch process into a dataframe; it's mainly meant for writing directly to a table or for streaming. I've concluded the best approach is to auto-load into bronze, then do a spark.read into a dataframe to transform, and then write/upsert to tables with spark.sql.&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 13:39:58 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3020#M219</guid>
      <dc:creator>etsyal1e2r3</dc:creator>
      <dc:date>2023-06-19T13:39:58Z</dc:date>
    </item>
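The write/upsert step in that bronze-then-spark.read pattern usually comes down to a Delta MERGE statement handed to spark.sql. A minimal sketch that builds the statement as a string; the table, view, and key names are hypothetical, not from the thread:

```python
# Sketch: build a Delta MERGE that updates matched rows and inserts new ones.
# "UPDATE SET *" / "INSERT *" copy all columns by name (Delta Lake SQL).
def build_merge_sql(target: str, source_view: str, key: str) -> str:
    """Return a MERGE statement upserting source_view into target on key."""
    return (
        f"MERGE INTO {target} t "
        f"USING {source_view} s ON t.{key} = s.{key} "
        f"WHEN MATCHED THEN UPDATE SET * "
        f"WHEN NOT MATCHED THEN INSERT *"
    )

# In a notebook: df.createOrReplaceTempView("updates") and then
# spark.sql(build_merge_sql("silver_table", "updates", "row_checksum"))
merge_sql = build_merge_sql("silver_table", "updates", "row_checksum")
```

Keying the merge on a checksum (or a natural key plus timestamp) is what makes the re-run idempotent: already-loaded rows match and are skipped or updated rather than duplicated.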
    <item>
      <title>Re: Pipeline workflow dude</title>
      <link>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3021#M220</link>
      <description>&lt;P&gt;So in your case you would pull the data in as pcap, then pull from that table to write to CSV... I'm not sure how well pcap maps to a table because I've never looked. But as long as you can write the data to a table, you can save it as CSV or just export it as CSV, depending on your requirements.&lt;/P&gt;</description>
      <pubDate>Mon, 19 Jun 2023 13:41:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pipeline-workflow-dude/m-p/3021#M220</guid>
      <dc:creator>etsyal1e2r3</dc:creator>
      <dc:date>2023-06-19T13:41:33Z</dc:date>
    </item>
  </channel>
</rss>

