cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Questions about the design of bronze, silver, and gold for live streaming pipelines

rt-slowth
Contributor

I'm envisioning a live streaming pipeline.
The bronze, or data ingestion, is being fetched using the directory listing mode of the autoloader.
I'm not using File Notification Mode because I detect about 2-300 data changes per hour.
I'm thinking about implementing silver and gold tables later.
I tried to implement silver and gold as streaming tables, but it was not easy.


Do you usually implement silver and gold as a table or materialized view?

Also, can you introduce how to optimize when putting a large amount of data into redshift?

5 REPLIES 5

Palash01
Valued Contributor

Hey @rt-slowth 

The decision of whether to implement silver and gold data layers using tables or materialized views depends on several factors, and both approaches have their pros and cons. Here's a breakdown to help you choose:

Tables:

Pros:

  • Flexibility: Tables offer more flexibility for modifications and updates on the data itself.
  • Performance: Updating table data can be faster than refreshing a materialized view, especially for smaller datasets.
  • Fine-grained control: You have more control over data access and authorization on individual tables.

Cons:

  • Performance for querying: For large datasets, complex queries on tables can be slower than using materialized views pre-computing aggregations and joins.
  • Data consistency: Maintaining consistency between silver and gold layers can be more challenging with independent tables.

Materialized views:

Pros:

  • Query performance: Pre-computed data in materialized views often leads to faster query performance, especially for complex aggregations and joins on large datasets.
  • Data consistency: Materialized views automatically refresh based on their source tables, ensuring consistency between layers.

Cons:

  • Flexibility: The materialized view is based on the defined query, limiting modifications directly to the view data.
  • Maintenance: Refreshing materialized views can be resource-intensive, especially for large datasets and complex views.

So, which approach to choose?

Here's a general guideline:

  • Use tables:

    • If you need frequent updates to the data itself.
    • If data consistency is less critical or easily managed.
    • If query performance is not a major concern or the dataset is small.
  • Use materialized views:

    • If query performance for complex operations is crucial.
    • If data consistency between layers is critical.
    • If data updates are less frequent.

Regarding your question: Also, can you introduce how to optimize when putting a large amount of data into Redshift? I would encourage you to post this to a Redshift help community or StackOverflow where you can find more appropriate answers to this. Though I can provide a few insights while loading data keep a keen eye on 

  1. Data Compression (GZIP or LZO)
  2. Filter any unnecessary data that you don't need
  3. Work on schema optimization
  4. Proper use of Manifest files and Copy Activity
  5. Adjust your cluster size or concurrency settings
  6. Use Vaccum and Analyze commands to maintain table statistics and improve query performance.

Hope this helps, any follow-ups are appreciated.

 

Leave a like if this helps! Kudos,
Palash

rt-slowth
Contributor

@Palash01 
This is my bronze pipeline.

# goods_grp ํ…Œ์ด๋ธ” load
tables = {
    "USER": {"id": ["USER_NO"]}
}


def generate_tables(table, info):
    @dlt.table(
        name=f"{table.lower()}_cdc_raw",
        table_properties={"quality": "bronze"},
        comment=f"Raw(Source) MySQL Data from DMS for the table: {table}",
        temporary=True,
    )
    def create_call_table():
        stream = (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            # .option("cloudFiles.includeExistingFiles", "false")
            .load(f"{INPUT_S3_PATH}/{table}/")
        )

        if "Op" not in stream.columns:
            stream = stream.withColumn("Op", F.lit(None).cast(T.StringType()))

        return stream.selectExpr("*", "_metadata as source_metadata", "source_metadata.file_name as source_file_name", "source_metadata.file_path as source_file_path")

    dlt.create_streaming_table(
        name=f"{table.lower()}",
        comment="Bronze MySQL Data from DMS for the table: {table}",
        table_properties={
            "myCompanyPipeline.quality": "bronze",
            "pipelines.autoOptimize.managed": "true",
        }
    )

    dlt.apply_changes(
        target=f"{table.lower()}",
        source=f"{table.lower()}_cdc_raw",
        keys=[*info["id"]],
        sequence_by=F.col("ADD_TIME"),
        apply_as_deletes=F.expr("Op = 'D'"),
        except_column_list=["Op", "_rescued_data"],
        stored_as_scd_type=1,
    )

for table, info in tables.items():
    generate_tables(table, info)

