cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Structured Streaming

sudhanshu1
New Contributor III

I need some solution for below problem.

We have set of json files which are keep coming to aws s3, these files contains details for a property . please note 1 property can have 10-12 rows in this json file. Attached is sample json file.

We need to read these files as stream and then we need to create compacted view of 1 property , which means take all rows for that property and merge and create a single row per property . once that is done , we can write that stream to delta/ documentDB/ DynamoDB.

I have tried below

def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):

 ACCESS_KEY_ID = access_key

 SECRET_ACCESS_KEY = secret_key

 ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")

 print ("Mounting", bucket_name)

 try:

  # Unmount the data in case it was already mounted.

  dbutils.fs.unmount("/mnt/%s" % mount_folder)

   

 except:

  # If it fails to unmount it most likely wasn't mounted in the first place

  print ("Directory not unmounted: ", mount_folder)

   

 finally:

  # Lastly, mount our bucket.

  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)

  #dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)

  print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

  # Set AWS programmatic access credentials

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()

print('Session created')

# JSONschema = StructType([ 

#   StructField("username",StringType(),True), 

#   StructField("currency",StringType(),True), 

#   StructField("amount",LongType(),True), 

#  ])

JSONschema = StructType([

    StructField("id", StringType(), False),

    StructField("address1", StringType(), True, None),

    StructField("address2", StringType(), True, None),

    StructField("city", StringType(), True, None),

    StructField("state", StringType(), True, None),

    StructField("postalCode", StringType(), True, None),

    StructField("price", IntegerType(), True, None),

    StructField("size", IntegerType(), True, None),

    StructField("rooms", IntegerType(), True, None),

    StructField("lat", DecimalType(), True , None),

    StructField("lng", DecimalType(), True, None),

    StructField("hash_lat_lng", StringType(), True , None),

    StructField("source", StringType(), True , None),

    StructField("source_premium", StringType(), True, None),

    StructField("timestamp", TimestampType(), True , None),

    StructField("attributes", StructType([

      StructField("type", StringType()),

      StructField("lift", StringType()),

      StructField("garden", StringType()),

      StructField("heating", StringType()),

      StructField("washing_machine", StringType()),

      StructField("floor", StringType()),

      StructField("year_of_construction", IntegerType())

    ]))])

ds = (spark.readStream

   .schema(JSONschema)

   .format("json")

   .option("maxFilesPerTrigger", 1)

   .load("/mnt/raj-zuk-comparis-poc/messages*.json"))

flattened_df = (ds.withColumn("property_type", expr("attributes.type"))

        .withColumn("property_lift", expr("attributes.lift"))

        .withColumn("property_garden", expr("attributes.garden"))

        .withColumn("property_heating", expr("attributes.heating"))

        .withColumn("property_washing_machine", expr("attributes.washing_machine"))

        .withColumn("property_floor", expr("attributes.floor"))

        .withColumn("property_year_of_construction", expr("attributes.year_of_construction"))

        )

tumbling_df = (df_a44ed14d77a096c5197933f1e02b7a47

        .groupBy(window(col("timestamp"), "4 hour"), col("source"), col("hash_lat_lng"))

        .agg(max("timestamp").alias("timestamp"), 

        first("address1").alias("address1"),

        first("address2").alias("address2"),

        first("city").alias("city"),

        first("state").alias("state"),

        first("postalCode").alias("postalCode"),

        first("price").alias("price"),

        first("size").alias("size"),

        first("rooms").alias("rooms"),

        first("property_type").alias("property_type"),

        first("property_lift").alias("property_lift"),

        first("property_garden").alias("property_garden"),

        first("property_heating").alias("property_heating"),

        first("property_washing_machine").alias("property_washing_machine"),

        first("property_floor").alias("property_floor"),

        first("property_year_of_construction").alias("property_year_of_construction"),   

          

        first("source_premium").alias("source_premium"))

        .orderBy(col("window.start"))

       )

After this i am not sure how to write this ? could you please advise what should be correct way to write or solve this problem please?

0 REPLIES 0

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group