Hi all,
I have a collection in MongoDB Atlas that I am trying to read continuously into an in-memory table (which I will eventually write out to a file). However, when I look at the in-memory table, it doesn't have the schema I defined.
Code here:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

# The connector is a Maven coordinate, so it goes in spark.jars.packages
# (spark.jars expects paths to jar files, not coordinates).
spark = (SparkSession.builder
    .appName("pdm_messagesStream")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.5")
    .getOrCreate())
# Schema I expect the streamed documents to have
readSchema = (StructType()
    .add("_id", StringType())
    .add("deviceToken", StringType())
    .add("message", StringType())
    .add("messageId", StringType())
    .add("createdAt", StringType())
    .add("createdAtEpochSeconds", StringType()))
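(For comparison, I can also do a plain batch read of the same collection to see what schema the connector infers on its own. This is just a sketch reusing the same connection settings as my streaming code below; batch_df is a name I made up for illustration.)

# Batch read of the same collection, to compare its schema with the stream's
batch_df = (spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb+srv://xxxx@***/?retryWrites=true&readPreference=secondary&readPreferenceTags=nodeType:ANALYTICS&w=majority")
    .option("spark.mongodb.database", "data")
    .option("spark.mongodb.collection", "messages")
    .load())
batch_df.printSchema()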
dataStreamWriter = (spark.readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb+srv://xxxx@***/?retryWrites=true&readPreference=secondary&readPreferenceTags=nodeType:ANALYTICS&w=majority")
    .option("spark.mongodb.database", "data")
    .option("spark.mongodb.collection", "messages")
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(readSchema)
    .load()
    # Sink: an in-memory table named "messages", refreshed continuously
    .writeStream
    .format("memory")
    .queryName("messages")
    .trigger(continuous="1 second"))
query = dataStreamWriter.start()
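(For reference, this is how I'm inspecting the stream once it starts; the two lines below are just diagnostics I run in the same session.)

# Confirm the query is running, then check the in-memory table's schema
print(query.status)
spark.table("messages").printSchema()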
This is the result from spark.table("messages").show(truncate=False) (the table is registered under the queryName "messages" above):
Any help would be greatly appreciated.
Thanks
Sharon