10-13-2021 11:47 AM
I am able to load data from a single container by hard-coding the path, but I am not able to load from multiple containers. I used a for loop, but the data frame ends up holding only the record from the last folder of the last container.
There is one more issue: I also have to flatten the data. When I run the flattening code in a separate cell it works fine, but when it is kept in the same cell inside the for loop it throws an error.
Can someone help me with this? It would be highly appreciated. Many thanks in advance.
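A stripped-down illustration of the pattern I am using (placeholder paths only; the full code is further down in the thread):
*************************************
# Simplified illustration only (placeholder container/blob names).
# df2 is reassigned on every pass, so when the loop finishes it holds
# only the data read for the last matching blob of the last container.
for c in containers:
    for blob in block_blob_service.list_blobs(c.name):
        if "/PageViews/" in blob.name:
            df2 = spark.read.option("multiline", "true").json("/mnt/" + c.name + "/" + blob.name)
*************************************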
10-13-2021 12:36 PM
Hello there! My name is Piper and I'm one of the community moderators! Thank you for your question. Let's see how your fellow community members respond.
10-14-2021 02:24 AM
Many Thanks Piper
10-14-2021 01:14 AM
" I used for loop, but data frame is loading only last container's last folder record only."
It seems that dataframe is overwritten. Please check that you have main dataframe and you union/unionAll dataframes from loop with main one. Maybe you could share your code?
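A minimal sketch of that union pattern (illustrative names only, not the actual code from this thread):
*************************************
# Sketch of accumulating into one main dataframe instead of overwriting it.
# 'spark' is the Databricks notebook session; paths_to_load stands in for
# whatever list of blob paths the loop produces.
main_df = None
for path in paths_to_load:
    df = spark.read.option("multiline", "true").json(path)
    # the per-path frames must share a schema for union/unionByName to work
    main_df = df if main_df is None else main_df.unionByName(df)
*************************************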
10-14-2021 02:23 AM
Many thanks for your response, HubertDudek. As requested, please find below the code I am using:
*************************************
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import col
from pyspark.sql.functions import explode_outer
from array import array
from azure.storage.blob import BlockBlobService
from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="********", account_key="*************")
containers = block_blob_service.list_containers()

for c in containers:
    top_level_container_name = c.name
    generator = block_blob_service.list_blobs(top_level_container_name)
    #print(c.name)
    if "self-verification" in c.name:
        for blob in generator:
            if "/PageViews/" in blob.name:
                if (date.today() - timedelta(1)).isoformat() in blob.name:
                    #print(c.name)
                    df2 = spark.read.option("multiline", "true").option("inferSchema", "true").option("header", "True").option("recursiveFileLookup", "true").json("/mnt/" + c.name + "/" + blob.name)
                    #print(df2)

                    def Flatten(df2):
                        complex_fields = dict([(field.name, field.dataType)
                                               for field in df2.schema.fields
                                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
                        while len(complex_fields) != 0:
                            col_name = list(complex_fields.keys())[0]
                            if (type(complex_fields[col_name]) == StructType):
                                expanded = [col(col_name + '.' + k).alias(col_name + '_' + k) for k in [n.name for n in complex_fields[col_name]]]
                                # print(col_name)
                                # display(df2)
                                # print(expanded)
                                df2 = df2.select("*", *expanded).drop(col_name)
                                #print(df2)
                            elif (type(complex_fields[col_name]) == ArrayType):
                                df2 = df2.withColumn(col_name, explode_outer(col_name))
                            complex_fields = dict([(field.name, field.dataType)
                                                   for field in df2.schema.fields
                                                   if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
                        #return df
                        #print("good")
                        #print("morning")
                        return df2

                    Flatten_df2 = Flatten(df2)
                    Flatten_df2.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")
************************************
Please help me with this. Many thanks.
10-14-2021 03:48 AM
For sure the function (def) should be declared outside the loop; move it to just after the library imports.
The logic is a bit complicated, so you need to debug it by using display(Flatten_df2) (or .show()) and validating the JSON after each iteration (using break or sleep, etc.).
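A rough sketch of that restructuring (not the exact code; block_blob_service and containers come from the listing code posted above):
*************************************
# Sketch only: Flatten is declared once, right after the imports, and the
# flattened result is appended inside the loop.
def Flatten(df):
    # ... the flattening logic from the post above goes here ...
    return df

for c in containers:
    if "self-verification" in c.name:
        for blob in block_blob_service.list_blobs(c.name):
            if "/PageViews/" in blob.name:
                df = spark.read.option("multiline", "true").json("/mnt/" + c.name + "/" + blob.name)
                flat = Flatten(df)
                display(flat)  # debug: inspect the result of each iteration
                flat.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")
*************************************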
10-14-2021 04:31 AM
I tried it without the flatten step, but it is still not working: no records are being inserted into the table.
***************************************
import os
from azure.storage.blob import BlockBlobService
from datetime import date, timedelta

block_blob_service = BlockBlobService(account_name="*******", account_key="************")
containers = block_blob_service.list_containers()

for c in containers:
    if "self-verification" in c.name:
        top_level_container_name = c.name
        generator = block_blob_service.list_blobs(top_level_container_name)
        for blob in generator:
            if "/PageViews/" in blob.name:
                if (date.today() - timedelta(1)).isoformat() + "/05/" in blob.name:
                    # print(c.name)
                    dfoct11 = spark.read.option("multiline", "true").option("inferSchema", "true").option("header", "True").option("recursiveFileLookup", "true").json("/mnt/" + c.name + "/" + blob.name)
                    #dfoct11.write.format("delta").mode('append').save("usr/hive/warehouse/stg_pageviews")  # write with format as delta
                    #dfoct11.write.mode("append").json("/usr/hive/warehouse/stg_pageviews")
                    dfoct11.write.format("json").mode("append").option("SaveMode.Overwrite", True).save("/usr/hive/warehouse/stg_pageviews")
                    #dfoct11.write.format("parquet").saveAsTable("stg_pageviews")

df = spark.sql('select * from stg_pageviews')
display(dfoct11)   # displaying data
display(df)        # no records found
print(blob.name)   # displaying path
***************************************
Somehow the data is not being appended or inserted into the table.
Please let me know if I need to do something else.
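As a debugging step, the output path can be read back directly to check whether the JSON files are actually being written (sketch below; whether spark.sql('select * from stg_pageviews') sees them depends on how the stg_pageviews table is defined, which is not shown in this thread):
***************************************
# Debugging sketch (same output path as above): read the folder back directly
# to confirm whether the append wrote any JSON files there.
check_df = spark.read.json("/usr/hive/warehouse/stg_pageviews")
display(check_df)
print(check_df.count())

# Note: the stg_pageviews table only reflects these files if it is defined
# over this same location; otherwise the table and the folder are unrelated.
***************************************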