I need a solution to the problem below.

We have a set of JSON files that keep arriving in an AWS S3 bucket. Each file contains details for properties; note that one property can have 10-12 rows in a file. A sample JSON file is attached.

We need to read these files as a stream and build a compacted view per property: take all the rows for a given property, merge them, and produce a single row per property (for example, if the property identified by one hash_lat_lng appears in three rows, the output should be one row carrying its latest values). Once that is done, we can write the stream to Delta / DocumentDB / DynamoDB.
Here is what I have tried so far:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
    ACCESS_KEY_ID = access_key
    SECRET_ACCESS_KEY = secret_key
    # "/" in the secret key must be URL-encoded for the s3a mount URI.
    ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")

    print("Mounting", bucket_name)

    try:
        # Unmount the path in case it was already mounted.
        dbutils.fs.unmount("/mnt/%s" % mount_folder)
    except Exception:
        # If unmounting fails, it most likely wasn't mounted in the first place.
        print("Directory not unmounted:", mount_folder)
    finally:
        # Lastly, mount the bucket.
        dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name),
                         "/mnt/%s" % mount_folder)
        print("The bucket", bucket_name, "was mounted to", mount_folder, "\n")
# Spark session and imports.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()
print('Session created')
JSONschema = StructType([
    StructField("id", StringType(), False),
    StructField("address1", StringType(), True),
    StructField("address2", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("size", IntegerType(), True),
    StructField("rooms", IntegerType(), True),
    # DecimalType() defaults to precision 10, scale 0, which would drop the
    # fractional part of the coordinates, so use DoubleType() instead.
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("hash_lat_lng", StringType(), True),
    StructField("source", StringType(), True),
    StructField("source_premium", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("attributes", StructType([
        StructField("type", StringType()),
        StructField("lift", StringType()),
        StructField("garden", StringType()),
        StructField("heating", StringType()),
        StructField("washing_machine", StringType()),
        StructField("floor", StringType()),
        StructField("year_of_construction", IntegerType())
    ]))
])
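As a sanity check, the schema can be validated against a single file with a plain batch read before wiring up the stream (the file name below is just an example from the mounted bucket):

# Batch sanity check of the schema; "messages-0001.json" is an example name.
sample_df = (spark.read
             .schema(JSONschema)
             .json("/mnt/raj-zuk-comparis-poc/messages-0001.json"))
sample_df.printSchema()
sample_df.show(5, truncate=False)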
ds = (spark.readStream
      .schema(JSONschema)
      .format("json")
      .option("maxFilesPerTrigger", 1)
      .load("/mnt/raj-zuk-comparis-poc/messages*.json"))
flattened_df = (ds
                .withColumn("property_type", expr("attributes.type"))
                .withColumn("property_lift", expr("attributes.lift"))
                .withColumn("property_garden", expr("attributes.garden"))
                .withColumn("property_heating", expr("attributes.heating"))
                .withColumn("property_washing_machine", expr("attributes.washing_machine"))
                .withColumn("property_floor", expr("attributes.floor"))
                .withColumn("property_year_of_construction", expr("attributes.year_of_construction")))
tumbling_df = (flattened_df
               .groupBy(window(col("timestamp"), "4 hours"), col("source"), col("hash_lat_lng"))
               .agg(max("timestamp").alias("timestamp"),
                    first("address1").alias("address1"),
                    first("address2").alias("address2"),
                    first("city").alias("city"),
                    first("state").alias("state"),
                    first("postalCode").alias("postalCode"),
                    first("price").alias("price"),
                    first("size").alias("size"),
                    first("rooms").alias("rooms"),
                    first("property_type").alias("property_type"),
                    first("property_lift").alias("property_lift"),
                    first("property_garden").alias("property_garden"),
                    first("property_heating").alias("property_heating"),
                    first("property_washing_machine").alias("property_washing_machine"),
                    first("property_floor").alias("property_floor"),
                    first("property_year_of_construction").alias("property_year_of_construction"),
                    first("source_premium").alias("source_premium"))
               # orderBy on a streaming DataFrame is only supported after an
               # aggregation in complete output mode.
               .orderBy(col("window.start")))
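One thing I suspect is missing above: without a watermark, a streaming aggregation keeps all its state forever and cannot emit rows in append mode. A minimal sketch of the fix, assuming events arrive at most 6 hours late (that threshold is a guess) and dropping the orderBy so append mode works:

# Hedged sketch: bound the aggregation state with a watermark. The 6-hour
# lateness threshold is an assumption about our data.
tumbling_df = (flattened_df
               .withWatermark("timestamp", "6 hours")
               .groupBy(window(col("timestamp"), "4 hours"), col("source"), col("hash_lat_lng"))
               .agg(max("timestamp").alias("timestamp"),
                    first("address1").alias("address1")))  # ...same aggregations as above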
After this, I am not sure how to write the stream out. Could you please advise on the correct way to write it, or on a better way to solve this problem?
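For reference, this is the kind of sink I was planning to try. It is only a sketch and untested: the checkpoint/output paths, the DynamoDB table name compacted_properties, and its key are all assumptions, and the Decimal round-trip is there because boto3's DynamoDB resource rejects Python floats.

# Sketch A: write the compacted view to a Delta table. "complete" mode is
# needed here because of the aggregation + orderBy above; with the
# watermark variant, append mode should also work.
(tumbling_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "/mnt/raj-zuk-comparis-poc/checkpoints/compacted")
    .start("/mnt/raj-zuk-comparis-poc/delta/compacted"))

# Sketch B: push each micro-batch to DynamoDB via foreachBatch.
import json
from decimal import Decimal
import boto3

def write_batch_to_dynamodb(batch_df, batch_id):
    # Assumed table name and key; DynamoDB needs Decimal instead of float,
    # and the window/timestamp values are stringified via default=str.
    table = boto3.resource("dynamodb").Table("compacted_properties")
    with table.batch_writer() as writer:
        for row in batch_df.toLocalIterator():
            item = json.loads(json.dumps(row.asDict(recursive=True), default=str),
                              parse_float=Decimal)
            writer.put_item(Item=item)

(tumbling_df.writeStream
    .outputMode("complete")
    .option("checkpointLocation", "/mnt/raj-zuk-comparis-poc/checkpoints/dynamodb")
    .foreachBatch(write_batch_to_dynamodb)
    .start())

Would something along these lines be the right approach, or is there a better pattern for this kind of compaction?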