<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates) in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13597#M8255</link>
    <description>I tried without Flatten, but it is still not working: no records are inserted into the table.</description>
    <pubDate>Thu, 14 Oct 2021 11:31:35 GMT</pubDate>
    <dc:creator>Data_Bricks1</dc:creator>
    <dc:date>2021-10-14T11:31:35Z</dc:date>
    <item>
      <title>data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13590#M8248</link>
      <description>I am able to load the data for a single container by hard-coding its path, but not from multiple containers. I used a for loop, but the data frame ends up holding only the last folder's records from the last container.

One more issue: I also have to flatten the data. When I execute the flattening code in a separate cell it works fine, but when it is kept in the same cell, inside the for loop, it throws an error.

Can someone help me with this? It would be highly appreciated. Many thanks in advance.</description>
      <pubDate>Wed, 13 Oct 2021 18:47:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13590#M8248</guid>
      <dc:creator>Data_Bricks1</dc:creator>
      <dc:date>2021-10-13T18:47:18Z</dc:date>
    </item>
    <item>
      <title>Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13591#M8249</link>
      <description>Hello there! My name is Piper and I'm one of the community moderators! Thank you for your question. Let's see how your fellow community members respond.</description>
      <pubDate>Wed, 13 Oct 2021 19:36:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13591#M8249</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2021-10-13T19:36:55Z</dc:date>
    </item>
    <item>
      <title>Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13592#M8250</link>
      <description>&lt;P&gt;" I used for loop, but data frame is loading only last container's last folder record only."&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;It seems that dataframe is overwritten. Please check that you have main dataframe and you union/unionAll dataframes from loop with main one.  Maybe you could share your code?&lt;/P&gt;</description>
      <pubDate>Thu, 14 Oct 2021 08:14:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13592#M8250</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2021-10-14T08:14:10Z</dc:date>
    </item>
    <item>
      <title>Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13593#M8251</link>
      <description>Many thanks for your response, Hubert-Dudek. As requested, here is the code I am using:

    import os
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, ArrayType
    from pyspark.sql.functions import col, explode_outer
    from azure.storage.blob import BlockBlobService
    from datetime import date, timedelta

    block_blob_service = BlockBlobService(account_name="********", account_key="*************")
    containers = block_blob_service.list_containers()
    for c in containers:
        top_level_container_name = c.name
        generator = block_blob_service.list_blobs(top_level_container_name)
        if "self-verification" in c.name:
            for blob in generator:
                if "/PageViews/" in blob.name:
                    if (date.today() - timedelta(1)).isoformat() in blob.name:
                        df2 = (spark.read
                               .option("multiline", "true")
                               .option("inferSchema", "true")
                               .option("header", "True")
                               .option("recursiveFileLookup", "true")
                               .json("/mnt/" + c.name + "/" + blob.name))

                        def Flatten(df2):
                            # collect the struct/array columns that still need flattening
                            complex_fields = dict([(field.name, field.dataType)
                                                   for field in df2.schema.fields
                                                   if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
                            while len(complex_fields) != 0:
                                col_name = list(complex_fields.keys())[0]
                                if type(complex_fields[col_name]) == StructType:
                                    # promote each struct field to a top-level column
                                    expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                                                for k in [n.name for n in complex_fields[col_name]]]
                                    df2 = df2.select("*", *expanded).drop(col_name)
                                elif type(complex_fields[col_name]) == ArrayType:
                                    # one output row per array element
                                    df2 = df2.withColumn(col_name, explode_outer(col_name))
                                complex_fields = dict([(field.name, field.dataType)
                                                       for field in df2.schema.fields
                                                       if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
                            return df2

                        Flatten_df2 = Flatten(df2)
                        Flatten_df2.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")

Please help me.

Best Regards,
Rajeswari Gummadi
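
P.S. As a quick sanity check of the flatten logic on its own, here is a tiny made-up nested example (sample data invented for illustration):

    from pyspark.sql.types import StructField, IntegerType

    schema = StructType([StructField("payload", StructType([
        StructField("a", IntegerType()),
        StructField("b", ArrayType(IntegerType()))]))])
    sample = spark.createDataFrame([({"a": 1, "b": [10, 20]},)], schema)
    display(Flatten(sample))  # expect columns payload_a and payload_b, one row per array element</description>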
      <pubDate>Thu, 14 Oct 2021 09:20:40 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13593#M8251</guid>
      <dc:creator>Data_Bricks1</dc:creator>
      <dc:date>2021-10-14T09:20:40Z</dc:date>
    </item>
    <item>
      <title>Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13595#M8253</link>
      <description>Many thanks, Piper.</description>
      <pubDate>Thu, 14 Oct 2021 09:24:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13595#M8253</guid>
      <dc:creator>Data_Bricks1</dc:creator>
      <dc:date>2021-10-14T09:24:04Z</dc:date>
    </item>
    <item>
      <title>Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13596#M8254</link>
      <description>For sure, the function (def) should be declared outside the loop; move it to just after the library imports.

The logic is a bit complicated, so you will need to debug it: use display(Flatten_df2) (or .show()) and validate the JSON output after each iteration (using break or sleep, etc.).
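
Something like this skeleton (a sketch only, not tested against your storage; Flatten is the function from your post, defined once at the top of the notebook):

    from azure.storage.blob import BlockBlobService
    from datetime import date, timedelta

    # def Flatten(df2): ...   (your function from above, declared here, outside any loop)

    block_blob_service = BlockBlobService(account_name="***", account_key="***")
    yesterday = (date.today() - timedelta(1)).isoformat()

    for c in block_blob_service.list_containers():
        if "self-verification" not in c.name:
            continue
        for blob in block_blob_service.list_blobs(c.name):
            if "/PageViews/" in blob.name and yesterday in blob.name:
                df2 = spark.read.option("multiline", "true").json("/mnt/" + c.name + "/" + blob.name)
                flat = Flatten(df2)
                flat.show(5)  # debug: inspect each iteration before appending
                flat.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")</description>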
      <pubDate>Thu, 14 Oct 2021 10:48:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13596#M8254</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2021-10-14T10:48:17Z</dc:date>
    </item>
    <item>
      <title>Re: data from 10 BLOB containers and multiple hierarchical folders (daily and hourly folders) in each container to a Delta Lake table in Parquet format - incremental loading of the latest data only (insert-only, no updates)</title>
      <link>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13597#M8255</link>
      <description>I tried without Flatten, but it is still not working: no records are inserted into the table.

    import os
    from azure.storage.blob import BlockBlobService
    from datetime import date, timedelta

    block_blob_service = BlockBlobService(account_name="*******", account_key="************")
    containers = block_blob_service.list_containers()
    for c in containers:
        if "self-verification" in c.name:
            top_level_container_name = c.name
            generator = block_blob_service.list_blobs(top_level_container_name)
            for blob in generator:
                if "/PageViews/" in blob.name:
                    if (date.today() - timedelta(1)).isoformat() + "/05/" in blob.name:
                        # print(c.name)
                        dfoct11 = (spark.read
                                   .option("multiline", "true")
                                   .option("inferSchema", "true")
                                   .option("header", "True")
                                   .option("recursiveFileLookup", "true")
                                   .json("/mnt/" + c.name + "/" + blob.name))
                        # dfoct11.write.format("delta").mode("append").save("usr/hive/warehouse/stg_pageviews")  # write with format as delta
                        # dfoct11.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")
                        dfoct11.write.format("json").mode("append").option("SaveMode.Overwrite", True).save("/usr/hive/warehouse/stg_pageviews")
                        # dfoct11.write.format("parquet").saveAsTable("stg_pageviews")
                        df = spark.sql("select * from stg_pageviews")
                        display(dfoct11)  # displays the data
                        display(df)       # no records found
                        print(blob.name)  # displays the path

Somehow the data is not appending or inserting into the table.
Please let me know if I need to do anything else.</description>
      <pubDate>Thu, 14 Oct 2021 11:31:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-from-10-blob-containers-and-multiple-hierarchical-folders/m-p/13597#M8255</guid>
      <dc:creator>Data_Bricks1</dc:creator>
      <dc:date>2021-10-14T11:31:35Z</dc:date>
    </item>
  </channel>
</rss>

