Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Forum Posts

UmaMahesh1
by Honored Contributor III
  • 1606 Views
  • 1 replies
  • 2 kudos

Checkpoint issue when loading data from confluent kafka

I have a streaming notebook which fetches messages from a Confluent Kafka topic and loads them into ADLS. It uses continuous processing as the trigger. Before loading the message (which is in Avro format), I'm flattening out the...

Latest Reply
Avinash_94
New Contributor III
  • 2 kudos

The best approach is not to depend on Kafka's commit mechanism! We can store the processing result and the message offset in an external data store in the same database transaction. So, if the database transaction fails, both commit and processing will fail and ...
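The idea in this reply, writing the processing result and the message offset in one database transaction, can be sketched roughly as follows. This is only a minimal illustration: Python's built-in sqlite3 stands in for the external data store, and the table names (results, offsets), the helper functions, and the (offset, value) message format are hypothetical, not taken from the thread.

```python
import sqlite3


def init_store(conn):
    # Hypothetical schema: processed results plus one offset row per topic-partition.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results (msg_offset INTEGER PRIMARY KEY, value TEXT)"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS offsets ("
        "topic TEXT, part INTEGER, next_offset INTEGER, PRIMARY KEY (topic, part))"
    )
    conn.commit()


def last_offset(conn, topic, part):
    # Offset to resume from; 0 if nothing has been committed yet.
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND part = ?", (topic, part)
    ).fetchone()
    return row[0] if row else 0


def process_batch(conn, topic, part, messages):
    """messages is a list of (offset, value) pairs in offset order."""
    if not messages:
        return
    # 'with conn' commits on success and rolls back if any statement or
    # processing step raises, so results and offset move together.
    with conn:
        for offset, value in messages:
            conn.execute(
                "INSERT INTO results (msg_offset, value) VALUES (?, ?)",
                (offset, value.upper()),  # stand-in for real processing
            )
        conn.execute(
            "INSERT OR REPLACE INTO offsets (topic, part, next_offset) VALUES (?, ?, ?)",
            (topic, part, messages[-1][0] + 1),
        )
```

On restart, a consumer built this way would read last_offset(...) and seek to that position instead of relying on the offset committed to Kafka, so a failed batch is simply reprocessed from the last stored offset.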

Fed
by New Contributor III
  • 5899 Views
  • 1 replies
  • 0 kudos

Setting checkpoint directory for checkpointInterval argument of estimators in pyspark.ml

Tree-based estimators in pyspark.ml have an argument called checkpointInterval:
checkpointInterval = Param(parent='undefined', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will ...

Latest Reply
Anonymous
Not applicable
  • 0 kudos

@Federico Trifoglio: If sc.getCheckpointDir() returns None, it means that no checkpoint directory is set in the SparkContext. In this case, the checkpointInterval argument will indeed be ignored. To set a checkpoint directory, you can use the SparkC...

mriccardi
by New Contributor II
  • 2648 Views
  • 1 replies
  • 0 kudos

Structured Streaming Checkpoint corrupted.

Hello, We are experiencing an error with one of our Structured Streaming jobs: the checkpoint gets corrupted and we are unable to continue with the execution. I've checked the errors, and this happens when it triggers an autocompact,...

Latest Reply
jose_gonzalez
Moderator
  • 0 kudos

Hi @Martin Riccardi, Could you share the following, please:
1) What's your source?
2) What's your sink?
3) Could you share your readStream() and writeStream() code?
4) The full error stack trace.
5) Did you stop and re-run your query after weeks of not being acti...

hello_world
by New Contributor III
  • 2794 Views
  • 3 replies
  • 2 kudos

What exact difference does Auto Loader make?

I'm new to Databricks, and here is one thing that confuses me. Since Spark Streaming is already capable of incremental loading through checkpointing, what difference does enabling Auto Loader make?

Latest Reply
Meghala
Valued Contributor II
  • 2 kudos

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files i...

2 More Replies
alejandrofm
by Valued Contributor
  • 1913 Views
  • 2 replies
  • 5 kudos

Resolved! How to set a global checkpoint for all notebooks?

I have several users doing data analysis in Databricks Spark notebooks, and everything runs smoothly. Now I want to make sure that the checkpointdir is configured at cluster start, so that users don't have to set it in each notebook (ending up in a lot o...

Latest Reply
Kaniz_Fatma
Community Manager
  • 5 kudos

Hi @Alejandro Martinez, just a friendly follow-up. Do you still need help, or did @Hubert Dudek (Customer)'s response help you find the solution? Please let us know.

1 More Replies
alejandrofm
by Valued Contributor
  • 4548 Views
  • 7 replies
  • 9 kudos

Resolved! Pandas.spark.checkpoint() doesn't break lineage

Hi, I'm doing something simple in a Databricks notebook:
spark.sparkContext.setCheckpointDir("/tmp/")
import pyspark.pandas as ps
sql = ("""select field1, field2 From table Where date>='2021-01-01'""")
df = ps.sql(sql)
df.spark.checkpoint()
That...

Latest Reply
Hubert-Dudek
Esteemed Contributor III
  • 9 kudos

If you need checkpointing, please try the code below. Thanks to persist, you will avoid reprocessing:
df = ps.sql(sql).persist()
df.spark.checkpoint()

6 More Replies
_Orc
by New Contributor
  • 2424 Views
  • 2 replies
  • 1 kudos

Resolved! Checkpoint is getting created even though the microbatch append has failed

Use case: Read data from the source table using Spark Structured Streaming (round the clock). Apply transformation logic, etc., and finally merge the DataFrame into the target table. If there is any failure during transformation or merge, the Databricks job should...

Latest Reply
Anonymous
Not applicable
  • 1 kudos

Hi @Om Singh, Hope you are doing well. Just wanted to check in and see if you were able to find a solution to your question? Cheers

1 More Replies
RohanB
by New Contributor III
  • 4002 Views
  • 8 replies
  • 3 kudos

Resolved! Spark Streaming - Checkpoint State EOF Exception

I have a Spark Structured Streaming job which reads from 2 Delta tables as streams, processes the data and then writes to a 3rd Delta table. The job is being run with the Databricks service on GCP. Sometimes the job fails with the following exception...

Latest Reply
RohanB
New Contributor III
  • 3 kudos

Hi @Jose Gonzalez, Do you require any more information regarding the code? Any idea what could be the cause of the issue? Thanks and Regards, Rohan

7 More Replies
BorislavBlagoev
by Valued Contributor III
  • 4328 Views
  • 5 replies
  • 4 kudos

Resolved! Databricks writeStream checkpoint

I'm trying to execute this writeStream:
data_frame.writeStream.format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(processingTime="1 second") \
    .option("mergeSchema", "true") \
    .o...

Latest Reply
Hubert-Dudek
Esteemed Contributor III
  • 4 kudos

You can remove that folder so it will be recreated automatically. Additionally, every new job run should have a new (or just empty) checkpoint location. You can add this to your code before running the stream:
dbutils.fs.rm(checkpoint_path, True)
Additionally you...

4 More Replies
User16752240150
by New Contributor II
  • 5133 Views
  • 1 replies
  • 0 kudos

When to use cache vs checkpoint?

I've seen .cache() and .checkpoint() used similarly in some workflows I've come across. What's the difference, and when should I use one over the other?

Latest Reply
Srikanth_Gupta_
Valued Contributor
  • 0 kudos

Caching is more useful than checkpointing when you have a lot of available memory to store your RDDs or DataFrames, even if they are massive. Caching will maintain the result of your transformations so that those transformations will not have to be recomp...

User16826994223
by Honored Contributor III
  • 1257 Views
  • 2 replies
  • 0 kudos

Don't want checkpoint in delta

Suppose I am not interested in checkpoints. How can I disable checkpoint writes in Delta?

Latest Reply
sajith_appukutt
Honored Contributor II
  • 0 kudos

Writing statistics in a checkpoint has a cost that is usually visible only for very large tables. However, it is worth mentioning that these statistics are very useful for data skipping, which speeds up subsequent operations. In Databricks Runti...

1 More Replies
User16826992666
by Valued Contributor
  • 3698 Views
  • 2 replies
  • 0 kudos
Latest Reply
sajith_appukutt
Honored Contributor II
  • 0 kudos

If the read stream definition has something similar to
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
resettin...

1 More Replies