Structured Streaming from MongoDB Atlas not parsing JSON correctly

sharonbjehome
New Contributor

Hi all,

I have a collection in MongoDB Atlas that I am trying to read continuously into an in-memory table, which I will eventually write out to a file. However, when I look at the in-memory table, it doesn't have the correct schema.

Code here:

from pyspark.sql.types import StructType, StringType
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (SparkSession.builder
    .appName("pdm_messagesStream")
    # Maven coordinates go in spark.jars.packages (spark.jars takes jar file
    # paths), and the artifact needs the Scala version suffix:
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.5")
    .getOrCreate()
)

readSchema = (StructType()
    .add('_id', StringType())
    .add('deviceToken', StringType())
    .add('message', StringType())
    .add('messageId', StringType())
    .add('createdAt', StringType())
    .add('createdAtEpochSeconds', StringType())
)

dataStreamWriter = (spark.readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb+srv://xxxx@***/?retryWrites=true&readPreference=secondary&readPreferenceTags=nodeType:ANALYTICS&w=majority")
    .option('spark.mongodb.database', "data")
    .option('spark.mongodb.collection', "messages")
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(readSchema)
    .load()
    .writeStream
    .format('memory')
    # queryName must match the table name queried below with spark.table()
    .queryName("pdm_messages")
    # Continuous processing supports only the Kafka and rate sources, so the
    # MongoDB source needs a micro-batch trigger instead of trigger(continuous=...):
    .trigger(processingTime="1 second")
)

query = dataStreamWriter.start()
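One likely cause worth checking (an assumption to verify against the MongoDB Spark Connector 10.x documentation): the streaming source reads the collection's change stream, and by default each record is a change event whose document arrives as a JSON string in a `fullDocument` field, so a flat schema like `readSchema` above comes back largely null. The connector has an option to publish only the full document; a hedged sketch:

```python
# Sketch only -- assumes MongoDB Spark Connector 10.x, where this option tells
# the streaming source to emit the changed document itself rather than the
# wrapping change-stream event; <connection-uri> is a placeholder.
readDF = (spark.readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "<connection-uri>")
    .option("spark.mongodb.database", "data")
    .option("spark.mongodb.collection", "messages")
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .schema(readSchema)
    .load()
)
```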

Result from spark.table("pdm_messages").show(truncate=False):

(screenshot of the in-memory table output attached)

Any help would be greatly appreciated.

Thanks

Sharon

1 REPLY

Debayan
Esteemed Contributor III

Hi @sharonbjehome​, this would have to be checked thoroughly via a support ticket. Did you follow the steps here: https://docs.databricks.com/external-data/mongodb.html?

Also, could you please check with MongoDB support? Was this working before?
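To make the symptom concrete: a change-stream event wraps the changed document as a JSON string, so the nested fields have to be parsed out explicitly (in Spark that would be `from_json`; the plain-Python sketch below uses made-up field values just to show the shape):

```python
import json

# A change-stream event as the connector might surface it: the actual
# document is a JSON *string* under "fullDocument" (values are made up).
event = {
    "operationType": "insert",
    "fullDocument": '{"_id": "abc123", "deviceToken": "tok-1", '
                    '"message": "hello", "messageId": "m-1"}',
}

# Reading such events with a flat schema yields nulls; parsing the
# string recovers the expected fields.
doc = json.loads(event["fullDocument"])
print(doc["deviceToken"])  # tok-1
print(doc["message"])      # hello
```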
