Data Engineering

Registering a DataFrame from a CDC data stream as a temporary view drops the CDC columns, even when a copy of the column is explicitly added to the DataFrame.

Louis_Databrick
New Contributor II
df_source_records = (
    df_source_records
    .filter(F.col("_change_type").isin("delete", "insert", "update_postimage"))
    .withColumn("ROW_NUMBER", F.row_number().over(window))  # `window` is defined earlier in the notebook
    .filter("ROW_NUMBER = 1")
    .withColumn("change_indicator", F.col("_change_type"))
    .drop("_commit_version", "_commit_timestamp", "ROW_NUMBER")
)

df_source_records.createOrReplaceGlobalTempView(temporary_view_name)

When selecting from temporary_view_name afterwards, neither the _change_type column nor the change_indicator column is available. Why is this the case? It seems like a bug. Or is there a workaround?
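For reference, the dedup-and-tag logic the snippet above is aiming for can be sketched in plain Python. This is a hypothetical, Spark-free stand-in for the filter + row_number-over-window + withColumn chain (sample rows and the `latest_changes` helper are invented for illustration):

```python
# Hypothetical CDC rows, mimicking a Delta change feed micro-batch.
rows = [
    {"id": 1, "_change_type": "update_postimage", "_commit_version": 3},
    {"id": 1, "_change_type": "insert", "_commit_version": 1},
    {"id": 2, "_change_type": "delete", "_commit_version": 2},
    {"id": 2, "_change_type": "update_preimage", "_commit_version": 2},
]

KEEP = {"delete", "insert", "update_postimage"}

def latest_changes(rows):
    """Keep the newest kept-type change per id and tag it with
    change_indicator, like the PySpark filter/row_number chain."""
    filtered = [r for r in rows if r["_change_type"] in KEEP]
    # Newest commit first within each id, then keep ROW_NUMBER = 1.
    filtered.sort(key=lambda r: (r["id"], -r["_commit_version"]))
    out, seen = [], set()
    for r in filtered:
        if r["id"] in seen:
            continue
        seen.add(r["id"])
        out.append({"id": r["id"], "change_indicator": r["_change_type"]})
    return out

print(latest_changes(rows))
```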

Thanks!

2 REPLIES

Ajay-Pandey
Esteemed Contributor III

Hi @Louis De Geest

This seems like a bug.

Could you share the error snippet you are getting?


Louis_Databrick
New Contributor II

It seems to work now, actually. No idea what changed; I tried it exactly this way multiple times before and it did not work.

from pyspark.sql.utils import AnalysisException
 
 
data = [("John", 25), ("Alice", 30), ("Bob", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
 
try:
    ENTER_YOUR_DATASTORE_HERE = ""
    path = 'abfss://bronze@' + ENTER_YOUR_DATASTORE_HERE + '.dfs.core.windows.net/TEST/test_table'
    df.write.format("delta").mode("overwrite").save(path)
except AnalysisException as e:
    print("Error while writing the dataframe to Delta Lake:", e)
 
try:
    spark.sql("CREATE DATABASE IF NOT EXISTS test")
    spark.sql("USE test")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS test_table
        USING delta
        LOCATION '{path}'
    """)
except AnalysisException as e:
    print("Error while creating the catalog entry:", e)
 
try:
    spark.sql("""
        ALTER TABLE test_table
        SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """)
except AnalysisException as e:
    print("Error while enabling CDC on the table:", e)
 
def process_batch(df):
    display(df)
    # CDC columns available.
    df.createOrReplaceGlobalTempView("test_view")
    # They seem to show up now as well
    df_new = spark.sql("SELECT _change_type FROM global_temp.test_view")
    display(df_new)
 
 
streaming_query = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load(path)
)
 
(streaming_query.writeStream
    .foreachBatch(lambda batch_df, batch_id: process_batch(batch_df))
    .outputMode("append")
    .trigger(once=True)
    .start()
    .awaitTermination()
)
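The foreachBatch hand-off in the last block can be mimicked without a cluster. Below is a plain-Python stand-in (the `run_foreach_batch` name and fake batches are invented for illustration, not Spark's implementation) showing that the sink invokes the user callback once per micro-batch with the batch and its id:

```python
# Plain-Python stand-in for writeStream.foreachBatch: call the user
# callback once per micro-batch with (batch_df, batch_id).
def run_foreach_batch(batches, callback):
    for batch_id, batch in enumerate(batches):
        callback(batch, batch_id)

seen = []
run_foreach_batch(
    [["row-a", "row-b"], ["row-c"]],  # two fake micro-batches
    lambda batch, batch_id: seen.append((batch_id, len(batch))),
)
print(seen)  # each batch is delivered with a sequential id
```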
