cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

data from 10 BLOB containers and multiple hierarchical folders(every day and every hour folders) in each container to Delta lake table in parquet format - Incremental loading for latest data only insert no updates

Data_Bricks1
New Contributor III

I am able to load data for single container by hard coding, but not able to load from multiple containers. I used for loop, but data frame is loading only last container's last folder record only.

Here one more issue is I have to flatten data, when I execute code in separate cell for flattening, it is working fine, but when kept in same cell under for loop it is throwing error.

Can someone help me on this, it is highly appreciated. Many Thanks in advance.

7 REPLIES 7

Anonymous
Not applicable

Hello there! My name is Piper and I'm one of the community moderators! Thank you for your question. Let's see how your fellows respond.

Many Thanks Piper

Hubert-Dudek
Esteemed Contributor III

" I used for loop, but data frame is loading only last container's last folder record only."

It seems that dataframe is overwritten. Please check that you have main dataframe and you union/unionAll dataframes from loop with main one. Maybe you could share your code?

Many Thanks for your response HubertDudek. As mentioned in response, please find the following code which I am using:

*************************************

import os

import pyspark

from pyspark.sql import SparkSession

from pyspark.sql.types import StructType

from pyspark.sql.types import ArrayType

from pyspark.sql.functions import col

from pyspark.sql.functions import explode_outer

from array import array

from azure.storage.blob import BlockBlobService

from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="********", account_key="*************") 

containers = block_blob_service.list_containers()

for c in containers:

  top_level_container_name = c.name

  generator = block_blob_service.list_blobs(top_level_container_name)

  #print(c.name)

  if "self-verification" in c.name:

   for blob in generator:

     if "/PageViews/" in blob.name:

      if (date.today() - timedelta(1)).isoformat() in blob.name:

       #print(c.name)

       df2 = spark.read.option("multiline","true").option("inferSchema","true").option("header","True") .option("recursiveFileLookup","true").json("/mnt/"+c.name+"/"+blob.name)

       #print(df2)

       def Flatten(df2):

         complex_fields = dict([(field.name, field.dataType)

              for field in df2.schema.fields

              if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

         while len(complex_fields) != 0:

           col_name = list(complex_fields.keys())[0]

           if (type(complex_fields[col_name]) == StructType):

            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k) for k in [ n.name for n in complex_fields[col_name]]]

#             print(col_name)

#             display(df2)

#             print(expanded)

            df2 = df2.select("*", *expanded).drop(col_name)

            #print(df2)

           elif (type(complex_fields[col_name]) == ArrayType):

            df2 = df2.withColumn(col_name, explode_outer(col_name))

            complex_fields = dict([(field.name, field.dataType)

                for field in df2.schema.fields

                if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

                #return df

            #print("good")

            #print("morning")

         return df2

       Flatten_df2 = Flatten(df2)  

       Flatten_df2.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")    

************************************

Please help me on this. Many Thanks

Data_Bricks1
New Contributor III

Many Thanks for your response HubertDudek. As mentioned in response, please find the following code which I am using:

*************************************

import os

import pyspark

from pyspark.sql import SparkSession

from pyspark.sql.types import StructType

from pyspark.sql.types import ArrayType

from pyspark.sql.functions import col

from pyspark.sql.functions import explode_outer

from array import array

from azure.storage.blob import BlockBlobService

from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="********", account_key="*************")

containers = block_blob_service.list_containers()

for c in containers:

  top_level_container_name = c.name

  generator = block_blob_service.list_blobs(top_level_container_name)

  #print(c.name)

  if "self-verification" in c.name:

   for blob in generator:

     if "/PageViews/" in blob.name:

      if (date.today() - timedelta(1)).isoformat() in blob.name:

       #print(c.name)

       df2 = spark.read.option("multiline","true").option("inferSchema","true").option("header","True") .option("recursiveFileLookup","true").json("/mnt/"+c.name+"/"+blob.name)

       #print(df2)

       def Flatten(df2):

         complex_fields = dict([(field.name, field.dataType)

              for field in df2.schema.fields

              if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

         while len(complex_fields) != 0:

           col_name = list(complex_fields.keys())[0]

           if (type(complex_fields[col_name]) == StructType):

            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k) for k in [ n.name for n in complex_fields[col_name]]]

            df2 = df2.select("*", *expanded).drop(col_name)

            elif (type(complex_fields[col_name]) == ArrayType):

            df2 = df2.withColumn(col_name, explode_outer(col_name))

            complex_fields = dict([(field.name, field.dataType)

                for field in df2.schema.fields

                if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

         return df2

       Flatten_df2 = Flatten(df2) 

       Flatten_df2.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")    

*********************************************

Please help me.

Best Regards

Rajeswari Gummadi

Hubert-Dudek
Esteemed Contributor III

for sure function (def) should be declared outside loop, move it after importing libraries,

logic is a bit complicated you need to debug it using display(Flatten_df2) (or .show()) and validating json after each iteration (using break or sleep etc.)

I tried without flatten, even though it is not working. no records are inserting into table.

import os

from azure.storage.blob import BlockBlobService

from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="*******", account_key="************") 

containers = block_blob_service.list_containers()

for c in containers:

 if "self-verification" in c.name:

  top_level_container_name = c.name

  generator = block_blob_service.list_blobs(top_level_container_name)

  for blob in generator:

   if "/PageViews/" in blob.name:

    if (date.today() - timedelta(1)).isoformat()+"/05/" in blob.name:

     # print(c.name)

     dfoct11 = spark.read.option("multiline","true").option("inferSchema","true").option("header","True") .option("recursiveFileLookup","true").json("/mnt/"+c.name+"/"+blob.name)

     #dfoct11.write.format("delta").mode('append').save("usr/hive/warehouse/stg_pageviews") #write with format as delta

     #dfoct11.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")

     dfoct11.write.format("json").mode("append").option("SaveMode.Overwrite",True)    .save("/usr/hive/warehouse/stg_pageviews")

     #dfoct11.write.format("parquet").saveAsTable("stg_pageviews")

     df = spark.sql('select * from stg_pageviews')

     display(dfoct11) #displaying data

     display(df) #No records found

     print(blob.name) #[Displaying path

***************************************]​

Somehow data is not appending or inserting into table

Please let me know, if I need to do something else

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!