3 weeks ago - last edited 3 weeks ago
Hello everyone. I am new to DLT and I am practicing with some basic ingestions. I have a query like the following, where I get data from two tables using UNION, and I have noticed that everything gets ingested into the first column as a comma-separated string. In my pipeline I am executing something like the code below. Any suggestions would be appreciated. Cheers!
query = """
SELECT
a.column_a as id_column
a.column_b as val_column
FROM
catalog_a.schema_a.table_a a
UNION ALL
SELECT
b.column_a as id_column
b.column_b as val_column
FROM
catalog_b.shema_b.table_b b"""
@dlt.table
def dim_ship():
return spark.sql(query)
3 weeks ago - last edited 3 weeks ago
Actually, I found the solution: I used spark.readStream to read the external tables a and b into two dataframes, and then did combined_df = df_a.union(df_b) to create my DLT table. Thank you!
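For reference, a minimal sketch of that approach could look like the following (the table and column names are assumed from the original query, so adjust them to your own catalog and schema):

import dlt

@dlt.table
def dim_ship():
    # Read each source table as a stream and select/rename the columns
    df_a = (
        spark.readStream.table("catalog_a.schema_a.table_a")
        .selectExpr("column_a as id_column", "column_b as val_column")
    )
    df_b = (
        spark.readStream.table("catalog_b.schema_b.table_b")
        .selectExpr("column_a as id_column", "column_b as val_column")
    )
    # Union the two streams into a single DLT table
    return df_a.union(df_b)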
3 weeks ago
Can you try with the following code?
query_a = """
SELECT
a.column_a as id_column,
a.column_b as val_column
FROM
catalog_a.schema_a.table_a a
"""
query_b = """
SELECT
b.column_a as id_column,
b.column_b as val_column
FROM
catalog_b.schema_b.table_b b
"""
@dlt.table
def table_a_data():
return spark.sql(query_a)
@dlt.table
def table_b_data():
return spark.sql(query_b)
@dlt.table
def dim_ship():
return spark.sql("""
SELECT * FROM table_a_data
UNION ALL
SELECT * FROM table_b_data
""")
3 weeks ago
Unfortunately I am getting the same behavior.
3 weeks ago
You are missing the commas that separate the columns.
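For example, the first half of the original query should read like this, with a comma after each column expression:

SELECT
    a.column_a as id_column,
    a.column_b as val_column
FROM
    catalog_a.schema_a.table_a a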
3 weeks ago
The other weird thing is that it doesn't fetch only the specified columns, but all the columns from the relevant tables.
3 weeks ago
Try running the query outside the DLT pipeline, either as a SQL cell in a notebook or as a *.sql file, to see if you get the same problem.
3 weeks ago
@Costas96 I would recommend verifying the SQL behavior in a notebook or SQL editor first.
# SQL query with proper comma separation between columns
query = """
SELECT
    a.column_a as id_column,
    a.column_b as val_column
FROM
    catalog_a.schema_a.table_a a
UNION ALL
SELECT
    b.column_a as id_column,
    b.column_b as val_column
FROM
    catalog_b.schema_b.table_b b"""

# Define the Delta Live Table
@dlt.table
def dim_ship():
    return spark.sql(query)

# Optional: verify the output outside the pipeline
df = spark.sql(query)

# Check schema
print("Schema:")
df.printSchema()

# Preview data
print("\nData Preview:")
df.show(5)