<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Structured Streaming from MongoDB Atlas not parsing JSON correctly in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/structered-streamin-from-mongodb-atlas-not-parsing-json/m-p/22046#M15064</link>
    <description>&lt;P&gt;HI all,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have a table in MongoDB Atlas that I am trying to read continuously to memory and then will write that file out eventually.  However, when I look at the in-memory table it doesn't have the correct schema.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Code here:&lt;/P&gt;&lt;P&gt;from pyspark.sql.types import StructType, LongType, StringType, IntegerType&lt;/P&gt;&lt;P&gt;from pyspark import SparkContext&lt;/P&gt;&lt;P&gt;from pyspark.streaming import StreamingContext&lt;/P&gt;&lt;P&gt;from pyspark.sql import SparkSession&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import *&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;spark = SparkSession.builder \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .appName("pdm_messagesStream") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .config("spark.jars", "org.mongodb.spark:mongo-spark-connector:10.0.5") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .getOrCreate()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;readSchema = (StructType() \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('_id', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('deviceToken', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('message', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('messageId', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('createdAt', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('createdAtEpochSeconds', StringType()) &lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;dataStreamWriter = (spark.readStream \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .format("mongodb") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .option("spark.mongodb.connection.uri", "mongodb+srv://xxxx@***/?retryWrites=true&amp;amp;readPreference=secondary&amp;amp;readPreferenceTags=nodeType:ANALYTICS&amp;amp;w=majority") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .option('spark.mongodb.database', "data") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; 
.option('spark.mongodb.collection', "messages") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .option("forceDeleteTempCheckpointLocation", "true") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .schema(readSchema)&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .load() \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .writeStream \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .format('memory') \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .queryName("messages") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .trigger(continuous="1 second") &lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;query = dataStreamWriter.start() &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;result from spark.table("pdm_messages").show(truncate=False):&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="image.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1182iD469F4FA669231EB/image-size/large?v=v2&amp;amp;px=999" role="button" title="image.png" alt="image.png" /&gt;&lt;/span&gt;Any help would be greatly appreciated.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;&lt;P&gt;Sharon&lt;/P&gt;</description>
    <pubDate>Wed, 16 Nov 2022 12:17:29 GMT</pubDate>
    <dc:creator>sharonbjehome</dc:creator>
    <dc:date>2022-11-16T12:17:29Z</dc:date>
    <item>
      <title>Structured Streaming from MongoDB Atlas not parsing JSON correctly</title>
      <link>https://community.databricks.com/t5/data-engineering/structered-streamin-from-mongodb-atlas-not-parsing-json/m-p/22046#M15064</link>
      <description>&lt;P&gt;HI all,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have a table in MongoDB Atlas that I am trying to read continuously to memory and then will write that file out eventually.  However, when I look at the in-memory table it doesn't have the correct schema.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Code here:&lt;/P&gt;&lt;P&gt;from pyspark.sql.types import StructType, LongType, StringType, IntegerType&lt;/P&gt;&lt;P&gt;from pyspark import SparkContext&lt;/P&gt;&lt;P&gt;from pyspark.streaming import StreamingContext&lt;/P&gt;&lt;P&gt;from pyspark.sql import SparkSession&lt;/P&gt;&lt;P&gt;from pyspark.sql.functions import *&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;spark = SparkSession.builder \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .appName("pdm_messagesStream") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .config("spark.jars", "org.mongodb.spark:mongo-spark-connector:10.0.5") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .getOrCreate()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;readSchema = (StructType() \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('_id', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('deviceToken', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('message', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('messageId', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('createdAt', StringType()) \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .add('createdAtEpochSeconds', StringType()) &lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;dataStreamWriter = (spark.readStream \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .format("mongodb") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .option("spark.mongodb.connection.uri", "mongodb+srv://xxxx@***/?retryWrites=true&amp;amp;readPreference=secondary&amp;amp;readPreferenceTags=nodeType:ANALYTICS&amp;amp;w=majority") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .option('spark.mongodb.database', "data") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; 
.option('spark.mongodb.collection', "messages") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .option("forceDeleteTempCheckpointLocation", "true") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .schema(readSchema)&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .load() \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .writeStream \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .format('memory') \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .queryName("messages") \&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; .trigger(continuous="1 second") &lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;query = dataStreamWriter.start() &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;result from spark.table("pdm_messages").show(truncate=False):&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="image.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1182iD469F4FA669231EB/image-size/large?v=v2&amp;amp;px=999" role="button" title="image.png" alt="image.png" /&gt;&lt;/span&gt;Any help would be greatly appreciated.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;&lt;P&gt;Sharon&lt;/P&gt;</description>
      <pubDate>Wed, 16 Nov 2022 12:17:29 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/structered-streamin-from-mongodb-atlas-not-parsing-json/m-p/22046#M15064</guid>
      <dc:creator>sharonbjehome</dc:creator>
      <dc:date>2022-11-16T12:17:29Z</dc:date>
    </item>
    <item>
      <title>Re: Structured Streaming from MongoDB Atlas not parsing JSON correctly</title>
      <link>https://community.databricks.com/t5/data-engineering/structered-streamin-from-mongodb-atlas-not-parsing-json/m-p/22047#M15065</link>
      <description>&lt;P&gt;Hi @sharonbjehome, this has to be checked thoroughly via a support ticket. Did you follow this guide:&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.databricks.com/external-data/mongodb.html" alt="https://docs.databricks.com/external-data/mongodb.html" target="_blank"&gt;https://docs.databricks.com/external-data/mongodb.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Also, could you please check with MongoDB support? Was this working before?&lt;/P&gt;</description>
      <pubDate>Fri, 18 Nov 2022 07:36:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/structered-streamin-from-mongodb-atlas-not-parsing-json/m-p/22047#M15065</guid>
      <dc:creator>Debayan</dc:creator>
      <dc:date>2022-11-18T07:36:04Z</dc:date>
    </item>
  </channel>
</rss>