This is a bronze pipeline that uses autoloader to process parquet files in S3 and cdc processing.
In addition to this user table, I'm fetching other tables in the same way as above, and I want to make them into silver tables by doing JOIN and other operations.

 

@dlt.view
def user_raw():
    user_raw = read_delta_table(
        schema=TargetSchema.HMMALL, table_name="user", is_stream=False
    ).selectExpr('*', 'to_timestamp(ADD_TIME) as USER_ADD_TIME', 'REG_DT as USER_REG_DT')

    return user_raw


@dlt.view
def shop_raw():
    return read_delta_table(
        schema=TargetSchema.HMMALL, table_name="shop", is_stream=False
    ).selectExpr("SHOP_NO", "DEF_SHOP_NM")


@dlt.view
def nation_gaon_raw():
    return read_delta_table(
        schema=TargetSchema.HMMALL, table_name="nation_gaon", is_stream=False
    ).selectExpr("NATION_NO", "NATION_NM_KR")


def join_table_with_expr(
    left: DataFrame, right: DataFrame, join_expr: str, how_to: str
) -> DataFrame:
    return left.join(right, on=F.expr(join_expr), how=how_to)


def generate_user_silver() -> DataFrame:
    join_shop_df = join_table_with_expr(
        left=dlt.read('user_raw'),
        right=dlt.read('shop_raw'),
        join_expr="REG_SHOP_NO = SHOP_NO",
        how_to="left",
    )
    join_nation_df = join_table_with_expr(
        left=join_shop_df,
        right=dlt.read('nation_gaon_raw'),
        join_expr="NATI_NO = NATION_NO",
        how_to="left",
    )
    return join_nation_df


def split_user_register_date(df: DataFrame, datetime_cols: Column) -> DataFrame:
    split_date_cols = {
        "USER_REG_DATE": F.to_date(datetime_cols),
        "USER_REG_YEAR" : F.year(datetime_cols),
        "USER_REG_MONTH" : F.month(datetime_cols),
        "USER_REG_DAY" : F.dayofmonth(datetime_cols),
        "USER_REG_TIME" : F.date_format(datetime_cols, "HH"),
        "USER_REG_DAY_OF_WEEK" : F.dayofweek(datetime_cols)
    }

    return df.withColumns(split_date_cols)


@dlt.create_table(
    name=f"user_silver",
    comment=f"Silver Table from user - Join shop, nation_gaon / Add broker_type",
    table_properties={
        "myCompanyPipeline.quality": "silver",
        "pipelines.autoOptimize.managed": "true",
        # "pipelines.reset.allowed": "true"
    },
)
def user_silver():
    # user_stream = dlt.readStream("user_raw")
    # shop = dlt.read('shop_raw')
    # nation_gaon = dlt.read('nation_gaon_raw')

    # join_shop_df = join_table_with_expr(left=dlt.readStream('user_raw'), right=dlt.read('shop_raw'), join_expr="REG_SHOP_NO = SHOP_NO", how_to="left")
    # join_nation_df = join_table_with_expr(left=join_shop_df, right=dlt.read('nation_gaon_raw'), join_expr="NATI_NO = NATION_NO", how_to="left")

    join_df = generate_user_silver()
    user_reg_dt_cols = F.col('USER_REG_DT')
    result = split_user_register_date(df=join_df, datetime_cols=user_reg_dt_cols)
    return lower_cols_name(target=result).drop(
        "source_metadata", "source_file_name", "source_file_path"
    )

The code above is a SILVER table.
The problem here is that bronze is streaming, so data is constantly coming in, but if you make the output of the silver table streaming, you will get an error saying that you need to turn on the skipCommitChanges option for future data.
If you turn on skipCommitChanges, data after the first time you turn on the pipeline will not be updated.
So, out of necessity, I read the bronze streaming table as a read and made it a materialized view.
The notebooks for bronze and silver are all different.

Is this how you typically create a silver table?
I would like to know the example code of how you usually create SILVER, and I would also like to know what to fix about the above code.

Palash01
Valued Contributor

Hey @rt-slowth 

Thank you for sharing the code snippets. The code structure appears to be on the right track, and its dynamic nature is promising. With a few minor adjustments, it should achieve the desired outcome. Also, find the attached code syntax in sql and python which explains how can you perform different operations such a adding a column, creating a join, subsetting etc. 

