I have some data in silver that I read in as a view from the __apply_changes storage table. I create a table based on this, and then I want to create my gold table after doing a .groupBy() and .pivot(). The transformations in the gold table aren't giving me any error messages; they simply aren't being applied. If I create the gold table based on a view instead of a table, the transformations are applied. See the bottom for a fabricated example.
Here's my code:
import dlt
from pyspark.sql.functions import col, concat_ws, explode, split, explode_outer, posexplode, concat, lit, expr, first, create_map
@dlt.view
def silver_messages():
    """Silver messages view"""
    return spark.readStream.table("eqs_cloud.__apply_changes_storage_silver_messages")
@dlt.view
def groups_hierarchy_vw():
    """Get all groups' hierarchy"""
    assigned = dlt.read_stream("silver_messages").select("assignedToUnit.*")
    from_unit = dlt.read_stream("silver_messages").select("fromUnit.*")
    to_unit = dlt.read_stream("silver_messages").select("toUnit.*")
    groups_hierarchy_vw = assigned.unionByName(from_unit).unionByName(to_unit)
    return groups_hierarchy_vw
@dlt.table
def groups_hierarchy_tbl():
    return dlt.read_stream("groups_hierarchy_vw")
@dlt.table
def groups_hierarchy_tbl_pivoted():
    # Explode the "/"-separated path, then pivot each position into its own groupN column
    return (dlt.read("groups_hierarchy_tbl")
        .select(
            "id",
            "name",
            split("path", "/").alias("groups_in_path"),
            posexplode(split("path", "/")).alias("pos", "value"))
        .select(
            "id",
            "name",
            concat(lit("group"), "pos").alias("group_name"),
            expr("groups_in_path[pos]").alias("val"))
        .groupBy("id", "name")
        .pivot("group_name")
        .agg(first("val"))
    )
Is there a way I can use the groups_hierarchy_vw view directly in the gold table at the bottom? If I reference it with dlt.read_stream("groups_hierarchy_vw"), I get the error "pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();".
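To show what I mean by "using the view directly", here is a rough sketch I had in mind: read the silver data as a batch source (spark.read.table instead of readStream) so the groupBy/pivot runs on a non-streaming DataFrame. The name groups_hierarchy_batch_vw is just a placeholder, and I'm not sure reading the __apply_changes storage table as a batch source is the intended DLT pattern:
import dlt
from pyspark.sql.functions import split, posexplode, concat, lit, expr, first

@dlt.view
def groups_hierarchy_batch_vw():
    """Non-streaming copy of the hierarchy view, so batch-only operations like pivot() can run on it"""
    silver = spark.read.table("eqs_cloud.__apply_changes_storage_silver_messages")
    return (silver.select("assignedToUnit.*")
            .unionByName(silver.select("fromUnit.*"))
            .unionByName(silver.select("toUnit.*")))

@dlt.table
def groups_hierarchy_tbl_pivoted():
    # dlt.read() on the batch view returns a non-streaming DataFrame, so groupBy/pivot are allowed
    return (dlt.read("groups_hierarchy_batch_vw")
        .select(
            "id",
            "name",
            split("path", "/").alias("groups_in_path"),
            posexplode(split("path", "/")).alias("pos", "value"))
        .select(
            "id",
            "name",
            concat(lit("group"), "pos").alias("group_name"),
            expr("groups_in_path[pos]").alias("val"))
        .groupBy("id", "name")
        .pivot("group_name")
        .agg(first("val"))
    )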
Here's a fabricated example you can try out.
import dlt
import pandas as pd
from pyspark.sql.functions import col, concat_ws, explode, split, explode_outer, posexplode, concat, lit, expr, first, create_map

pdf = pd.DataFrame({"id": ["1001", "1002", "1003"],
                    "name": ["Dep1", "Dep2", "Dep3"],
                    "path": ["1001", "1001/1002", "1001/1002/1003"]})
df = spark.createDataFrame(pdf)
df.write.mode('overwrite').saveAsTable('fabricated_testtable')
@dlt.view
def fabricated_hierarchy_vw():
    return spark.read.table('fabricated_testtable')
@dlt.table
def fabricated_tbl_pivoted():
    return (dlt.read('fabricated_hierarchy_vw')
        .select(
            "id",
            "name",
            split("path", "/").alias("groups_in_path"),
            posexplode(split("path", "/")).alias("pos", "value"))
        .select(
            "id",
            "name",
            concat(lit("group"), "pos").alias("group_name"),
            expr("groups_in_path[pos]").alias("val"))
        .groupBy("id", "name")
        .pivot("group_name")
        .agg(first("val"))
    )
This gives the expected outcome:
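Working the fabricated data through the pivot by hand, the expected result should look roughly like this (column order aside):
id    name  group0  group1  group2
1001  Dep1  1001    null    null
1002  Dep2  1001    1002    null
1003  Dep3  1001    1002    1003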
But if you change the @dlt.view decorator on fabricated_hierarchy_vw to @dlt.table, the transformations are not performed and the output is not pivoted.