I have a simple job scheduled every 5 min. Basically it listens for cloudFiles on a storage account and writes them into a Delta table, extremely simple. The code is something like this:

df = (spark
  .readStream
  .format("cloudFiles")
  .option('cloudFil...
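A minimal runnable sketch of this Auto Loader pattern, assuming JSON input; the paths, schema location, and table name below are hypothetical placeholders:

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")                          # format of the incoming files
      .option("cloudFiles.schemaLocation", "/mnt/_schemas/events")  # where Auto Loader persists the inferred schema
      .load("abfss://landing@myaccount.dfs.core.windows.net/events/"))

(df.writeStream
   .option("checkpointLocation", "/mnt/_checkpoints/events")  # required for exactly-once progress tracking
   .trigger(availableNow=True)                                # drain all pending files, then stop; fits a 5-minute scheduled job
   .toTable("events_bronze"))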
I have a Delta table that is partitioned by Year, Date and Month. I'm trying to merge data into it on all three partition columns plus an extra column (an ID). My merge statement is below:

MERGE INTO delta.<path of delta table> oldData
USING df newData ...
Isn't the suggested idea only filtering the input dataframe (resulting in a smaller amount of data to match across the whole Delta table), rather than pruning the Delta table so that only the relevant partitions are scanned?
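That's the usual reading: filtering the source shrinks the probe side but doesn't by itself prune the target. The common workaround is to put the partition predicates, as literals derived from the incoming batch, into the MERGE ON clause so the optimizer can skip unrelated target partitions. A sketch with placeholder path, view, and column names:

newData.createOrReplaceTempView("newData")

# Collect the distinct Year values present in this batch and inline them as
# literals, so the optimizer can skip every other partition of the target.
years = [str(r["Year"]) for r in newData.select("Year").distinct().collect()]

spark.sql(f"""
  MERGE INTO delta.`/mnt/delta/my_table` AS oldData
  USING newData
  ON  oldData.Year IN ({', '.join(years)})
  AND oldData.Year  = newData.Year
  AND oldData.Month = newData.Month
  AND oldData.Date  = newData.Date
  AND oldData.id    = newData.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")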
I'm reading data into a dataframe with

df = spark.read.json("s3://somepath/")

I've tried first creating a Delta table using the DeltaTable API with:

DeltaTable.createIfNotExists(spark)\
  .location(target_path)\
  .addColumns(df.sche...
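For reference, a complete version of that builder pattern under the same assumptions (target_path is whatever location is being written to):

from delta.tables import DeltaTable

df = spark.read.json("s3://somepath/")

# Create the table with the schema inferred from the JSON read, then append.
(DeltaTable.createIfNotExists(spark)
    .location(target_path)
    .addColumns(df.schema)   # pass the full StructType rather than column-by-column
    .execute())

df.write.format("delta").mode("append").save(target_path)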
Hi Everyone, I am using the SQL query below to generate the days in order in Hive, and it works fine. The table was migrated to Delta and my query is now failing. I would appreciate it if someone could help me figure out the issue.

SQL query: with ex...
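The query above is cut off, so the exact failure isn't visible, but for comparison this is one way to generate an ordered run of days natively in Spark SQL on Databricks (the date range is illustrative):

spark.sql("""
  SELECT explode(sequence(to_date('2024-01-01'),
                          to_date('2024-01-31'),
                          interval 1 day)) AS day
  ORDER BY day
""").show()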
Hello all,

Background: I am having an issue today with Databricks, using pyspark-sql and writing a Delta table. The dataframe is produced by an inner join between two tables, and that result is what I am trying to write to a Delta table. The table ...
I have a PySpark streaming pipeline which reads data from a Kafka topic; the data goes through various transformations and finally gets merged into a Databricks Delta table. In the beginning we were loading data into the Delta table by using the merge ...
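A sketch of the foreachBatch-plus-MERGE shape such pipelines usually take; the topic, servers, target table, and key column are placeholders:

from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # MERGE each micro-batch into the target table on the business key.
    target = DeltaTable.forName(spark, "events_silver")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.event_id = s.event_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load()
    # ...transformations would go here...
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/mnt/_checkpoints/events_silver")
    .start())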
@bobbysidhartha: When merging data into a partitioned Delta table in parallel, it is important to ensure that each job only accesses and modifies the files in its own partition, to avoid concurrency issues. One way to achieve this is to use partition...
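As an illustration of that idea (the path, partitioning scheme, and key column are hypothetical), each parallel job can pin its own partition with a literal in the merge condition, so concurrent jobs never touch the same files:

from delta.tables import DeltaTable

def merge_one_partition(updates_df, year):
    target = DeltaTable.forPath(spark, "/mnt/delta/events")
    (target.alias("t")
        .merge(updates_df.alias("s"),
               # the literal predicate confines this job to a single partition
               f"t.Year = {year} AND t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())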
The previous two answers did not work for me (DBX 15.4). I found a hacky way using the Delta log: find the latest (group of) checkpoint (parquet) file(s) in the _delta_log and use it as the source prefix `000000000000xxxxxxx.checkpoint`:

SELECT partition_column_1,...
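A hedged reconstruction of that hack (the table path, checkpoint file name, and partition column are placeholders, and the checkpoint layout is an internal detail that can change between Delta versions):

spark.sql("""
  SELECT DISTINCT add.partitionValues['partition_column_1'] AS partition_column_1
  FROM parquet.`/mnt/delta/events/_delta_log/00000000000000001000.checkpoint.parquet`
  WHERE add IS NOT NULL
""").show()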
A deltaTable.dropDuplicates(columns) would be a very nice feature, simplifying the complex procedures that are suggested online. Or am I missing an existing procedure that can be done without merge operations or similar?
I created a feature request in the Delta project: [Feature Request] data deduplication on existing delta table · Issue #1767 · delta-io/delta (github.com)
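Until something like that exists, one workaround that avoids MERGE entirely is to rewrite the table from a deduplicated read; Delta's snapshot isolation is what allows reading and overwriting the same table in one job. A sketch with hypothetical table and key names (it rewrites the whole table, so test before using at scale):

(spark.read.table("events")
    .dropDuplicates(["id", "event_date"])   # the candidate key columns
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("events"))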
I was creating a Delta table from an ADLS JSON input file, but the job was running a long time while creating the Delta table from the JSON. Below is my cluster configuration. Is the issue related to the cluster config? Do I need to upgrade the cluster config? The cluster ...
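The configuration details are cut off above, but one common cause of slow Delta creation from JSON, independent of cluster size, is schema inference scanning the input before the real read. Supplying an explicit schema (hypothetical fields and path below) avoids that extra pass:

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("id", LongType()),
    StructField("payload", StringType()),
])

df = spark.read.schema(schema).json("abfss://input@myaccount.dfs.core.windows.net/raw/")
df.write.format("delta").save("/mnt/delta/target")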
I am trying to import a table from Oracle which has around 1.3 million rows, and one of the columns is a BLOB; the total size of the data in Oracle is around 250+ GB. Reading and saving to S3 as a Delta table is taking around 60 min. I tried with parallel (200 thread...
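For comparison, Spark's built-in JDBC partitioning usually parallelizes an import like this better than client-side threading; the connection details, bounds, and split column below are placeholders:

df = (spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")
      .option("dbtable", "SOURCE_TABLE")
      .option("user", "etl_user")
      .option("password", dbutils.secrets.get("scope", "oracle-pw"))
      .option("partitionColumn", "ID")    # numeric key to split the read on
      .option("lowerBound", "1")
      .option("upperBound", "1300000")    # roughly the max ID
      .option("numPartitions", "64")      # 64 concurrent JDBC connections
      .option("fetchsize", "1000")        # rows per round trip; matters with LOB columns
      .load())

df.write.format("delta").save("s3://my-bucket/delta/source_table/")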
Hello experts. We are trying to clarify how to clean up the large number of files that are accumulating in the _delta_log folder (json, crc and checkpoint files). We went through the related posts in the forum and followed the below:

SET spark.da...
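The SET command above is cut off; for reference, Delta cleans up old log entries automatically when a checkpoint is written, and the retention horizon is controlled by table properties like these (the interval values are illustrative):

spark.sql("""
  ALTER TABLE delta.`/mnt/delta/events`
  SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 7 days',        -- how long commit JSON files are kept
    'delta.checkpointRetentionDuration' = 'interval 2 days'  -- how long old checkpoint files are kept
  )
""")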
Hi All, I have a scenario where my existing Delta table looks like below. Now I have incremental data with an additional column, i.e. owner. Dataframe name --> scdDF. Below is the code snippet to merge the incremental dataframe into targetTable, but the new...
In Databricks Runtime 15.2 and above, you can specify schema evolution in a merge statement using SQL or the Delta table APIs:

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
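The equivalent with the Delta Python API (available with Delta Lake 3.2 on DBR 15.2+; the table, dataframe, and key names below are placeholders matching the scenario above):

from delta.tables import DeltaTable

(DeltaTable.forName(spark, "targetTable").alias("t")
    .merge(scdDF.alias("s"), "t.key = s.key")
    .withSchemaEvolution()          # lets the merge add the new 'owner' column
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())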
I am trying to save a dataframe, after a series of data manipulations using UDF functions, to a Delta table. I tried using this code:

(df
  .write
  .format('delta')
  .mode('overwrite')
  .option('overwriteSchema', 'true')
  .saveAsTable('output_table'))

but this...
You should also look at the SQL plan to check whether the writing phase is really the part that is taking the time. Since Spark works on lazy evaluation, some other phase (for example, an expensive UDF) might be what is actually slow.
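One quick way to separate the two costs, assuming the data fits in cache: force the transformations (including the UDFs) to run once before the write, then time the write on materialized data:

df.cache()
df.count()   # forces the UDF-heavy transformations to execute here
(df.write
   .format('delta')
   .mode('overwrite')
   .option('overwriteSchema', 'true')
   .saveAsTable('output_table'))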
Hi, I want to keep track of the streaming lag from the source table, which is a Delta table. I see that in the query progress logs there is some information about the last version and the last file in the version for the end offset, but this doesn't give ...
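One hedged approach, which relies on the internal (and version-dependent) shape of the Delta source offset in the progress event: compare its reservoirVersion with the source table's latest commit version. The query and table names below are placeholders:

import json

progress = query.lastProgress               # 'query' is the running StreamingQuery
end_offset = progress["sources"][0]["endOffset"]
if isinstance(end_offset, str):              # the offset may arrive as a JSON string
    end_offset = json.loads(end_offset)

latest = spark.sql("DESCRIBE HISTORY source_table LIMIT 1").collect()[0]["version"]
print(f"versions behind: {latest - end_offset['reservoirVersion']}")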
Hey @Yerachmiel Feltzman, I hope all is well. Just wanted to check in: were you able to resolve your issue, or do you need more help? We'd love to hear from you. Thanks!
Does Databricks have support for writing to the same Delta table from multiple clusters concurrently? I am specifically interested to know if there is any solution for https://github.com/delta-io/delta/issues/41 implemented in Databricks, or if you have a...
Please note, the issue noted above ([Storage System] Support for AWS S3 (multiple clusters/drivers/JVMs)) is for Delta Lake OSS. As noted in that issue, as well as in Issue 324, as of this writing S3 lacks putIfAbsent transactional consistency. For Del...
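On the Delta Lake OSS side, multi-cluster S3 writes are what the DynamoDB-backed LogStore addresses; roughly, the session would be configured like this (the DynamoDB table name and region are placeholders, and none of this is needed on Databricks, which handles S3 commit coordination with its own commit service):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.delta.logStore.s3a.impl",
            "io.delta.storage.S3DynamoDBLogStore")   # from the delta-storage-s3-dynamodb artifact
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")
    .getOrCreate())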