SQL Syntax (Notebook: Delta Live Tables Code SQL)

 

CREATE STREAMING LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
TBLPROPERTIES ("quality" = "silver")
AS
SELECT f.customer_id, f.customer_name, f.number_of_line_items,
  TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
  DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
  f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
  FROM STREAM(LIVE.sales_orders_raw) f
  LEFT JOIN LIVE.customers c
      ON c.customer_id = f.customer_id
     AND c.customer_name = f.customer_name

 

Python Syntax (Delta Live Tables Code Python)

 

@dlt.create_table(
  comment="The cleaned sales orders with valid order_number(s) and partitioned by order_date",
  partition_cols=["order_date"],
  table_properties={
    "myCompanyPipeline.quality": "silver",
    "pipelines.autoOptimize.managed": "true"
  }
)
@dlt.expect_or_drop("valid order_number", "order_number IS NOT NULL")
def sales_orders_cleaned():
  df = dlt.read_stream("sales_orders_raw").join(dlt.read("customers"), ["customer_id", "customer_name"], "left")
  df = df.withColumn("order_date", df.order_datetime.cast("DATE")) 
  df = df.select("customer_id", "customer_name", "number_of_line_items")
  return df

 

Doing the join this way you can avoid creating multiple temp views and this method of creating join will help data sharing between your bronze and silver layer. Here is a git repo which has the code for the initial recommended setup of bronze, silver and gold layer. 

TIP: Make sure while creating DLT Pipeline add your bronze, silver and gold layer notebook in a single pipeline. (Refer to Databricks Documentation for more details)

Please feel free to share additional information or specific questions you have, and I'll be happy to assist further.

 

Leave a like if this helps! Kudos,
Palash

Thank you, @Palash01  @Retired_mod .

Thank you very much for the example code. I have one more additional question. Suppose the bronze table has dlt.apply_changes applied to it. In the silver table, the source data is the bronze table with dlt.apply_changes already applied. If any data is added to the source data after the first run, the silver table gets the following error

Flow 'user_silver' failed fatally, error occurred because it detected an update or delete on one or more rows in the source table. Streaming tables can only use append-only streaming sources. If you plan to delete or update rows in the source table in the future, convert the user_silver table to a live table rather than a streaming live table. To resolve this issue, perform a full refresh on the user_silver table. A full refresh clears all data from the user_silver table and then attempts to load all data from the streaming source.

Non-additive changes can be found in version 11.
Task MERGE
User name: doheekim


How do I resolve this case?

Palash01
Valued Contributor

Hey @rt-slowth 

Thanks for getting back, appreciate your follow up. Let's see how can I help:

  • The error arises because we have streaming table for the silver table, but its source (the bronze table with dlt.apply_changes) isn't append-only.
  • Streaming tables expect only new data to be added, not updates or deletes.

In simpler words the silver layer table does not expect the rows to get deleted from the source table but in your case you are trying to do a apply_changes which essentially means bronze table rows can be deleted based on the condition you define and that's the reason you are getting this error. 

1. Resolving the Issue:

  • Change the silver table to a "live table" instead of a "streaming live table." Live tables inherently support updates and deletes, aligning with the bronze table's nature after dlt.apply_changes.

2. Reconsider Data Flow:

  • If converting to a live table isn't feasible, restructure your pipeline:
    • Store the raw data in a separate table/layer without applying dlt.apply_changes.
    • Create a new silver table that directly consumes the raw data and handles any updates or deletes appropriately.

3. Full Refresh (Temporary Fix):

  • While a full refresh can clear the error, it's a temporary workaround and not a sustainable solution.
  • It clears and reloads all data, potentially causing performance issues and data loss if not handled carefully.

Personal Recommendation:

  • Ensure data consistency between bronze and silver tables, especially regarding updates and deletes. 
  • Have apply changes in silver layer or any other layer except bronze as it is a ideal practice to keep raw data without any transformations in bronze layer.
  • Prioritize converting the silver table to a live table for a more robust and long-term solution.
  • If that's not possible, consider restructuring your pipeline to manage updates and deletes effectively.
  • Use full refresh only as a temporary measure and explore sustainable alternatives.

Also, please refer to the error-if-you-expect-to-delete-or-update-rows-to-the-source-table community blog post where I and Kaniz have addressed a similar issue.

 

Leave a like if this helps! Kudos,
Palash

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group