
Using Autoloader with merge

MonuDatabricks
New Contributor II

Hi Everyone, 

I have been trying to use Autoloader with foreachBatch so that I can use MERGE INTO in Databricks, but when I run it I get the error below.

Error: Found error inside foreachBatch Python process

My code:

from delta.tables import *

streaming_df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{output_path}/schemalocation")
    .load(f"{input_path}"))

deltaTable = DeltaTable.forName(spark,"vyxdbsucdev.silver_generalledger_erpcloud_gl.journalline")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.JeHeaderId = t.JeHeaderId" and "s.JeLineId = t.JeLineId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of the streaming query into the Delta table using foreachBatch
query=(streaming_df.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)
 
Complete error:
 
File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 90, in process func(batch_df, batch_id) File "/home/spark-ba8c57e6-702e-433f-960d-93/.ipykernel/2858/command-1205523182602632-3905487436", line 22, in upsertToDelta File "/databricks/spark/python/delta/connect/tables.py", line 577, in execute return self._spark.createDataFrame(df.toPandas()) File "/databricks/spark/python/pyspark/sql/connect/dataframe.py", line 1874, in toPandas return self._session.client.to_pandas(query, self._plan.observations) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1076, in to_pandas table, schema, metrics, observed_metrics, _ = self._execute_and_fetch( File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1627, in _execute_and_fetch for response in self._execute_and_fetch_as_iterator( File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1604, in _execute_and_fetch_as_iterator self._handle_error(error) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1913, in _handle_error self._handle_rpc_error(error) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1988, in _handle_rpc_error raise convert_exception( pyspark.errors.exceptions.connect.AnalysisException: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve s.JeLineId in search condition given columns t.infa_operation_type, t.infa_versioned_sortable_sequence, t.GlJeLinesAccountedCr, t.GlJeLinesAccountedDr, t.GlJeLinesCodeCombinationId, t.GlJeLinesCreatedBy, t.GlJeLinesCreationDate, t.GlJeLinesCurrencyCode, t.GlJeLinesCurrencyConversionDate, t.GlJeLinesCurrencyConversionRate, t.GlJeLinesCurrencyConversionType, t.GlJeLinesDescription, t.GlJeLinesEffectiveDate, t.GlJeLinesEnteredCr, t.GlJeLinesEnteredDr, t.GlJeLinesGlSlLinkId, t.GlJeLinesGlSlLinkTable, t.GlJeLinesIgnoreRateFlag, t.GlJeLinesLastUpdateDate, t.GlJeLinesLastUpdatedBy, t.GlJeLinesLastUpdateLogin, t.GlJeLinesLedgerId, t.GlJeLinesLineTypeCode, t.GlJeLinesObjectVersionNumber, t.GlJeLinesPeriodName, t.GlJeLinesReference1, t.GlJeLinesReference10, t.GlJeLinesReference2, t.GlJeLinesReference3, t.GlJeLinesReference4, t.GlJeLinesReference5, t.GlJeLinesReference6, t.GlJeLinesReference7, t.GlJeLinesReference8, t.GlJeLinesReference9, t.GlJeLinesStatAmount, t.GlJeLinesStatus, t.GlJeLinesSubledgerDocSequenceId, t.GlJeLinesSubledgerDocSequenceValue, t.JeHeaderId, t.JeLineNum, s.infa_operation_type, s.infa_versioned_sortable_sequence, s.GlJeLinesAccountedCr, s.GlJeLinesAccountedDr, s.GlJeLinesCodeCombinationId, s.GlJeLinesCreatedBy, s.GlJeLinesCreationDate, s.GlJeLinesCurrencyCode, s.GlJeLinesCurrencyConversionDate, s.GlJeLinesCurrencyConversionRate, s.GlJeLinesCurrencyConversionType, s.GlJeLinesDescription, s.GlJeLinesEffectiveDate, s.GlJeLinesEnteredCr, s.GlJeLinesEnteredDr, s.GlJeLinesGlSlLinkId, s.GlJeLinesGlSlLinkTable, s.GlJeLinesIgnoreRateFlag, s.GlJeLinesLastUpdateDate, s.GlJeLinesLastUpdatedBy, s.GlJeLinesLastUpdateLogin, s.GlJeLinesLedgerId, s.GlJeLinesLineTypeCode, s.GlJeLinesObjectVersionNumber, s.GlJeLinesPeriodName, s.GlJeLinesReference1, s.GlJeLinesReference10, s.GlJeLinesReference2, s.GlJeLinesReference3, s.GlJeLinesReference4, s.GlJeLinesReference5, s.GlJeLinesReference6, s.GlJeLinesReference7, s.GlJeLinesReference8, s.GlJeLinesReference9, s.GlJeLinesStatAmount, s.GlJeLinesStatus, s.GlJeLinesSubledgerDocSequenceId, s.GlJeLinesSubledgerDocSequenceValue, s.JeHeaderId, s.JeLineNum, s._rescued_data.; line 1 pos 0 JVM stacktrace: 
com.databricks.sql.transaction.tahoe.DeltaAnalysisException at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$4(deltaMerge.scala:446) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.assertResolved$1(deltaMerge.scala:439) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1(deltaMerge.scala:425) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1$adapted(deltaMerge.scala:425) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveOrFail$1(deltaMerge.scala:425) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveSingleExprOrFail$1(deltaMerge.scala:436) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveReferencesAndSchema(deltaMerge.scala:716) at com.databricks.sql.transaction.tahoe.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:827) at com.databricks.sql.transaction.tahoe.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:107) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:209) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:208) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:39) at com.databricks.sql.transaction.tahoe.DeltaAnalysis.apply(DeltaAnalysis.scala:107) at com.databricks.sql.transaction.tahoe.DeltaAnalysis.apply(DeltaAnalysis.scala:101) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292) at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:450) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:443) at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:357) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:443) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:376) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:167) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:428) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:427) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:395) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:222) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:102) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175) at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1182) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1182) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:100) at io.delta.connect.DeltaRelationPlugin.transformMergeIntoTable(DeltaRelationPlugin.scala:195) at io.delta.connect.DeltaRelationPlugin.transform(DeltaRelationPlugin.scala:80) at io.delta.connect.DeltaRelationPlugin.transform(DeltaRelationPlugin.scala:53) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelationPlugin$1(SparkConnectPlanner.scala:249)
 
 
 
Please check and guide me toward a solution.
1 ACCEPTED SOLUTION


-werners-
Esteemed Contributor III

It seems the columns in your join condition cannot be resolved. Do they exist in both the DataFrame and the table?
Also, try putting the whole join condition into a single string:
"s.JeHeaderId = t.JeHeaderId and s.JeLineId = t.JeLineId"

