I am building a bronze table with CDF (Change Data Feed) enabled, in these steps:
- Initially, read JSON files from the landing zone and write them to the table location:
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.schemaLocation", <schema_loc>) \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.option("cloudFiles.includeExistingFiles", "true") \
.load(<landing_loc_info>)
df.writeStream \
.format("delta") \
.trigger(once=True) \
.option("mergeSchema", "true") \
.option("checkpointLocation", <bronzechk_loc>) \
.start(<bronzetbl_loc>)
- Create a Delta table over that location and enable CDF:
CREATE TABLE bronze.mytable
USING DELTA
LOCATION '<file location>';
ALTER TABLE bronze.mytable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
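For reference, the CREATE TABLE and ALTER TABLE above can also be combined, since Delta allows setting the property at creation time. A minimal sketch (table name and LOCATION value are the same placeholders used above; the `spark.sql` call is shown commented so the snippet stands alone):

```python
# Sketch: create the Delta table with CDF already enabled, in one DDL statement.
# bronze.mytable and '<file location>' are the placeholders from the steps above.
ddl = """
CREATE TABLE bronze.mytable
USING DELTA
LOCATION '<file location>'
TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
# spark.sql(ddl)  # run on the cluster
```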
- Read more data with exactly the same structure from the landing zone and insert it into the bronze table:
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.schemaLocation", <schema_loc>) \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.option("cloudFiles.includeExistingFiles", "true") \
.load(<landing_loc>)
df.createOrReplaceTempView("bronze_company_info_dataset")
sql_query = "INSERT INTO bronze.mytable TABLE bronze_company_info_dataset"
spark.sql(sql_query)
- It throws a java.lang.StackOverflowError when executing sql_query:
/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
/databricks/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
1117 sqlQuery = formatter.format(sqlQuery, **kwargs)
1118 try:
-> 1119 return DataFrame(self._jsparkSession.sql(sqlQuery), self)
1120 finally:
1121 if len(kwargs) > 0:
/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
- I am using Databricks Community Edition with runtime version 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12).