Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Insert data to a CDF-enabled Delta table throwing java.lang.StackOverflowError

Johny
New Contributor III

I am building a bronze table with CDF (Change Data Feed) enabled, in these steps:

  • Initially, read the JSON files from the landing zone with Auto Loader and write them to the table location:
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.schemaLocation", <schema_loc>) \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
  .option("cloudFiles.includeExistingFiles", "true") \
  .load(<landing_loc_info>)

df.writeStream \
  .format("delta") \
  .trigger(once=True) \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", <bronzechk_loc>) \
  .start(<bronzetbl_loc>)
  • Create a Delta table on that location and enable CDF:
CREATE TABLE bronze.mytable
USING DELTA
LOCATION '<file location>';
 
ALTER TABLE bronze.mytable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
  • Read more data with exactly the same structure from the landing zone and insert it into the bronze table (a direct streaming-write equivalent is sketched after this list):
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.schemaLocation", schema_loc) \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
  .option("cloudFiles.includeExistingFiles", "true") \
  .load(<landing_loc>)

df.createOrReplaceTempView("bronze_dataset")
sql_query = "INSERT INTO bronze.mytable TABLE bronze_dataset"
spark.sql(sql_query)

  • It throws java.lang.StackOverflowError when executing the sql_query:
/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             try:
---> 48                 res = func(*args, **kwargs)
     49                 logger.log_success(
     50                     module_name, class_name, function_name, time.perf_counter() - start, signature
 
/databricks/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
   1117             sqlQuery = formatter.format(sqlQuery, **kwargs)
   1118         try:
-> 1119             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1120         finally:
   1121             if len(kwargs) > 0:
 
/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   
  • I am using Community Edition, with runtime version 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12)
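
For reference, the temp view in the last step is backed by a streaming DataFrame, and batch SQL such as INSERT INTO cannot read streaming sources (Spark normally requires writeStream.start() for those). A minimal sketch of the same append as a direct streaming write instead, reusing the options above; <bronzechk_loc2> is a hypothetical second checkpoint path, since each stream needs its own:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.schemaLocation", schema_loc) \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
  .option("cloudFiles.includeExistingFiles", "true") \
  .load(<landing_loc>)

# Append to the CDF-enabled table by name; mergeSchema handles new columns.
df.writeStream \
  .format("delta") \
  .trigger(once=True) \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", <bronzechk_loc2>) \
  .toTable("bronze.mytable")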
2 REPLIES

Priyag1
Honored Contributor II

It's a runtime error. Check whether there are any recursive calls; if everything looks okay, rerun it from a fresh start.
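
One quick check before rerunning, assuming df from the last step is still in scope: confirm whether the temp view is streaming-backed, since a plain INSERT INTO cannot consume a streaming source.

# True means the view wraps a streaming DataFrame, so batch SQL over it
# (e.g. INSERT INTO ... TABLE ...) is not supported; use writeStream instead.
print(df.isStreaming)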

Johny
New Contributor III

I tried with a simple CSV file that has only one column. I got the same error.
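
For reproduction purposes, a minimal sketch of that test as described (all paths here are hypothetical placeholders):

# Stream a one-column CSV with Auto Loader, expose it as a temp view,
# then run the batch INSERT that triggers the error.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.schemaLocation", "/tmp/repro/schema") \
  .load("/tmp/repro/landing")

df.createOrReplaceTempView("csv_stream")
spark.sql("INSERT INTO bronze.mytable TABLE csv_stream")  # StackOverflowError raised here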
