I have been trying to use autoloader with foreach so that I could able to use merge into in databricks, but while using I have been getting below error.
from delta.tables import *
streaming_df=(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", f"{output_path}/schemalocation")
.load(f"{input_path}"))
deltaTable = DeltaTable.forName(spark,"vyxdbsucdev.silver_generalledger_erpcloud_gl.journalline")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId😞
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.JeHeaderId = t.JeHeaderId" and "s.JeLineId = t.JeLineId")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
query=(streaming_df.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Complete error:
File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 90, in process func(batch_df, batch_id) File "/home/spark-ba8c57e6-702e-433f-960d-93/.ipykernel/2858/command-1205523182602632-3905487436", line 22, in upsertToDelta File "/databricks/spark/python/delta/connect/tables.py", line 577, in execute return self._spark.createDataFrame(df.toPandas()) File "/databricks/spark/python/pyspark/sql/connect/dataframe.py", line 1874, in toPandas return self._session.client.to_pandas(query, self._plan.observations) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1076, in to_pandas table, schema, metrics, observed_metrics, _ = self._execute_and_fetch( File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1627, in _execute_and_fetch for response in self._execute_and_fetch_as_iterator( File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1604, in _execute_and_fetch_as_iterator self._handle_error(error) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1913, in _handle_error self._handle_rpc_error(error) File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1988, in _handle_rpc_error raise convert_exception( pyspark.errors.exceptions.connect.AnalysisException: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve s.JeLineId in search condition given columns t.infa_operation_type, t.infa_versioned_sortable_sequence, t.GlJeLinesAccountedCr, t.GlJeLinesAccountedDr, t.GlJeLinesCodeCombinationId, t.GlJeLinesCreatedBy, t.GlJeLinesCreationDate, t.GlJeLinesCurrencyCode, t.GlJeLinesCurrencyConversionDate, t.GlJeLinesCurrencyConversionRate, t.GlJeLinesCurrencyConversionType, t.GlJeLinesDescription, t.GlJeLinesEffectiveDate, t.GlJeLinesEnteredCr, t.GlJeLinesEnteredDr, t.GlJeLinesGlSlLinkId, t.GlJeLinesGlSlLinkTable, t.GlJeLinesIgnoreRateFlag, t.GlJeLinesLastUpdateDate, t.GlJeLinesLastUpdatedBy, t.GlJeLinesLastUpdateLogin, t.GlJeLinesLedgerId, t.GlJeLinesLineTypeCode, t.GlJeLinesObjectVersionNumber, t.GlJeLinesPeriodName, t.GlJeLinesReference1, t.GlJeLinesReference10, t.GlJeLinesReference2, t.GlJeLinesReference3, t.GlJeLinesReference4, t.GlJeLinesReference5, t.GlJeLinesReference6, t.GlJeLinesReference7, t.GlJeLinesReference8, t.GlJeLinesReference9, t.GlJeLinesStatAmount, t.GlJeLinesStatus, t.GlJeLinesSubledgerDocSequenceId, t.GlJeLinesSubledgerDocSequenceValue, t.JeHeaderId, t.JeLineNum, s.infa_operation_type, s.infa_versioned_sortable_sequence, s.GlJeLinesAccountedCr, s.GlJeLinesAccountedDr, s.GlJeLinesCodeCombinationId, s.GlJeLinesCreatedBy, s.GlJeLinesCreationDate, s.GlJeLinesCurrencyCode, s.GlJeLinesCurrencyConversionDate, s.GlJeLinesCurrencyConversionRate, s.GlJeLinesCurrencyConversionType, s.GlJeLinesDescription, s.GlJeLinesEffectiveDate, s.GlJeLinesEnteredCr, s.GlJeLinesEnteredDr, s.GlJeLinesGlSlLinkId, s.GlJeLinesGlSlLinkTable, s.GlJeLinesIgnoreRateFlag, s.GlJeLinesLastUpdateDate, s.GlJeLinesLastUpdatedBy, s.GlJeLinesLastUpdateLogin, s.GlJeLinesLedgerId, s.GlJeLinesLineTypeCode, s.GlJeLinesObjectVersionNumber, s.GlJeLinesPeriodName, s.GlJeLinesReference1, s.GlJeLinesReference10, s.GlJeLinesReference2, s.GlJeLinesReference3, s.GlJeLinesReference4, s.GlJeLinesReference5, s.GlJeLinesReference6, s.GlJeLinesReference7, s.GlJeLinesReference8, s.GlJeLinesReference9, s.GlJeLinesStatAmount, s.GlJeLinesStatus, s.GlJeLinesSubledgerDocSequenceId, s.GlJeLinesSubledgerDocSequenceValue, s.JeHeaderId, s.JeLineNum, s._rescued_data.; line 1 pos 0 JVM stacktrace: com.databricks.sql.transaction.tahoe.DeltaAnalysisException at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$4(deltaMerge.scala:446) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.assertResolved$1(deltaMerge.scala:439) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1(deltaMerge.scala:425) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1$adapted(deltaMerge.scala:425) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveOrFail$1(deltaMerge.scala:425) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveSingleExprOrFail$1(deltaMerge.scala:436) at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveReferencesAndSchema(deltaMerge.scala:716) at com.databricks.sql.transaction.tahoe.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:827) at com.databricks.sql.transaction.tahoe.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:107) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:209) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:208) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:39) at com.databricks.sql.transaction.tahoe.DeltaAnalysis.apply(DeltaAnalysis.scala:107) at com.databricks.sql.transaction.tahoe.DeltaAnalysis.apply(DeltaAnalysis.scala:101) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:450) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:443) at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:357) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:443) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:376) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:167) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:428) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:427) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:395) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:222) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:102) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175) at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1182) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1182) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:100) at io.delta.connect.DeltaRelationPlugin.transformMergeIntoTable(DeltaRelationPlugin.scala:195) at io.delta.connect.DeltaRelationPlugin.transform(DeltaRelationPlugin.scala:80) at io.delta.connect.DeltaRelationPlugin.transform(DeltaRelationPlugin.scala:53) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelationPlugin$1(SparkConnectPlanner.scala:249)
Please check and guide the solution please.