Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Registering a dataframe coming from a CDC data stream removes the CDC columns from the resulting temporary view, even when explicitly adding a copy of the column to the dataframe.

Louis_Databrick
New Contributor II
df_source_records = (
    df_source_records.filter(F.col("_change_type").isin("delete", "insert", "update_postimage"))
    .withColumn("ROW_NUMBER", F.row_number().over(window))
    .filter("ROW_NUMBER = 1")
    .withColumn("change_indicator", F.col("_change_type"))
    .drop("_commit_version", "_commit_timestamp", "ROW_NUMBER")
)

df_source_records.createOrReplaceGlobalTempView(temporary_view_name)

When selecting from temporary_view_name afterwards, neither the _change_type column nor the change_indicator column is available. Why is this the case? Is it a bug, or is there a workaround?
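For context, the window/row_number step in the snippet above is a standard keep-only-the-latest-change-per-key dedup over a change data feed. A minimal sketch of that same logic in plain Python, with a hypothetical key and records (not the actual schema), assuming the window partitions by key and orders by _commit_version descending:

```python
# Hypothetical CDC rows: (key, _commit_version, _change_type)
rows = [
    ("a", 1, "insert"),
    ("a", 2, "update_postimage"),
    ("a", 2, "update_preimage"),  # preimages are filtered out
    ("b", 1, "insert"),
    ("b", 3, "delete"),
]

# Same change types the PySpark filter keeps.
KEPT_TYPES = {"delete", "insert", "update_postimage"}

def latest_change_per_key(rows):
    """Mimic row_number() over (partition by key, order by version desc) = 1."""
    latest = {}
    for key, version, change_type in rows:
        if change_type not in KEPT_TYPES:
            continue
        # Keep only the change with the highest commit version per key.
        if key not in latest or version > latest[key][0]:
            latest[key] = (version, change_type)
    return {key: change_type for key, (_, change_type) in latest.items()}

print(latest_change_per_key(rows))  # {'a': 'update_postimage', 'b': 'delete'}
```

The PySpark version does the same thing declaratively; the point is that _change_type survives the dedup (it is only read, never dropped), so it should still be present in the resulting dataframe and any view registered from it.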

Thanks!

2 REPLIES

Ajay-Pandey
Esteemed Contributor III

Hi @Louis De Geest

This seems like a bug.

Can you send a snippet of the error you are getting?


Louis_Databrick
New Contributor II

It seems to work now, actually. No idea what changed; I tried it multiple times exactly this way and it did not work.

from pyspark.sql.utils import AnalysisException
 
 
data = [("John", 25), ("Alice", 30), ("Bob", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
 
try:
    ENTER_YOUR_DATASTORE_HERE = ""
    path = 'abfss://bronze@' + ENTER_YOUR_DATASTORE_HERE + '.dfs.core.windows.net/TEST/test_table'
    df.write.format("delta").mode("overwrite").save(path)
except AnalysisException as e:
    print("Error while writing the dataframe to Delta Lake:", e)
 
try:
    spark.sql("CREATE DATABASE IF NOT EXISTS test")
    spark.sql("USE test")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS test_table
        USING delta
        LOCATION '{path}'
    """)
except AnalysisException as e:
    print("Error while creating the catalog entry:", e)
 
try:
    # The table property that enables the change data feed is
    # delta.enableChangeDataFeed (not enableChangeDataCapture).
    spark.sql("""
        ALTER TABLE test_table
        SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """)
except AnalysisException as e:
    print("Error while enabling CDC on the table:", e)
 
def process_batch(df):
    display(df)
    # CDC columns available.
    df.createOrReplaceGlobalTempView("test_view")
    # They seem to show up now as well
    df_new = spark.sql("SELECT _change_type FROM global_temp.test_view")
    display(df_new)
 
 
streaming_query = spark.readStream \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .load(path)
 
(streaming_query.writeStream 
    .foreachBatch(lambda batch_df, batch_id:
                      process_batch(
                        df = batch_df
                      )) 
    .outputMode("append")
    .trigger(once=True)
    .start()
    .awaitTermination()
)