Registering a DataFrame coming from a CDC data stream as a temporary view removes the CDC columns from the resulting view, even when a copy of the column is explicitly added to the DataFrame beforehand.
05-31-2023 12:12 AM
import pyspark.sql.functions as F

# "window" and "temporary_view_name" are defined earlier (not shown in this post).
df_source_records = (
    df_source_records
    .filter(F.col("_change_type").isin("delete", "insert", "update_postimage"))
    .withColumn("ROW_NUMBER", F.row_number().over(window))
    .filter("ROW_NUMBER = 1")
    .withColumn("change_indicator", F.col("_change_type"))
    .drop("_commit_version", "_commit_timestamp", "ROW_NUMBER")
)
df_source_records.createOrReplaceGlobalTempView(temporary_view_name)
When selecting from temporary_view_name afterwards, neither the _change_type column nor the change_indicator column is available. Why is this the case? It seems like a bug. Or is there a workaround?
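(For anyone checking the same thing: the schema that the registered view actually exposes can be inspected directly. This is just a diagnostic sketch; global temp views live under the global_temp database.

spark.table(f"global_temp.{temporary_view_name}").printSchema()

If registration kept the columns, _change_type and change_indicator should appear in the output.)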
Thanks!
Labels:
- Cdc
- DAIS2023
- Temporary View
2 REPLIES
05-31-2023 05:43 AM
Hi @Louis De Geest
This seems like a bug.
Can you send a snippet of the error you are getting?
#DAIS2023
Ajay Kumar Pandey
06-08-2023 04:15 AM
It seems to work now, actually. No idea what changed, as I tried it multiple times exactly this way and it did not work.
from pyspark.sql.utils import AnalysisException

# Build a small test DataFrame and write it out as a Delta table.
data = [("John", 25), ("Alice", 30), ("Bob", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
try:
    ENTER_YOUR_DATASTORE_HERE = ""
    path = 'abfss://bronze@' + ENTER_YOUR_DATASTORE_HERE + '.dfs.core.windows.net/TEST/test_table'
    df.write.format("delta").mode("overwrite").save(path)
except AnalysisException as e:
    print("Error while writing the dataframe to Delta Lake:", e)

# Register the Delta table in the catalog.
try:
    spark.sql("CREATE DATABASE IF NOT EXISTS test")
    spark.sql("USE test")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS test_table
        USING delta
        LOCATION '{path}'
    """)
except AnalysisException as e:
    print("Error while creating the catalog entry:", e)

# Enable the change data feed on the table.
try:
    spark.sql("""
        ALTER TABLE test_table
        SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """)
except AnalysisException as e:
    print("Error while enabling CDC on the table:", e)

def process_batch(df):
    display(df)  # CDC columns available.
    df.createOrReplaceGlobalTempView("test_view")
    # They seem to show up now as well.
    df_new = spark.sql("SELECT _change_type FROM global_temp.test_view")
    display(df_new)

# Read the change data feed as a stream and process each micro-batch.
streaming_query = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load(path))

(streaming_query.writeStream
    .foreachBatch(lambda batch_df, batch_id: process_batch(df=batch_df))
    .outputMode("append")
    .trigger(once=True)
    .start()
    .awaitTermination()
)
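To mirror the original question exactly (copying _change_type into a regular column before registering the view), process_batch could be extended along these lines; the view name test_view_tagged is just illustrative:

import pyspark.sql.functions as F

def process_batch(df):
    # Copy the CDC column into a plain column, as in the original snippet.
    df_tagged = df.withColumn("change_indicator", F.col("_change_type"))
    df_tagged.createOrReplaceGlobalTempView("test_view_tagged")
    # Both the CDC column and its copy should now be selectable from the view.
    spark.sql("SELECT _change_type, change_indicator FROM global_temp.test_view_tagged").show()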

