Structured Streaming

sudhanshu1
New Contributor III

I need a solution for the problem below.

We have a set of JSON files that keep arriving in AWS S3; each file contains details for a property. Please note that one property can have 10-12 rows in a file. A sample JSON file is attached.

We need to read these files as a stream and then build a compacted view per property, i.e. take all rows for that property and merge them into a single row. Once that is done, we can write the stream to Delta / DocumentDB / DynamoDB.

Here is what I have tried so far:

def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
    ACCESS_KEY_ID = access_key
    SECRET_ACCESS_KEY = secret_key
    ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")
    print("Mounting", bucket_name)
    try:
        # Unmount the data in case it was already mounted.
        dbutils.fs.unmount("/mnt/%s" % mount_folder)
    except:
        # If it fails to unmount, it most likely wasn't mounted in the first place.
        print("Directory not unmounted:", mount_folder)
    finally:
        # Lastly, mount our bucket.
        dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)
        # dbutils.fs.mount("s3a://" + ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)
        print("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

  # Set AWS programmatic access credentials
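In the real notebook the credentials are set right after that comment and the function is called; a placeholder version of that step looks roughly like this (the secret scope, key names and bucket name are made up, not the real ones):

# Placeholder credential handling; real values come from our own secret scope.
ACCESS_KEY = dbutils.secrets.get(scope="aws", key="access-key-id")
SECRET_KEY = dbutils.secrets.get(scope="aws", key="secret-access-key")
mount_s3_bucket(ACCESS_KEY, SECRET_KEY, "raj-zuk-comparis-poc", "raj-zuk-comparis-poc")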

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()
print('Session created')

# JSONschema = StructType([
#     StructField("username", StringType(), True),
#     StructField("currency", StringType(), True),
#     StructField("amount", LongType(), True),
# ])

JSONschema = StructType([
    StructField("id", StringType(), False),
    StructField("address1", StringType(), True),
    StructField("address2", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("size", IntegerType(), True),
    StructField("rooms", IntegerType(), True),
    # Explicit precision/scale: DecimalType() defaults to (10, 0), which would drop the decimals.
    StructField("lat", DecimalType(9, 6), True),
    StructField("lng", DecimalType(9, 6), True),
    StructField("hash_lat_lng", StringType(), True),
    StructField("source", StringType(), True),
    StructField("source_premium", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("attributes", StructType([
        StructField("type", StringType()),
        StructField("lift", StringType()),
        StructField("garden", StringType()),
        StructField("heating", StringType()),
        StructField("washing_machine", StringType()),
        StructField("floor", StringType()),
        StructField("year_of_construction", IntegerType())
    ]))
])
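As a sanity check, a one-off static read of a single file should confirm the schema matches the data; the file name below is just an example:

# Static (non-streaming) read of one sample file to validate JSONschema (example file name).
sample_df = spark.read.schema(JSONschema).json("/mnt/raj-zuk-comparis-poc/messages-sample.json")
sample_df.printSchema()
sample_df.show(5, truncate=False)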

ds = (spark.readStream
      .schema(JSONschema)
      .format("json")
      .option("maxFilesPerTrigger", 1)
      .load("/mnt/raj-zuk-comparis-poc/messages*.json"))

flattened_df = (ds
    .withColumn("property_type", expr("attributes.type"))
    .withColumn("property_lift", expr("attributes.lift"))
    .withColumn("property_garden", expr("attributes.garden"))
    .withColumn("property_heating", expr("attributes.heating"))
    .withColumn("property_washing_machine", expr("attributes.washing_machine"))
    .withColumn("property_floor", expr("attributes.floor"))
    .withColumn("property_year_of_construction", expr("attributes.year_of_construction")))
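(As an aside, I believe the same flattening can be written more compactly, though I have not verified it:)

# Possibly equivalent, more compact flattening of the attributes struct (untested).
attr_fields = ["type", "lift", "garden", "heating", "washing_machine", "floor", "year_of_construction"]
flattened_df = ds.select(
    "*",
    *[col("attributes." + f).alias("property_" + f) for f in attr_fields]
)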

tumbling_df = (flattened_df
    .groupBy(window(col("timestamp"), "4 hour"), col("source"), col("hash_lat_lng"))
    .agg(max("timestamp").alias("timestamp"),
         first("address1").alias("address1"),
         first("address2").alias("address2"),
         first("city").alias("city"),
         first("state").alias("state"),
         first("postalCode").alias("postalCode"),
         first("price").alias("price"),
         first("size").alias("size"),
         first("rooms").alias("rooms"),
         first("property_type").alias("property_type"),
         first("property_lift").alias("property_lift"),
         first("property_garden").alias("property_garden"),
         first("property_heating").alias("property_heating"),
         first("property_washing_machine").alias("property_washing_machine"),
         first("property_floor").alias("property_floor"),
         first("property_year_of_construction").alias("property_year_of_construction"),
         first("source_premium").alias("source_premium"))
    # Sorting a streaming aggregation is only supported in complete output mode,
    # so this orderBy may need to be dropped for other output modes.
    .orderBy(col("window.start")))
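One thing I am unsure about: as far as I understand, a streaming aggregation in append or update output mode also needs a watermark on the event-time column, so I may need something like this before the groupBy (untested):

# Possible watermark on the event-time column so old windows can eventually be finalized.
flattened_df = flattened_df.withWatermark("timestamp", "4 hours")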

After this I am not sure how to write the stream out. Could you please advise the correct way to write it, or how best to solve this problem?
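For reference, what I currently have in mind (but have not tested) is a foreachBatch sink that merges every micro-batch into a Delta table keyed on source and hash_lat_lng, so the table keeps a single row per property. The table name, checkpoint path and merge keys below are just my assumptions, and update mode would need the orderBy above removed:

from delta.tables import DeltaTable

def upsert_properties(micro_batch_df, batch_id):
    # Merge each micro-batch into the target Delta table keyed on source + hash_lat_lng,
    # keeping a single (latest) row per property. The table name is a placeholder and the
    # table is assumed to already exist with a matching schema.
    target = DeltaTable.forName(spark, "compacted_properties")
    (target.alias("t")
     .merge(micro_batch_df.drop("window").alias("s"),
            "t.source = s.source AND t.hash_lat_lng = s.hash_lat_lng")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

query = (tumbling_df.writeStream
         .foreachBatch(upsert_properties)
         .outputMode("update")
         .option("checkpointLocation", "/mnt/raj-zuk-comparis-poc/_checkpoints/properties")
         .start())

Does this look like the right direction, or is there a more idiomatic approach, e.g. writing the aggregation straight to a Delta path in complete output mode?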
