Inserting data into a CDF-enabled Delta table throws java.lang.StackOverflowError

Johny
New Contributor III

I am building a bronze table with CDF enabled, in these steps:

  • Initially, read the JSON files from the landing zone and write them to the table location:
# Read new JSON files from the landing zone with Auto Loader
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.schemaLocation", <schema_loc>) \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
  .option("cloudFiles.includeExistingFiles", "true") \
  .load(<landing_loc_info>)

# Write the stream as Delta to the bronze table location, one batch per trigger
df.writeStream \
      .format("delta") \
      .trigger(once=True) \
      .option("mergeSchema", "true") \
      .option("checkpointLocation", <bronzechk_loc>) \
      .start(<bronzetbl_loc>)
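(A small hedged note on this step: with trigger(once=True), .start() returns as soon as the stream is started, so if the CREATE TABLE in the next step runs in the same job, capturing the returned handle and blocking on it is one way to make sure the Delta files exist first. The query variable below is an assumed name, not from the original post.)

# Sketch: capture the StreamingQuery handle and block until the
# trigger-once batch completes, so the files exist before CREATE TABLE runs.
query = df.writeStream \
      .format("delta") \
      .trigger(once=True) \
      .option("mergeSchema", "true") \
      .option("checkpointLocation", <bronzechk_loc>) \
      .start(<bronzetbl_loc>)
query.awaitTermination()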
  • Create a Delta table over that location and enable CDF:
CREATE TABLE bronze.mytable
USING DELTA
LOCATION '<file location>';
 
ALTER TABLE bronze.mytable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
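(For reference, a minimal sketch of the one-step equivalent, assuming the same table name and location: CDF can also be set in TBLPROPERTIES at creation time, which avoids the separate ALTER TABLE.)

# Sketch: create the table with CDF enabled from the start.
spark.sql("""
  CREATE TABLE bronze.mytable
  USING DELTA
  LOCATION '<file location>'
  TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")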
  • Read more data with exactly the same structure from the landing zone and insert it into the bronze table:
# Read the additional files with the same Auto Loader options as step 1
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.schemaLocation", <schema_loc>) \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
  .option("cloudFiles.includeExistingFiles", "true") \
  .load(<landing_loc>)

# Register the streaming DataFrame as a temp view and insert it via SQL
df.createOrReplaceTempView("bronze_company_info_dataset")
sql_query = "INSERT INTO bronze.mytable TABLE bronze_company_info_dataset"
spark.sql(sql_query)
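(For reference, the streaming-native form of this append would reuse the writeStream pattern from step 1 rather than a batch INSERT over a temp view of a streaming DataFrame. A minimal sketch with the same placeholders; <bronzechk2_loc> is an assumed second checkpoint location, since each stream needs its own:)

# Sketch: append the new files with writeStream, as in step 1.
df.writeStream \
      .format("delta") \
      .trigger(once=True) \
      .option("mergeSchema", "true") \
      .option("checkpointLocation", <bronzechk2_loc>) \
      .start(<bronzetbl_loc>)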

  • It throws java.lang.StackOverflowError when executing the sql_query:
/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             try:
---> 48                 res = func(*args, **kwargs)
     49                 logger.log_success(
     50                     module_name, class_name, function_name, time.perf_counter() - start, signature
 
/databricks/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
   1117             sqlQuery = formatter.format(sqlQuery, **kwargs)
   1118         try:
-> 1119             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1120         finally:
   1121             if len(kwargs) > 0:
 
/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   
  • I am using Databricks Community Edition, with runtime version 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12).
2 REPLIES

Priyag1
Honored Contributor II

It's a runtime error. Check whether there are any recursive calls; if everything looks okay, run it again from scratch.

Johny
New Contributor III

I tried with a simple CSV file that has only one column and got the same error.
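(Roughly what I tried, as a sketch; the paths and view name here are placeholders, not the exact ones from my workspace:)

# Minimal repro sketch: a one-column CSV read with Auto Loader, then the
# same INSERT pattern as above.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.schemaLocation", <schema_csv_loc>) \
  .load(<landing_csv_loc>)

df.createOrReplaceTempView("csv_dataset")
spark.sql("INSERT INTO bronze.mytable TABLE csv_dataset")  # same StackOverflowError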
