<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Structured Streaming in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/structured-streaming/m-p/15378#M9709</link>
    <description>&lt;P&gt;I need some solution for below problem.&lt;/P&gt;&lt;P&gt;We have set of json files which are keep coming to aws s3, these files contains details for a property . please note 1 property can have 10-12 rows in this json file. Attached is sample json file.&lt;/P&gt;&lt;P&gt;We need to read these files as stream and then we need to create compacted view of 1 property , which means take all rows for that property and merge and create a single row per property . once that is done , we can write that stream to delta/ documentDB/ DynamoDB.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have tried below&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):&lt;/P&gt;&lt;P&gt;&amp;nbsp;ACCESS_KEY_ID = access_key&lt;/P&gt;&lt;P&gt;&amp;nbsp;SECRET_ACCESS_KEY = secret_key&lt;/P&gt;&lt;P&gt;&amp;nbsp;ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;print ("Mounting", bucket_name)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;try:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# Unmount the data in case it was already mounted.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;dbutils.fs.unmount("/mnt/%s" % mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;except:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# If it fails to unmount it most likely wasn't mounted in the first place&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;print ("Directory not unmounted: ", mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;finally:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# Lastly, mount our bucket.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;#dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# Set AWS programmatic access credentials&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;from pyspark.sql import SparkSession&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import *&lt;/P&gt;&lt;P&gt;from pyspark.sql.types import *&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()&lt;/P&gt;&lt;P&gt;print('Session created')&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# JSONschema = StructType([&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("username",StringType(),True),&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("currency",StringType(),True),&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("amount",LongType(),True),&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;])&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;JSONschema = StructType([&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("id", StringType(), False),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("address1", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("address2", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("city", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("state", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("postalCode", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("price", IntegerType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("size", IntegerType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("rooms", IntegerType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("lat", DecimalType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("lng", DecimalType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("hash_lat_lng", StringType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("source", StringType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("source_premium", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("timestamp", TimestampType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("attributes", StructType([&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("type", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("lift", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("garden", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("heating", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("washing_machine", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("floor", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("year_of_construction", IntegerType())&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;]))])&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;ds = (spark.readStream&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.schema(JSONschema)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.format("json")&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("maxFilesPerTrigger", 1)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.load("/mnt/raj-zuk-comparis-poc/messages*.json"))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;flattened_df = (ds.withColumn("property_type", expr("attributes.type"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_lift", expr("attributes.lift"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_garden", expr("attributes.garden"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_heating", expr("attributes.heating"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_washing_machine", expr("attributes.washing_machine"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_floor", expr("attributes.floor"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_year_of_construction", expr("attributes.year_of_construction"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;tumbling_df = (df_a44ed14d77a096c5197933f1e02b7a47&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.groupBy(window(col("timestamp"), "4 hour"), col("source"), col("hash_lat_lng"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.agg(max("timestamp").alias("timestamp"),&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("address1").alias("address1"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("address2").alias("address2"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("city").alias("city"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("state").alias("state"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("postalCode").alias("postalCode"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("price").alias("price"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("size").alias("size"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("rooms").alias("rooms"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_type").alias("property_type"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_lift").alias("property_lift"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_garden").alias("property_garden"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_heating").alias("property_heating"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_washing_machine").alias("property_washing_machine"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_floor").alias("property_floor"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_year_of_construction").alias("property_year_of_construction"),&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("source_premium").alias("source_premium"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.orderBy(col("window.start"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;After this i am not sure how to write this ? could you please advise what should be correct way to write or solve this problem please?&lt;/P&gt;</description>
    <pubDate>Wed, 21 Dec 2022 12:43:11 GMT</pubDate>
    <dc:creator>sudhanshu1</dc:creator>
    <dc:date>2022-12-21T12:43:11Z</dc:date>
    <item>
      <title>Structured Streaming</title>
      <link>https://community.databricks.com/t5/data-engineering/structured-streaming/m-p/15378#M9709</link>
      <description>&lt;P&gt;I need some solution for below problem.&lt;/P&gt;&lt;P&gt;We have set of json files which are keep coming to aws s3, these files contains details for a property . please note 1 property can have 10-12 rows in this json file. Attached is sample json file.&lt;/P&gt;&lt;P&gt;We need to read these files as stream and then we need to create compacted view of 1 property , which means take all rows for that property and merge and create a single row per property . once that is done , we can write that stream to delta/ documentDB/ DynamoDB.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have tried below&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):&lt;/P&gt;&lt;P&gt;&amp;nbsp;ACCESS_KEY_ID = access_key&lt;/P&gt;&lt;P&gt;&amp;nbsp;SECRET_ACCESS_KEY = secret_key&lt;/P&gt;&lt;P&gt;&amp;nbsp;ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;print ("Mounting", bucket_name)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;try:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# Unmount the data in case it was already mounted.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;dbutils.fs.unmount("/mnt/%s" % mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;except:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# If it fails to unmount it most likely wasn't mounted in the first place&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;print ("Directory not unmounted: ", mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;finally:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# Lastly, mount our bucket.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;#dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;# Set AWS programmatic access credentials&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;from pyspark.sql import SparkSession&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import *&lt;/P&gt;&lt;P&gt;from pyspark.sql.types import *&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;spark = SparkSession.builder.appName('Comparis-data-stream-app').getOrCreate()&lt;/P&gt;&lt;P&gt;print('Session created')&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# JSONschema = StructType([&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("username",StringType(),True),&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("currency",StringType(),True),&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("amount",LongType(),True),&amp;nbsp;&lt;/P&gt;&lt;P&gt;#&amp;nbsp;&amp;nbsp;])&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;JSONschema = StructType([&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("id", StringType(), False),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("address1", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("address2", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("city", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("state", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("postalCode", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("price", IntegerType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("size", IntegerType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("rooms", IntegerType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("lat", DecimalType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("lng", DecimalType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("hash_lat_lng", StringType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("source", StringType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("source_premium", StringType(), True, None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("timestamp", TimestampType(), True , None),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("attributes", StructType([&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("type", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("lift", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("garden", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("heating", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("washing_machine", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("floor", StringType()),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;StructField("year_of_construction", IntegerType())&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;]))])&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;ds = (spark.readStream&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.schema(JSONschema)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.format("json")&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("maxFilesPerTrigger", 1)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.load("/mnt/raj-zuk-comparis-poc/messages*.json"))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;flattened_df = (ds.withColumn("property_type", expr("attributes.type"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_lift", expr("attributes.lift"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_garden", expr("attributes.garden"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_heating", expr("attributes.heating"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_washing_machine", expr("attributes.washing_machine"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_floor", expr("attributes.floor"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.withColumn("property_year_of_construction", expr("attributes.year_of_construction"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;tumbling_df = (df_a44ed14d77a096c5197933f1e02b7a47&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.groupBy(window(col("timestamp"), "4 hour"), col("source"), col("hash_lat_lng"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.agg(max("timestamp").alias("timestamp"),&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("address1").alias("address1"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("address2").alias("address2"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("city").alias("city"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("state").alias("state"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("postalCode").alias("postalCode"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("price").alias("price"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("size").alias("size"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("rooms").alias("rooms"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_type").alias("property_type"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_lift").alias("property_lift"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_garden").alias("property_garden"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_heating").alias("property_heating"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_washing_machine").alias("property_washing_machine"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_floor").alias("property_floor"),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("property_year_of_construction").alias("property_year_of_construction"),&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;first("source_premium").alias("source_premium"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.orderBy(col("window.start"))&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;After this i am not sure how to write this ? could you please advise what should be correct way to write or solve this problem please?&lt;/P&gt;</description>
      <pubDate>Wed, 21 Dec 2022 12:43:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/structured-streaming/m-p/15378#M9709</guid>
      <dc:creator>sudhanshu1</dc:creator>
      <dc:date>2022-12-21T12:43:11Z</dc:date>
    </item>
  </channel>
</rss>

