cancel
Showing results for 
Search instead for 
Did you mean: 
Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
cancel
Showing results for 
Search instead for 
Did you mean: 

Using Autoloader with merge

MonuDatabricks
Visitor

Hi Everyone, 

I have been trying to use autoloader with foreach so that I could able to use merge into in databricks, but while using I have been getting below error.

error-Found error inside foreachBatch Python process

My code-

from delta.tables import *

streaming_df=(spark.readStream.format("cloudFiles")
 .option("cloudFiles.format", "parquet")
 .option("cloudFiles.schemaLocation", f"{output_path}/schemalocation")
 .load(f"{input_path}"))






deltaTable = DeltaTable.forName(spark,"vyxdbsucdev.silver_generalledger_erpcloud_gl.journalline")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId😞
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.JeHeaderId = t.JeHeaderId" and "s.JeLineId = t.JeLineId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
query=(streaming_df.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)
 
Complete error:
 
File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 90, in process func(batch_df, batch_id) File "/home/spark-ba8c57e6-702e-433f-960d-93/.ipykernel/2858/command-1205523182602632-3905487436", line 22, in upsertToDelta File "/databricks/spark/python/delta/connect/tables.py", line 577, in execute return self._spark.createDataFrame(df.toPandas()) File "/databricks/spark/python/pyspark/sql/connect/dataframe.py", line 1874, in toPandas return self._session.client.to_pandas(query, self._plan.observations) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1076, in to_pandas table, schema, metrics, observed_metrics, _ = self._execute_and_fetch( File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1627, in _execute_and_fetch for response in self._execute_and_fetch_as_iterator( File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1604, in _execute_and_fetch_as_iterator self._handle_error(error) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1913, in _handle_error self._handle_rpc_error(error) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1988, in _handle_rpc_error raise convert_exception( pyspark.errors.exceptions.connect.AnalysisException: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve s.JeLineId in search condition given columns t.infa_operation_type, t.infa_versioned_sortable_sequence, t.GlJeLinesAccountedCr, t.GlJeLinesAccountedDr, t.GlJeLinesCodeCombinationId, t.GlJeLinesCreatedBy, t.GlJeLinesCreationDate, t.GlJeLinesCurrencyCode, t.GlJeLinesCurrencyConversionDate, t.GlJeLinesCurrencyConversionRate, t.GlJeLinesCurrencyConversionType, t.GlJeLinesDescription, t.GlJeLinesEffectiveDate, t.GlJeLinesEnteredCr, t.GlJeLinesEnteredDr, t.GlJeLinesGlSlLinkId, t.GlJeLinesGlSlLinkTable, t.GlJeLinesIgnoreRateFlag, t.GlJeLinesLastUpdateDate, t.GlJeLinesLastUpdatedBy, t.GlJeLinesLastUpdateLogin, t.GlJeLinesLedgerId, t.GlJeLinesLineTypeCode, t.GlJeLinesObjectVersionNumber, t.GlJeLinesPeriodName, t.GlJeLinesReference1, t.GlJeLinesReference10, t.GlJeLinesReference2, t.GlJeLinesReference3, t.GlJeLinesReference4, t.GlJeLinesReference5, t.GlJeLinesReference6, t.GlJeLinesReference7, t.GlJeLinesReference8, t.GlJeLinesReference9, t.GlJeLinesStatAmount, t.GlJeLinesStatus, t.GlJeLinesSubledgerDocSequenceId, t.GlJeLinesSubledgerDocSequenceValue, t.JeHeaderId, t.JeLineNum, s.infa_operation_type, s.infa_versioned_sortable_sequence, s.GlJeLinesAccountedCr, s.GlJeLinesAccountedDr, s.GlJeLinesCodeCombinationId, s.GlJeLinesCreatedBy, s.GlJeLinesCreationDate, s.GlJeLinesCurrencyCode, s.GlJeLinesCurrencyConversionDate, s.GlJeLinesCurrencyConversionRate, s.GlJeLinesCurrencyConversionType, s.GlJeLinesDescription, s.GlJeLinesEffectiveDate, s.GlJeLinesEnteredCr, s.GlJeLinesEnteredDr, s.GlJeLinesGlSlLinkId, s.GlJeLinesGlSlLinkTable, s.GlJeLinesIgnoreRateFlag, s.GlJeLinesLastUpdateDate, s.GlJeLinesLastUpdatedBy, s.GlJeLinesLastUpdateLogin, s.GlJeLinesLedgerId, s.GlJeLinesLineTypeCode, s.GlJeLinesObjectVersionNumber, s.GlJeLinesPeriodName, s.GlJeLinesReference1, s.GlJeLinesReference10, s.GlJeLinesReference2, s.GlJeLinesReference3, s.GlJeLinesReference4, s.GlJeLinesReference5, s.GlJeLinesReference6, s.GlJeLinesReference7, s.GlJeLinesReference8, s.GlJeLinesReference9, s.GlJeLinesStatAmount, s.GlJeLinesStatus, s.GlJeLinesSubledgerDocSequenceId, s.GlJeLinesSubledgerDocSequenceValue, s.JeHeaderId, s.JeLineNum, s._rescued_data.; line 1 pos 0 JVM stacktrace: com.databricks.sql.transaction.tahoe.DeltaAnalysisException at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$4(deltaMerge.scala:446) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.assertResolved$1(deltaMerge.scala:439) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1(deltaMerge.scala:425) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1$adapted(deltaMerge.scala:425) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveOrFail$1(deltaMerge.scala:425) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveSingleExprOrFail$1(deltaMerge.scala:436) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveReferencesAndSchema(deltaMerge.scala:716) at com.databricks.sql.transaction.tahoe.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:827) at com.databricks.sql.transaction.tahoe.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:107) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:209) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:208) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:39) at com.databricks.sql.transaction.tahoe.DeltaAnalysis.apply(DeltaAnalysis.scala:107) at com.databricks.sql.transaction.tahoe.DeltaAnalysis.apply(DeltaAnalysis.scala:101) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:450) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:443) at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:357) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:443) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:376) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:167) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:428) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:427) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:395) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:222) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:102) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175) at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1182) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1182) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:100) at io.delta.connect.DeltaRelationPlugin.transformMergeIntoTable(DeltaRelationPlugin.scala:195) at io.delta.connect.DeltaRelationPlugin.transform(DeltaRelationPlugin.scala:80) at io.delta.connect.DeltaRelationPlugin.transform(DeltaRelationPlugin.scala:53) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelationPlugin$1(SparkConnectPlanner.scala:249)
 
 
 
Please check and guide the solution please.
1 REPLY 1

-werners-
Esteemed Contributor III

It seems the columns of your join condition are not found.  Are they in the dataframes/table?
Also try to put the whole join condition in a single string:
"s.JeHeaderId = t.JeHeaderId and s.JeLineId = t.JeLineId"

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group