Redefine ETL strategy with pyspark approach

databird
New Contributor II
Hey everyone!
I have some previous experience with data engineering, but I'm totally new to Databricks and Delta tables.
I'm starting this thread hoping to ask some questions and get some help on how to design a process.
So I have essentially 2 delta tables (same structure):
1) customer_stg (id int, segment string, name string, status string), partitioned by segment, located in my delta lake in /DATA/STG/CUSTOMER
2) customer_prep (id int, segment string, name string, status string), partitioned by segment, located in my delta lake in /DATA/PREP/CUSTOMER
segment can take values like TL, TI, TO, PA

So I have 4 ETL processes that load data into the same staging table, and I have to deal with this scenario - that's why I have both tables partitioned by "segment" (and also for selecting data in a different scenario).
Also: they currently run in sequence, but in the future they might run in parallel - and this is important, as you'll see.

So, what I have so far:

1) Extract info for TL segment and save in DeltaLake
2) Read this info into a dataframe (tl_sgmnt_df) that has just 4 columns: ID, SEGMENT, NAME, STATUS (essentially same structure as staging/prepared tables)
tl_sgmnt_df = orig_sgmnt_tl_data_df\
                    .select("ID", "SEGMENT", "NAME")\
                    .withColumn("STATUS", F.lit("N"))   # assuming "N" is the intended literal status for new rows

3) Load Staging and Prepared delta tables into a dataframe each (customer_stg_delta_df, customer_prep_delta_df)
customer_stg_delta_df = spark.read.format("delta").load(dl_url + "/DATA/STG/CUSTOMER")
customer_prep_delta_df = spark.read.format("delta").load(dl_url + "/DATA/PREP/CUSTOMER")


4) Run a left_anti join between the dataframe and the staging table, keeping ALL the data in the dataframe (which should essentially hold all the "old" info and the "new" info).
new_and_oldstg_df = tl_sgmnt_df.alias('tl_sgmnt_df')\
                    .join(customer_stg_delta_df.alias('customer_stg_delta_df'),
                          F.col('tl_sgmnt_df.id') == F.col('customer_stg_delta_df.id'),
                          "left_anti")
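(Side note: the anti-join on its own only keeps the rows of tl_sgmnt_df whose IDs are not yet in staging. If new_and_oldstg_df is really meant to carry both the existing staging rows and the new batch into the overwrite in step 5, a union would still be needed - a rough sketch with the same variable names, assuming the new batch should win for IDs present in both:)
old_rows_df = customer_stg_delta_df.join(tl_sgmnt_df, on="ID", how="left_anti")   # staging rows not in the new batch
new_and_oldstg_df = old_rows_df.unionByName(tl_sgmnt_df)                           # plus the whole new batch (names resolve case-insensitively by default)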


5) Next I overwrite the staging table with the most recent data plus the old data (for all segments).
new_and_oldstg_df.write.format("delta").mode("overwrite").partitionBy("SEGMENT").save(dl_url + "/DATA/STG/CUSTOMER")
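(For the per-partition version asked about further down, one option could be Delta's replaceWhere option, which rewrites only the files of the matching partition instead of the whole table - a minimal sketch, same dataframe and path as above:)
(new_and_oldstg_df
    .where("SEGMENT = 'TL'")
    .write.format("delta")
    .mode("overwrite")
    .option("replaceWhere", "SEGMENT = 'TL'")   # only the TL partition gets replaced
    .save(dl_url + "/DATA/STG/CUSTOMER"))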



6) Next I append all info into the prepared table:
INSERT INTO PREP.CUST_PREP SELECT * FROM STG.CUST_STG



Next, the TI ETL pipeline runs and does exactly the same thing.
Then the TO and PA segment pipelines run and also do the same thing.


However, like I said, in the future they might run in parallel, so I'd like to make these processes more robust and "safer" by working just with each partition - and this is my first challenge.
Could you please help me on this?

I first need to delete from the staging table all the info that is already in the prepared table with status "R":
DELETE FROM STG.CUST_STG WHERE ID IN (SELECT ID FROM PREP.CUST_PREP WHERE SEGMENT = 'TL' AND STATUS = 'R')
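(For illustration, roughly the same delete through the Delta Lake Python API, keeping the predicate on the TL segment so files from other partitions are not touched - a sketch only, reusing the paths and column names above; the IDs are collected to the driver, which is fine for small tables:)
from delta.tables import DeltaTable
from pyspark.sql import functions as F

stg_tbl = DeltaTable.forPath(spark, dl_url + "/DATA/STG/CUSTOMER")
prep_df = spark.read.format("delta").load(dl_url + "/DATA/PREP/CUSTOMER")

# IDs already present in prep for this segment with status "R"
ids_in_prep = [r[0] for r in prep_df.where("SEGMENT = 'TL' AND STATUS = 'R'").select("ID").distinct().collect()]

# delete only the matching rows inside the TL partition
stg_tbl.delete((F.col("SEGMENT") == "TL") & F.col("ID").isin(ids_in_prep))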

Then, based on the info loaded, do a "union" between the dataframe and the staging table (making sure I'm saving the data that is not in the prep table and at the same time keeping the new info):
tl_sgmnt_df.createOrReplaceTempView("tl_sgmnt_v")
INSERT INTO STG.CUST_STG SELECT * FROM tl_sgmnt_v
After this, I insert all data into the prep table:

INSERT INTO PREP.CUST_PREP SELECT * FROM STG.CUST_STG WHERE SEGMENT = 'TL'


But I honestly don't know if this works or not (I presume pyspark will delete the info from the delta table, but after that it might rewrite the entire files again), so I believe it's not efficient. Am I right? Of course I would need to replicate this in the other ETL processes.
So I'd essentially like to change steps 4, 5 and 6 to work with partitions, and not with the entire table.
Also, I believe this is a very basic problem; I'm just new to pyspark and maybe there's something I'm missing here. I also noticed that there's a DELETE partition command, but I want to keep the info in that partition.


If I'm not clear, please let me know and I'll try to explain better!


Thank you for your help and time!
4 REPLIES

databird
New Contributor II

Just a note:
I keep getting errors when trying to post with correctly formatted code. My apologies, but I can't manage to post it correctly.

artsheiko
Databricks Employee

Hi, 

According to When to partition tables on Databricks:

  • Databricks recommends you do not partition tables that contain less than a terabyte of data.
  • If you do proceed with partitions, please check that all partitions contain at least a gigabyte of data.

Instead of partitions, take a look at:

It is quite possible that you do not need to rewrite the whole dataset and can use the MERGE operation instead.

When running processes that may work on the same partitions, please make the separation as explicit as possible in the operation condition (see ConcurrentAppendException).
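For illustration, a minimal MERGE sketch along those lines (the paths, column names and the tl_sgmnt_df dataframe are taken from the question above; adjust to your layout). Keeping the segment literal in the match condition is what makes the separation explicit for concurrent writers:
from delta.tables import DeltaTable

prep_tbl = DeltaTable.forPath(spark, dl_url + "/DATA/PREP/CUSTOMER")

(prep_tbl.alias("tgt")
    .merge(
        tl_sgmnt_df.alias("src"),
        # the explicit SEGMENT predicate narrows the merge scope and helps
        # avoid ConcurrentAppendException between the segment jobs
        "tgt.SEGMENT = 'TL' AND src.SEGMENT = 'TL' AND tgt.ID = src.ID")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())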

You can find here a repository with demos that may contain useful hints and that you can install in your workspace (maybe the one on Delta Lake would be the most relevant for you at the current stage?). Click the "View the Notebooks" button to access the code, and run the pip command to play with the content.

Hope it helps,

Best,

databird
New Contributor II

Thanks @artsheiko ! 

Well I definitely do not meet all the requirements to build partitioned tables.

The biggest table I have so far has a miserable size of ~60 MB just for one partition; it will grow in size and number of records, but not enough to reach 1 TB or even 1 GB (and this is probably the biggest source I'll have).

So, I'll need to review the approach to not have partitions.

Working with the MERGE statement seems to be the approach to follow.
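(For completeness, a rough idea of what the non-partitioned variant could look like - one MERGE per batch, keyed on ID plus SEGMENT; the table and view names are the ones used earlier in the thread:)
tl_sgmnt_df.createOrReplaceTempView("tl_sgmnt_v")
spark.sql("""
    MERGE INTO PREP.CUST_PREP AS tgt
    USING tl_sgmnt_v AS src
    ON tgt.ID = src.ID AND tgt.SEGMENT = src.SEGMENT
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")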

 

Just a question about the demos you shared:
They're only available in a Databricks environment, right?

Thanks for your help!

artsheiko
Databricks Employee

Hi @databird ,

You can review the code of each demo by opening the content via "View the Notebooks" or by exploring the following repo: https://github.com/databricks-demos (you can search for "merge" to see all the occurrences, for example).

To install the demos, you do indeed need a workspace - the installation process may bring not only the notebooks but also workflows, DLTs and possibly dashboards. Another reason is that each demo is an independent asset, so it needs to operate on top of some demo data, which is also generated during the installation.
