Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Showing results for 
Search instead for 
Did you mean: 

Structured Streaming

New Contributor III

I need some solution for below problem.

We have set of json files which are keep coming to aws s3, these files contains details for a property . please note 1 property can have 10-12 rows in this json file. Attached is sample json file.

We need to read these files as stream and then we need to create compacted view of 1 property , which means take all rows for that property and merge and create a single row per property . once that is done , we can write that stream to delta/ documentDB/ DynamoDB.

I have tried below

def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):

 ACCESS_KEY_ID = access_key

 SECRET_ACCESS_KEY = secret_key


 print ("Mounting", bucket_name)


  # Unmount the data in case it was already mounted.

  dbutils.fs.unmount("/mnt/%s" % mount_folder)



  # If it fails to unmount it most likely wasn't mounted in the first place

  print ("Directory not unmounted: ", mount_folder)



  # Lastly, mount our bucket.

  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)

  #dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)

  print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

  # Set AWS programmatic access credentials

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()

print('Session created')

# JSONschema = StructType([ 

#   StructField("username",StringType(),True), 

#   StructField("currency",StringType(),True), 

#   StructField("amount",LongType(),True), 

#  ])

JSONschema = StructType([

    StructField("id", StringType(), False),

    StructField("address1", StringType(), True, None),

    StructField("address2", StringType(), True, None),

    StructField("city", StringType(), True, None),

    StructField("state", StringType(), True, None),

    StructField("postalCode", StringType(), True, None),

    StructField("price", IntegerType(), True, None),

    StructField("size", IntegerType(), True, None),

    StructField("rooms", IntegerType(), True, None),

    StructField("lat", DecimalType(), True , None),

    StructField("lng", DecimalType(), True, None),

    StructField("hash_lat_lng", StringType(), True , None),

    StructField("source", StringType(), True , None),

    StructField("source_premium", StringType(), True, None),

    StructField("timestamp", TimestampType(), True , None),

    StructField("attributes", StructType([

      StructField("type", StringType()),

      StructField("lift", StringType()),

      StructField("garden", StringType()),

      StructField("heating", StringType()),

      StructField("washing_machine", StringType()),

      StructField("floor", StringType()),

      StructField("year_of_construction", IntegerType())


ds = (spark.readStream



   .option("maxFilesPerTrigger", 1)


flattened_df = (ds.withColumn("property_type", expr("attributes.type"))

        .withColumn("property_lift", expr("attributes.lift"))

        .withColumn("property_garden", expr(""))

        .withColumn("property_heating", expr("attributes.heating"))

        .withColumn("property_washing_machine", expr("attributes.washing_machine"))

        .withColumn("property_floor", expr("attributes.floor"))

        .withColumn("property_year_of_construction", expr("attributes.year_of_construction"))


tumbling_df = (df_a44ed14d77a096c5197933f1e02b7a47

        .groupBy(window(col("timestamp"), "4 hour"), col("source"), col("hash_lat_lng"))





















After this i am not sure how to write this ? could you please advise what should be correct way to write or solve this problem please?


Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now