I need a solution for the problem below.
We have a set of JSON files that keep arriving in an AWS S3 bucket. Each file contains details for a property; please note that one property can have 10-12 rows in a JSON file. A sample JSON file is attached.
We need to read these files as a stream and build a compacted view per property, i.e. take all the rows for a property, merge them, and produce a single row per property. Once that is done, we can write the stream to Delta / DocumentDB / DynamoDB.
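To make the "compacted view" concrete, here is a tiny made-up example (the values are invented, only the field names match the schema further down): several raw rows for the same property should collapse into one row that keeps the latest timestamp and the known field values.

# Made-up input: two raw rows describing the same property (same hash_lat_lng + source).
raw_rows = [
    {"hash_lat_lng": "abc123", "source": "portal_a", "price": 450000, "rooms": None,
     "timestamp": "2023-01-01T08:00:00"},
    {"hash_lat_lng": "abc123", "source": "portal_a", "price": 455000, "rooms": 3,
     "timestamp": "2023-01-01T10:00:00"},
]

# Desired compacted output: a single row per property with the latest timestamp.
compacted_row = {"hash_lat_lng": "abc123", "source": "portal_a", "price": 455000,
                 "rooms": 3, "timestamp": "2023-01-01T10:00:00"}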
I have tried the following:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
    ACCESS_KEY_ID = access_key
    SECRET_ACCESS_KEY = secret_key
    # The secret key may contain "/" characters, which must be URL-encoded for the mount URI.
    ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")
    print("Mounting", bucket_name)
    try:
        # Unmount the target in case it was already mounted.
        dbutils.fs.unmount("/mnt/%s" % mount_folder)
    except Exception:
        # If unmounting fails, it most likely wasn't mounted in the first place.
        print("Directory not unmounted:", mount_folder)
    finally:
        # Finally, mount the bucket.
        dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name),
                         "/mnt/%s" % mount_folder)
        print("The bucket", bucket_name, "was mounted to", mount_folder, "\n")
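For completeness, this is how I call the mount function. The secret scope and key names below are placeholders; the credentials come from a Databricks secret scope rather than being hard-coded.

# Placeholder secret scope / key names - adjust to your own setup.
access_key = dbutils.secrets.get(scope="aws", key="access_key_id")
secret_key = dbutils.secrets.get(scope="aws", key="secret_access_key")
mount_s3_bucket(access_key, secret_key, "raj-zuk-comparis-poc", "raj-zuk-comparis-poc")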
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()
print('Session created')
JSONschema = StructType([
    StructField("id", StringType(), False),
    StructField("address1", StringType(), True),
    StructField("address2", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("size", IntegerType(), True),
    StructField("rooms", IntegerType(), True),
    # DecimalType() defaults to precision 10, scale 0 and would truncate the
    # fractional part of the coordinates, so use DoubleType() for lat/lng.
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("hash_lat_lng", StringType(), True),
    StructField("source", StringType(), True),
    StructField("source_premium", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("attributes", StructType([
        StructField("type", StringType()),
        StructField("lift", StringType()),
        StructField("garden", StringType()),
        StructField("heating", StringType()),
        StructField("washing_machine", StringType()),
        StructField("floor", StringType()),
        StructField("year_of_construction", IntegerType())
    ]))
])
ds = (spark.readStream
   .schema(JSONschema)
   .format("json")
   .option("maxFilesPerTrigger", 1)
   .load("/mnt/raj-zuk-comparis-poc/messages*.json"))
# Flatten the nested attributes struct into top-level columns so they can be aggregated directly.
flattened_df = (ds.withColumn("property_type", expr("attributes.type"))
        .withColumn("property_lift", expr("attributes.lift"))
        .withColumn("property_garden", expr("attributes.garden"))
        .withColumn("property_heating", expr("attributes.heating"))
        .withColumn("property_washing_machine", expr("attributes.washing_machine"))
        .withColumn("property_floor", expr("attributes.floor"))
        .withColumn("property_year_of_construction", expr("attributes.year_of_construction"))
        )
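As a side note, the same flattening could also be done in one line if the property_ prefix on the column names is not important (the aggregation below keeps using the prefixed names, so this is optional):

# Equivalent flattening: pull every field of the attributes struct up to the top level.
flattened_df = ds.select("*", "attributes.*").drop("attributes")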
# Compact to one row per property (hash_lat_lng + source) within a 4-hour tumbling window,
# keeping the latest timestamp and the first observed value of each field.
tumbling_df = (flattened_df
        .groupBy(window(col("timestamp"), "4 hours"), col("source"), col("hash_lat_lng"))
        .agg(max("timestamp").alias("timestamp"),
             first("address1").alias("address1"),
             first("address2").alias("address2"),
             first("city").alias("city"),
             first("state").alias("state"),
             first("postalCode").alias("postalCode"),
             first("price").alias("price"),
             first("size").alias("size"),
             first("rooms").alias("rooms"),
             first("property_type").alias("property_type"),
             first("property_lift").alias("property_lift"),
             first("property_garden").alias("property_garden"),
             first("property_heating").alias("property_heating"),
             first("property_washing_machine").alias("property_washing_machine"),
             first("property_floor").alias("property_floor"),
             first("property_year_of_construction").alias("property_year_of_construction"),
             first("source_premium").alias("source_premium"))
        # Note: sorting a streaming DataFrame is only supported in complete output mode,
        # so this orderBy may have to be dropped depending on how the stream is written.
        .orderBy(col("window.start"))
       )
After this I am not sure how to write the stream out. Could you please advise what the correct way to write it would be, or how this problem should be solved?
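For reference, this is roughly the kind of sink I was considering for the Delta option. It is only a sketch: the checkpoint and table paths are placeholders, it assumes the orderBy above is removed (sorting is not supported in update mode), and the foreachBatch MERGE is just how I imagine upserting one row per property, not something I have working.

from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # Placeholder path - assumes a Delta table with the same schema as tumbling_df already exists.
    target = DeltaTable.forPath(spark, "/mnt/raj-zuk-comparis-poc/compacted_properties")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"),
               "t.hash_lat_lng = s.hash_lat_lng AND t.source = s.source")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

query = (tumbling_df
    .writeStream
    .outputMode("update")  # emit only the aggregates updated in each micro-batch
    .option("checkpointLocation", "/mnt/raj-zuk-comparis-poc/checkpoints/compacted")  # placeholder
    .foreachBatch(upsert_to_delta)
    .start())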