02-01-2024 05:48 PM
I'm envisioning a live streaming pipeline.
The bronze, or data ingestion, is being fetched using the directory listing mode of the autoloader.
I'm not using File Notification Mode because I detect about 2-300 data changes per hour.
I'm thinking about implementing silver and gold tables later.
I tried to implement silver and gold as streaming tables, but it was not easy.
Do you usually implement silver and gold as a table or materialized view?
Also, can you introduce how to optimize when putting a large amount of data into redshift?
02-03-2024 10:30 PM
Hey @rt-slowth
The decision of whether to implement silver and gold data layers using tables or materialized views depends on several factors, and both approaches have their pros and cons. Here's a breakdown to help you choose:
Tables:
Pros:
Cons:
Materialized views:
Pros:
Cons:
So, which approach to choose?
Here's a general guideline:
Use tables:
Use materialized views:
Regarding your question: Also, can you introduce how to optimize when putting a large amount of data into Redshift? I would encourage you to post this to a Redshift help community or StackOverflow where you can find more appropriate answers to this. Though I can provide a few insights while loading data keep a keen eye on
Hope this helps, any follow-ups are appreciated.
02-04-2024 08:49 PM
@Palash01
This is my bronze pipeline.
# goods_grp 테이블 load
tables = {
"USER": {"id": ["USER_NO"]}
}
def generate_tables(table, info):
@dlt.table(
name=f"{table.lower()}_cdc_raw",
table_properties={"quality": "bronze"},
comment=f"Raw(Source) MySQL Data from DMS for the table: {table}",
temporary=True,
)
def create_call_table():
stream = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# .option("cloudFiles.includeExistingFiles", "false")
.load(f"{INPUT_S3_PATH}/{table}/")
)
if "Op" not in stream.columns:
stream = stream.withColumn("Op", F.lit(None).cast(T.StringType()))
return stream.selectExpr("*", "_metadata as source_metadata", "source_metadata.file_name as source_file_name", "source_metadata.file_path as source_file_path")
dlt.create_streaming_table(
name=f"{table.lower()}",
comment="Bronze MySQL Data from DMS for the table: {table}",
table_properties={
"myCompanyPipeline.quality": "bronze",
"pipelines.autoOptimize.managed": "true",
}
)
dlt.apply_changes(
target=f"{table.lower()}",
source=f"{table.lower()}_cdc_raw",
keys=[*info["id"]],
sequence_by=F.col("ADD_TIME"),
apply_as_deletes=F.expr("Op = 'D'"),
except_column_list=["Op", "_rescued_data"],
stored_as_scd_type=1,
)
for table, info in tables.items():
generate_tables(table, info)
This is a bronze pipeline that uses autoloader to process parquet files in S3 and cdc processing.
In addition to this user table, I'm fetching other tables in the same way as above, and I want to make them into silver tables by doing JOIN and other operations.
@dlt.view
def user_raw():
user_raw = read_delta_table(
schema=TargetSchema.HMMALL, table_name="user", is_stream=False
).selectExpr('*', 'to_timestamp(ADD_TIME) as USER_ADD_TIME', 'REG_DT as USER_REG_DT')
return user_raw
@dlt.view
def shop_raw():
return read_delta_table(
schema=TargetSchema.HMMALL, table_name="shop", is_stream=False
).selectExpr("SHOP_NO", "DEF_SHOP_NM")
@dlt.view
def nation_gaon_raw():
return read_delta_table(
schema=TargetSchema.HMMALL, table_name="nation_gaon", is_stream=False
).selectExpr("NATION_NO", "NATION_NM_KR")
def join_table_with_expr(
left: DataFrame, right: DataFrame, join_expr: str, how_to: str
) -> DataFrame:
return left.join(right, on=F.expr(join_expr), how=how_to)
def generate_user_silver() -> DataFrame:
join_shop_df = join_table_with_expr(
left=dlt.read('user_raw'),
right=dlt.read('shop_raw'),
join_expr="REG_SHOP_NO = SHOP_NO",
how_to="left",
)
join_nation_df = join_table_with_expr(
left=join_shop_df,
right=dlt.read('nation_gaon_raw'),
join_expr="NATI_NO = NATION_NO",
how_to="left",
)
return join_nation_df
def split_user_register_date(df: DataFrame, datetime_cols: Column) -> DataFrame:
split_date_cols = {
"USER_REG_DATE": F.to_date(datetime_cols),
"USER_REG_YEAR" : F.year(datetime_cols),
"USER_REG_MONTH" : F.month(datetime_cols),
"USER_REG_DAY" : F.dayofmonth(datetime_cols),
"USER_REG_TIME" : F.date_format(datetime_cols, "HH"),
"USER_REG_DAY_OF_WEEK" : F.dayofweek(datetime_cols)
}
return df.withColumns(split_date_cols)
@dlt.create_table(
name=f"user_silver",
comment=f"Silver Table from user - Join shop, nation_gaon / Add broker_type",
table_properties={
"myCompanyPipeline.quality": "silver",
"pipelines.autoOptimize.managed": "true",
# "pipelines.reset.allowed": "true"
},
)
def user_silver():
# user_stream = dlt.readStream("user_raw")
# shop = dlt.read('shop_raw')
# nation_gaon = dlt.read('nation_gaon_raw')
# join_shop_df = join_table_with_expr(left=dlt.readStream('user_raw'), right=dlt.read('shop_raw'), join_expr="REG_SHOP_NO = SHOP_NO", how_to="left")
# join_nation_df = join_table_with_expr(left=join_shop_df, right=dlt.read('nation_gaon_raw'), join_expr="NATI_NO = NATION_NO", how_to="left")
join_df = generate_user_silver()
user_reg_dt_cols = F.col('USER_REG_DT')
result = split_user_register_date(df=join_df, datetime_cols=user_reg_dt_cols)
return lower_cols_name(target=result).drop(
"source_metadata", "source_file_name", "source_file_path"
)
The code above is a SILVER table.
The problem here is that bronze is streaming, so data is constantly coming in, but if you make the output of the silver table streaming, you will get an error saying that you need to turn on the skipCommitChanges option for future data.
If you turn on skipCommitChanges, data after the first time you turn on the pipeline will not be updated.
So, out of necessity, I read the bronze streaming table as a read and made it a materialized view.
The notebooks for bronze and silver are all different.
Is this how you typically create a silver table?
I would like to know the example code of how you usually create SILVER, and I would also like to know what to fix about the above code.
02-06-2024 10:38 PM - edited 02-06-2024 10:53 PM
Hey @rt-slowth
Thank you for sharing the code snippets. The code structure appears to be on the right track, and its dynamic nature is promising. With a few minor adjustments, it should achieve the desired outcome. Also, find the attached code syntax in sql and python which explains how can you perform different operations such a adding a column, creating a join, subsetting etc.
SQL Syntax (Notebook: Delta Live Tables Code SQL)
CREATE STREAMING LIVE TABLE sales_orders_cleaned(
CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
TBLPROPERTIES ("quality" = "silver")
AS
SELECT f.customer_id, f.customer_name, f.number_of_line_items,
TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
FROM STREAM(LIVE.sales_orders_raw) f
LEFT JOIN LIVE.customers c
ON c.customer_id = f.customer_id
AND c.customer_name = f.customer_name
Python Syntax (Delta Live Tables Code Python)
@dlt.create_table(
comment="The cleaned sales orders with valid order_number(s) and partitioned by order_date",
partition_cols=["order_date"],
table_properties={
"myCompanyPipeline.quality": "silver",
"pipelines.autoOptimize.managed": "true"
}
)
@dlt.expect_or_drop("valid order_number", "order_number IS NOT NULL")
def sales_orders_cleaned():
df = dlt.read_stream("sales_orders_raw").join(dlt.read("customers"), ["customer_id", "customer_name"], "left")
df = df.withColumn("order_date", df.order_datetime.cast("DATE"))
df = df.select("customer_id", "customer_name", "number_of_line_items")
return df
Doing the join this way you can avoid creating multiple temp views and this method of creating join will help data sharing between your bronze and silver layer. Here is a git repo which has the code for the initial recommended setup of bronze, silver and gold layer.
TIP: Make sure while creating DLT Pipeline add your bronze, silver and gold layer notebook in a single pipeline. (Refer to Databricks Documentation for more details)
Please feel free to share additional information or specific questions you have, and I'll be happy to assist further.
02-12-2024 04:34 PM
Thank you, @Palash01 @Retired_mod .
Thank you very much for the example code. I have one more additional question. Suppose the bronze table has dlt.apply_changes applied to it. In the silver table, the source data is the bronze table with dlt.apply_changes already applied. If any data is added to the source data after the first run, the silver table gets the following error
Flow 'user_silver' failed fatally, error occurred because it detected an update or delete on one or more rows in the source table. Streaming tables can only use append-only streaming sources. If you plan to delete or update rows in the source table in the future, convert the user_silver table to a live table rather than a streaming live table. To resolve this issue, perform a full refresh on the user_silver table. A full refresh clears all data from the user_silver table and then attempts to load all data from the streaming source.
Non-additive changes can be found in version 11.
Task MERGE
User name: doheekim
How do I resolve this case?
02-13-2024 09:32 PM - edited 02-13-2024 09:35 PM
Hey @rt-slowth
Thanks for getting back, appreciate your follow up. Let's see how can I help:
In simpler words the silver layer table does not expect the rows to get deleted from the source table but in your case you are trying to do a apply_changes which essentially means bronze table rows can be deleted based on the condition you define and that's the reason you are getting this error.
1. Resolving the Issue:
2. Reconsider Data Flow:
3. Full Refresh (Temporary Fix):
Personal Recommendation:
Also, please refer to the error-if-you-expect-to-delete-or-update-rows-to-the-source-table community blog post where I and Kaniz have addressed a similar issue.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group