
Using streaming data received from Pub/sub topic

sumitdesai
New Contributor II

I have a Databricks notebook in which I am streaming from a Pub/Sub topic. The code looks like this:

%pip install --upgrade google-cloud-pubsub[pandas]    
from pyspark.sql import SparkSession

authOptions = {"clientId": "123", "clientEmail": "123@project-id.iam.gserviceaccount.com",
               "privateKey": "-----BEGIN PRIVATE KEY-----1234-----END PRIVATE KEY-----\n", "privateKeyId": "1234"}
stream = (spark.readStream.format("pubsub")
          .option("subscriptionId", "firstfuel-reporting-test-subscription")
          .option("topicId", "firstfuel-reporting-test")
          .option("projectId", "project-id")
          .options(**authOptions)
          .load())
# Cast the binary payload column to a readable string
decodedStream = stream.withColumn("decodedData", stream["payload"].cast("string"))
result = decodedStream.writeStream.outputMode("append").format("console").start()

When I run this, the stream starts successfully and any messages published to the Pub/Sub topic are acknowledged right away, but I cannot see the actual payload printed on the console. How can I do that? Also, if I want to use the received messages for any other purpose, how can I do that? Below is a view of what I see after the stream starts:

 

1 REPLY

Kaniz_Fatma
Community Manager

Hi @sumitdesai

  1. Console Sink Issue: The console sink does not behave in a Databricks notebook the way it does in a local IDE or when you submit a job yourself. Its output is written to the driver's standard output, which typically ends up in the cluster's driver logs rather than under the notebook cell. That is why the stream runs and messages are acknowledged, yet no payload appears in the notebook.

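  2. Alternative Approach: To view the streaming data in a notebook, you can write the stream to an in-memory table and inspect that table with the display function provided by Databricks. Here’s how you can modify your code:

    # Assuming you have already read the stream into 'decodedStream'
    # The memory sink keeps every row on the driver, so use it only for debugging on small volumes
    query = decodedStream.writeStream.outputMode("append").format("memory").queryName("myStream").start()
    
    # 'myStream' is now queryable as a temporary table; 'display' shows the rows received so far
    display(spark.table("myStream"))
    

    The display function renders the rows in a tabular format, which is more convenient for debugging and exploration. Because spark.table("myStream") is a regular (non-streaming) DataFrame, the output is a snapshot: re-run the cell to see messages that arrived later. In most notebook setups you should also be able to call display(decodedStream) directly on the streaming DataFrame and have the output update as data arrives. Call query.stop() when you are done.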

  3. Using Received Messages: If you want to use the received messages for other purposes (e.g., further analysis, transformation, or saving to another sink), apply the transformations to the streaming DataFrame and write the result to the sink you need. For example:

    # Assuming you have already read the stream into 'decodedStream'
    # Process the 'decodedData' column as needed (e.g., filter, transform, etc.)
    processedStream = decodedStream.select("decodedData")
    
    # Write the processed stream to another sink (e.g., Parquet, Delta, etc.)
    # File sinks require a checkpoint location so the query can track its progress
    (processedStream.writeStream.outputMode("append").format("parquet")
        .option("path", "/path/to/processed_data")
        .option("checkpointLocation", "/path/to/checkpoints")
        .start())
    

    Replace /path/to/processed_data and /path/to/checkpoints with the locations where the processed data and the streaming checkpoint should be stored.
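If "any other purpose" means running your own Python logic on each message rather than saving to a file sink, one option is foreachBatch, which hands every micro-batch to a function as a regular DataFrame. The sketch below is only an illustration under some assumptions: the payload is UTF-8 text (possibly JSON), the message volume is low enough to collect each batch on the driver, and decode_payload, make_batch_handler, handle_message and the checkpoint path are hypothetical names that do not come from the original post.

```python
import json


def decode_payload(payload):
    """Decode a raw Pub/Sub payload (bytes or bytearray) into a Python value.

    Assumption: payloads are UTF-8 text. Valid JSON is parsed; anything
    else is returned as a plain string. A missing payload stays None.
    """
    if payload is None:
        return None
    text = bytes(payload).decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def make_batch_handler(on_message):
    """Build a foreachBatch handler that passes each decoded message to on_message."""

    def handle_batch(batch_df, batch_id):
        # collect() brings the whole micro-batch to the driver, so this
        # pattern only suits low message volumes.
        for row in batch_df.select("payload").collect():
            on_message(decode_payload(row["payload"]))

    return handle_batch


# Hypothetical wiring in the notebook. 'stream' is the DataFrame from the
# question, 'handle_message' is your own function, and the checkpoint path
# is a placeholder:
#
# query = (
#     stream.writeStream
#     .foreachBatch(make_batch_handler(handle_message))
#     .option("checkpointLocation", "/path/to/checkpoints/pubsub_handler")
#     .start()
# )
```

Keeping the decoding in a plain function means it can be tried on sample bytes without starting a stream. For larger volumes it would likely be better to stay within DataFrame operations inside the handler instead of collecting rows to the driver.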

Remember that Databricks provides powerful tools for visualizing and analyzing streaming data, so explore the display function and adapt your code accordingly. Happy streaming! 🚀

 
