Seems to work now, actually. No idea what changed, as I tried it multiple times in exactly this way before and it did not work.
from pyspark.sql.utils import AnalysisException

# Small sample DataFrame to write out as a Delta table.
data = [("John", 25), ("Alice", 30), ("Bob", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Write the DataFrame to a Delta table on ADLS.
ENTER_YOUR_DATASTORE_HERE = ""
path = f"abfss://bronze@{ENTER_YOUR_DATASTORE_HERE}.dfs.core.windows.net/TEST/test_table"
try:
    df.write.format("delta").mode("overwrite").save(path)
except AnalysisException as e:
    print("Error while writing the dataframe to Delta Lake:", e)
# Register the Delta location in the metastore so it can be queried by name.
try:
    spark.sql("CREATE DATABASE IF NOT EXISTS test")
    spark.sql("USE test")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS test_table
        USING delta
        LOCATION '{path}'
    """)
except AnalysisException as e:
    print("Error while creating the catalog entry:", e)
# Enable the change data feed on the table. Note: the documented property is
# 'delta.enableChangeDataFeed'; 'delta.enableChangeDataCapture' was the older
# preview-era name.
try:
    spark.sql("""
        ALTER TABLE test_table
        SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """)
except AnalysisException as e:
    print("Error while enabling CDC on the table:", e)
def process_batch(df):
    display(df)
    # CDC columns available.
    df.createOrReplaceGlobalTempView("test_view")
    # They seem to show up now as well.
    df_new = spark.sql("SELECT _change_type FROM global_temp.test_view")
    display(df_new)
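For what it's worth, each micro-batch also carries the other CDF metadata columns (_commit_version, _commit_timestamp), so the handler can branch on the change type. A hypothetical variant:

def process_batch_by_type(batch_df, batch_id):
    # Split the micro-batch by change type; CDF uses insert, delete,
    # update_preimage and update_postimage.
    inserts = batch_df.filter(batch_df["_change_type"] == "insert")
    deletes = batch_df.filter(batch_df["_change_type"] == "delete")
    print(f"batch {batch_id}: {inserts.count()} inserts, {deletes.count()} deletes")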
# Stream the change feed from the table path. (The trailing backslash after
# .load(path) in my earlier paste broke the script; parentheses are safer.)
streaming_query = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load(path)
)
(streaming_query.writeStream
    .foreachBatch(lambda batch_df, batch_id: process_batch(batch_df))
    .outputMode("append")
    .trigger(once=True)
    .start()
    .awaitTermination()
)
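One more thing I'd add for anything beyond a throwaway test: a checkpointLocation, so the stream resumes from where it stopped instead of reprocessing everything. Sketch with a made-up checkpoint path:

(streaming_query.writeStream
    .foreachBatch(lambda batch_df, batch_id: process_batch(batch_df))
    .option("checkpointLocation", path + "_checkpoint")  # hypothetical path, pick your own
    .trigger(once=True)
    .start()
    .awaitTermination()
)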