
Loading data from 10 Blob containers, each with hierarchical daily and hourly folders, into a Delta Lake table in Parquet format: incremental load of the latest data only (inserts, no updates)

Data_Bricks1
New Contributor III

I can load data from a single container by hard-coding its name, but not from multiple containers. I used a for loop, but the DataFrame ends up holding only the records from the last folder of the last container.

There is one more issue: I have to flatten the data. When I run the flattening code in a separate cell it works fine, but when I put it in the same cell inside the for loop it throws an error.

Can someone help me with this? It would be highly appreciated. Many thanks in advance.

7 REPLIES

Anonymous
Not applicable

Hello there! My name is Piper and I'm one of the community moderators! Thank you for your question. Let's see how your fellows respond.

Many thanks, Piper.

Hubert-Dudek
Esteemed Contributor III

" I used for loop, but data frame is loading only last container's last folder record only."

It seems that the DataFrame is being overwritten on each iteration. Please check that you keep a main DataFrame and union (union/unionAll) each DataFrame from the loop with it. Could you share your code?
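One way to avoid the overwrite is to collect every matching path first and read them together afterwards. The sketch below is plain Python and only covers the path selection. The names `select_paths`, `collect_paths` and `listing`, the sample blob names, and the `/mnt/<container>/<blob>` layout are assumptions for illustration, not taken from a real storage account.

```python
from datetime import date


def select_paths(blob_names, container, day, folder="/PageViews/", mount_root="/mnt"):
    """Return mount paths for blobs of one container that sit under
    `folder` and carry the ISO date of `day` in their name."""
    stamp = day.isoformat()
    return [
        f"{mount_root}/{container}/{name}"
        for name in blob_names
        if folder in name and stamp in name
    ]


def collect_paths(listing, day, container_filter="self-verification"):
    """`listing` maps container name -> iterable of blob names.
    Paths from all matching containers are accumulated in one list,
    so nothing is overwritten between loop iterations."""
    paths = []
    for container, blob_names in listing.items():
        if container_filter in container:
            paths.extend(select_paths(blob_names, container, day))
    return paths


# Hypothetical listing standing in for list_containers() / list_blobs().
listing = {
    "self-verification-a": [
        "x/PageViews/2021-10-11/05/part-0.json",
        "x/Other/2021-10-11/05/part-0.json",
    ],
    "self-verification-b": [
        "y/PageViews/2021-10-11/06/part-0.json",
        "y/PageViews/2021-10-10/06/part-0.json",
    ],
    "unrelated": ["z/PageViews/2021-10-11/05/part-0.json"],
}
paths = collect_paths(listing, date(2021, 10, 11))
print(paths)
```

In a notebook, `listing` could be built from the blob service calls, and the resulting list passed once to `spark.read.json(paths)`, which accepts a list of paths. Keeping a running DataFrame and unioning each new one into it, as suggested above, should work as well.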

Many thanks for your response, Hubert-Dudek. As requested, please find below the code I am using:

*************************************
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import col
from pyspark.sql.functions import explode_outer
from array import array
from azure.storage.blob import BlockBlobService
from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="********", account_key="*************")
containers = block_blob_service.list_containers()

for c in containers:
    top_level_container_name = c.name
    generator = block_blob_service.list_blobs(top_level_container_name)
    #print(c.name)
    if "self-verification" in c.name:
        for blob in generator:
            if "/PageViews/" in blob.name:
                if (date.today() - timedelta(1)).isoformat() in blob.name:
                    #print(c.name)
                    df2 = spark.read.option("multiline","true").option("inferSchema","true").option("header","True").option("recursiveFileLookup","true").json("/mnt/"+c.name+"/"+blob.name)
                    #print(df2)
                    def Flatten(df2):
                        complex_fields = dict([(field.name, field.dataType)
                                               for field in df2.schema.fields
                                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
                        while len(complex_fields) != 0:
                            col_name = list(complex_fields.keys())[0]
                            if (type(complex_fields[col_name]) == StructType):
                                expanded = [col(col_name + '.' + k).alias(col_name + '_' + k) for k in [ n.name for n in complex_fields[col_name]]]
                                # print(col_name)
                                # display(df2)
                                # print(expanded)
                                df2 = df2.select("*", *expanded).drop(col_name)
                                #print(df2)
                            elif (type(complex_fields[col_name]) == ArrayType):
                                df2 = df2.withColumn(col_name, explode_outer(col_name))
                                complex_fields = dict([(field.name, field.dataType)
                                                       for field in df2.schema.fields
                                                       if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
                                #return df
                                #print("good")
                                #print("morning")
                        return df2
                    Flatten_df2 = Flatten(df2)
                    Flatten_df2.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")
************************************

Please help me with this. Many thanks.

Best Regards

Rajeswari Gummadi

Hubert-Dudek
Esteemed Contributor III

The function (def) should definitely be declared outside the loop; move it to just after the library imports.

The logic is a bit complicated, so you need to debug it: use display(Flatten_df2) (or .show()) and validate the JSON after each iteration (using break, sleep, etc.).
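For the debugging step, it may help to know what the flattened output should look like for one sample record before running the Spark job. The sketch below is a plain-Python analogue of the flattening logic, not the PySpark function itself. Struct-like values (dicts) become `parent_child` columns, and array-like values (lists) produce one row per element, with an empty list kept as a single row holding `None`, similar to `explode_outer`. The name `flatten_record` and the sample record are hypothetical.

```python
def flatten_record(record):
    """Flatten one nested JSON-like record into a list of flat rows.

    Defined once at top level so it can be reused for every file.
    """
    rows = [dict(record)]
    while True:
        # Look for the first row/column that still holds a dict or a list.
        target = next(
            (
                (i, key)
                for i, row in enumerate(rows)
                for key, value in row.items()
                if isinstance(value, (dict, list))
            ),
            None,
        )
        if target is None:
            return rows  # nothing nested is left
        i, key = target
        row = rows.pop(i)
        value = row.pop(key)
        if isinstance(value, dict):
            # Struct: promote each child to a "parent_child" column.
            new_row = dict(row)
            for child, child_value in value.items():
                new_row[f"{key}_{child}"] = child_value
            rows.insert(i, new_row)
        else:
            # Array: one row per element; an empty array keeps one row with None.
            elements = value if value else [None]
            for offset, element in enumerate(elements):
                new_row = dict(row)
                new_row[key] = element
                rows.insert(i + offset, new_row)


# Hypothetical sample record with a struct, a nested array and an empty array.
sample = {"id": 1, "user": {"name": "a", "tags": ["x", "y"]}, "events": []}
rows = flatten_record(sample)
for r in rows:
    print(r)
```

Note that the nested fields are looked up again on every pass of the loop. In the posted PySpark code, the `complex_fields` refresh appears to sit inside the `elif` branch only; if that indentation is what actually runs, a struct column would be dropped from the DataFrame but never removed from `complex_fields`, which could explain the error.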

I tried it without the flatten step, but it is still not working: no records are inserted into the table.

*************************************
import os
from azure.storage.blob import BlockBlobService
from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="*******", account_key="************")
containers = block_blob_service.list_containers()

for c in containers:
    if "self-verification" in c.name:
        top_level_container_name = c.name
        generator = block_blob_service.list_blobs(top_level_container_name)
        for blob in generator:
            if "/PageViews/" in blob.name:
                if (date.today() - timedelta(1)).isoformat()+"/05/" in blob.name:
                    # print(c.name)
                    dfoct11 = spark.read.option("multiline","true").option("inferSchema","true").option("header","True").option("recursiveFileLookup","true").json("/mnt/"+c.name+"/"+blob.name)
                    #dfoct11.write.format("delta").mode('append').save("usr/hive/warehouse/stg_pageviews") #write with format as delta
                    #dfoct11.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")
                    dfoct11.write.format("json").mode("append").option("SaveMode.Overwrite",True).save("/usr/hive/warehouse/stg_pageviews")
                    #dfoct11.write.format("parquet").saveAsTable("stg_pageviews")
                    df = spark.sql('select * from stg_pageviews')
                    display(dfoct11) # displaying data
                    display(df) # No records found
                    print(blob.name) # displaying path
***************************************

Somehow the data is not being appended or inserted into the table.

Please let me know if I need to do something else.
