ForeachBatch() - Get results from batchDF._jdf.sparkSession().sql('merge stmt')

jm99
New Contributor III

Most Python examples show the structure of the foreachBatch function as:

def foreachBatchFunc(batchDF, batchId):
    batchDF.createOrReplaceTempView('viewName')

    (
        batchDF
            ._jdf.sparkSession()
            .sql(
                """
                << merge statement >>
                """
            )
    )

._jdf.sparkSession().sql() returns a Java object, not a PySpark DataFrame.

How do I get access to the result DataFrame containing the (affected, inserted, updated, deleted) row counts?

1 ACCEPTED SOLUTION

Accepted Solutions

jm99
New Contributor III

Just found a solution...

The Java DataFrame (jdf) returned by sql() needs to be wrapped in a PySpark DataFrame:

from pyspark import sql

def batchFunc(batchDF, batchId):
  batchDF.createOrReplaceTempView('viewName')
  # Use the batch's own session so the temp view is visible to the merge
  sparkSession = batchDF._jdf.sparkSession()

  # sql() on the JVM session returns a Java DataFrame handle
  resJdf = sparkSession.sql('merge statement')

  # Wrap the Java DataFrame in a PySpark DataFrame
  resultDf = sql.DataFrame(resJdf, batchDF.sql_ctx)
  firstRow = resultDf.first()

  insertedRowCount = 0
  updatedRowCount = 0
  deletedRowCount = 0
  affectedRowCount = 0

  # Only read the metric columns that the merge result actually contains
  if firstRow:
      if 'num_affected_rows' in resultDf.columns:
          affectedRowCount += firstRow['num_affected_rows']
      if 'num_inserted_rows' in resultDf.columns:
          insertedRowCount += firstRow['num_inserted_rows']
      if 'num_updated_rows' in resultDf.columns:
          updatedRowCount += firstRow['num_updated_rows']
      if 'num_deleted_rows' in resultDf.columns:
          deletedRowCount += firstRow['num_deleted_rows']

  # do what you want with the counts!
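
For anyone on a newer runtime, here is a rough sketch of the same idea without touching _jdf. It assumes Spark 3.3 or later, where batchDF.sparkSession should be available and its sql() returns a regular PySpark DataFrame. The names extract_merge_counts, merge_batch and the merge_sql parameter are made up for illustration and are not part of any Spark API.

```python
# Metric columns named in the solution above; a given merge result may
# contain only some of them, so each one is treated as optional.
MERGE_METRIC_COLUMNS = (
    "num_affected_rows",
    "num_updated_rows",
    "num_deleted_rows",
    "num_inserted_rows",
)


def extract_merge_counts(columns, first_row):
    """Return the merge metrics as a dict, defaulting to 0 when missing.

    `columns` is the result DataFrame's column list and `first_row` is
    its first row (or None if the result is empty).
    """
    counts = {name: 0 for name in MERGE_METRIC_COLUMNS}
    if first_row is None:
        return counts
    for name in MERGE_METRIC_COLUMNS:
        if name in columns and first_row[name] is not None:
            counts[name] = first_row[name]
    return counts


def merge_batch(batch_df, batch_id, merge_sql):
    """Hypothetical foreachBatch body: run the merge, return row counts.

    Assumption: Spark 3.3+, so batch_df.sparkSession exists and sql()
    gives back a Python DataFrame with no manual wrapping needed.
    """
    batch_df.createOrReplaceTempView("viewName")
    result_df = batch_df.sparkSession.sql(merge_sql)
    return extract_merge_counts(result_df.columns, result_df.first())
```

Since foreachBatch expects a two-argument function, something like functools.partial(merge_batch, merge_sql='merge statement') or a small lambda would be needed to pass the statement in.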
