Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Hi, I'm doing something simple in a Databricks notebook:
spark.sparkContext.setCheckpointDir("/tmp/")
import pyspark.pandas as ps

sql = ("""select
    field1, field2
from table
where date >= '2021-01-01'""")
df = ps.sql(sql)
df.spark.checkpoint()
That...
I have a streaming notebook which fetches messages from a Confluent Kafka topic and loads them into ADLS. It is a streaming notebook with the trigger set to continuous processing. Before loading the messages (which are in Avro format), I'm flattening out the...
The best approach is not to depend on Kafka's commit mechanism! We can store the processing result and the message offset in an external data store within the same database transaction. So, if the database transaction fails, both the commit and the processing will fail and ...
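For illustration, here is a minimal sketch of that pattern in a PySpark foreachBatch sink, assuming a Kafka source and using a sqlite3 database as a stand-in for the external store; the broker, topic name, paths, and table names are all hypothetical:

# Sketch only: persist the processed rows and their Kafka offsets in one transaction.
import sqlite3

def write_batch(batch_df, batch_id):
    rows = (batch_df
            .selectExpr("partition", "offset", "CAST(value AS STRING) AS value")
            .collect())
    conn = sqlite3.connect("/dbfs/tmp/offsets.db")      # hypothetical external store
    try:
        cur = conn.cursor()
        cur.execute("CREATE TABLE IF NOT EXISTS results (value TEXT)")
        cur.execute("CREATE TABLE IF NOT EXISTS offsets (kafka_partition INT PRIMARY KEY, kafka_offset INT)")
        for r in rows:
            cur.execute("INSERT INTO results VALUES (?)", (r["value"],))
            cur.execute("INSERT OR REPLACE INTO offsets VALUES (?, ?)", (r["partition"], r["offset"]))
        conn.commit()       # results and offsets succeed or fail together
    except Exception:
        conn.rollback()     # neither is persisted, so the batch can be retried safely
        raise
    finally:
        conn.close()

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:9092")   # hypothetical broker
       .option("subscribe", "events")                     # hypothetical topic
       .load())

(raw.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/checkpoints/kafka_to_db")
    .start())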
Tree-based estimators in pyspark.ml have an argument called checkpointInterval: checkpointInterval = Param(parent='undefined', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will ...
@Federico Trifoglio: If sc.getCheckpointDir() returns None, it means that no checkpoint directory is set in the SparkContext. In this case, the checkpointInterval argument will indeed be ignored. To set a checkpoint directory, you can use the SparkC...
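For reference, a minimal sketch of how the two fit together, assuming a training DataFrame named train with the usual features/label columns (all of these names are placeholders):

from pyspark.ml.classification import RandomForestClassifier

# A checkpoint directory must be set for checkpointInterval to have any effect
spark.sparkContext.setCheckpointDir("dbfs:/tmp/ml_checkpoints")

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    checkpointInterval=10,   # checkpoint the cached intermediate state every 10 iterations
)
model = rf.fit(train)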
Hello, we are experiencing an error with one Structured Streaming job: basically the checkpoint gets corrupted and we are unable to continue with the execution. I've checked the errors, and this happens when it triggers an autocompact,...
Hi @Martin Riccardi, could you share the following, please:
1) What's your source?
2) What's your sink?
3) Could you share your readStream() and writeStream() code?
4) The full error stack trace
5) Did you stop and re-run your query after weeks of not being acti...
New to Databricks, and here is one thing that confuses me. Since Spark Streaming is already capable of incremental loading via checkpointing, what difference does enabling Auto Loader make?
Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files i...
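As a rough sketch of what that looks like in a notebook (the paths, storage account, and table name below are placeholders, and the file format is assumed to be JSON):

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", "dbfs:/tmp/schemas/events")
      .load("abfss://landing@myaccount.dfs.core.windows.net/events/"))

(df.writeStream
   .option("checkpointLocation", "dbfs:/tmp/checkpoints/events")
   .trigger(availableNow=True)    # process all files available now, then stop
   .toTable("bronze_events"))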
I have several users doing data analysis in Databricks Spark notebooks, and everything is smooth. Now I want to make sure that the checkpoint dir is configured at cluster start, so that every user doesn't have to set it in the notebook (ending up in a lot o...
@Alejandro Martinez, for streaming jobs there is one, but for the others I couldn't find any. Here is the Spark conf (Configuration - Spark 3.2.1 Documentation, apache.org): spark.sql.streaming.checkpointLocation
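So one option is to put that key in the cluster's Spark config (or set it once per session) so users get a default checkpoint root without touching their notebooks; the paths below are placeholders:

# Default checkpoint root for Structured Streaming queries
spark.conf.set("spark.sql.streaming.checkpointLocation", "dbfs:/checkpoints/streaming")

# Checkpoint directory for RDD/DataFrame .checkpoint() calls
spark.sparkContext.setCheckpointDir("dbfs:/checkpoints/rdd")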
Use case: read data from a source table using Spark Structured Streaming (round the clock). Apply transformation logic, etc., and finally merge the dataframe into the target table. If there is any failure during transformation or merge, the Databricks job should...
I have a Spark Structured Streaming job which reads from 2 Delta tables as streams, processes the data, and then writes to a 3rd Delta table. The job is run with the Databricks service on GCP. Sometimes the job fails with the following exception...
You can remove that folder so it will be recreated automatically. Additionally, every new job run should have a new (or just empty) checkpoint location. You can add this in your code before starting the stream:
dbutils.fs.rm(checkpoint_path, True)
Additionally, you...
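For example, something along these lines before (re)starting the query; checkpoint_path and the target path are placeholders, and removing the checkpoint means the stream will reprocess from the source's starting position:

checkpoint_path = "dbfs:/checkpoints/my_stream"

# Drop the corrupted checkpoint so the next run starts from a clean state
dbutils.fs.rm(checkpoint_path, True)

(df.writeStream
   .format("delta")
   .option("checkpointLocation", checkpoint_path)
   .start("dbfs:/delta/target_table"))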
I've seen .cache() and .checkpoint() used similarly in some workflows I've come across. What's the difference, and when should I use one over the other?
Caching is more useful than checkpointing when you have a lot of available memory to store your RDDs or DataFrames, even if they are massive. Caching will maintain the result of your transformations so that those transformations will not have to be recomp...
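A minimal side-by-side sketch (the checkpoint directory and the example DataFrame are arbitrary):

from pyspark.sql.functions import rand

spark.sparkContext.setCheckpointDir("dbfs:/tmp/checkpoints")

df = spark.range(0, 10_000_000).withColumn("x", (rand() * 100).cast("int"))

cached = df.cache()            # keeps the computed result in memory/disk; lineage is preserved
cached.count()                 # an action materializes the cache

checkpointed = df.checkpoint() # writes the data to the checkpoint dir and truncates lineage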
Writing statistics in a checkpoint has a cost, which is usually visible only for very large tables. However, it is worth mentioning that these statistics are very useful for data skipping, which speeds up subsequent operations. In Databricks Runti...
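If I understand the post correctly, this behaviour can be tuned per table with the Delta checkpoint statistics properties; a sketch, assuming a Delta table named events:

spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.checkpoint.writeStatsAsStruct' = 'true',
    'delta.checkpoint.writeStatsAsJson'  = 'false'
  )
""")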
If the read stream definition has something similar to:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
resettin...