01-13-2023 01:20 AM
Most Python examples show the structure of the foreachBatch function as:
def foreachBatchFunc(batchDF, batchId):
    batchDF.createOrReplaceTempView('viewName')
    (
        batchDF
        ._jdf.sparkSession()
        .sql(
            """
            << merge statement >>
            """
        )
    )
._jdf.sparkSession().sql() returns a Java object, not a DataFrame.
How do you get access to the results DataFrame containing the affected, inserted, updated, and deleted row counts?
Labels:
- Dataframe
- Foreachbatch
- Sparksession
- SQL
1 ACCEPTED SOLUTION
01-13-2023 04:14 AM
Just found a solution...
You need to convert the Java DataFrame (jdf) back into a PySpark DataFrame:
from pyspark import sql

def batchFunc(batchDF, batchId):
    batchDF.createOrReplaceTempView('viewName')

    # Run the merge through the underlying Java SparkSession
    sparkSession = batchDF._jdf.sparkSession()
    resJdf = sparkSession.sql('merge statement')

    # Wrap the returned Java DataFrame in a PySpark DataFrame
    resultDf = sql.DataFrame(resJdf, batchDF.sql_ctx)
    firstRow = resultDf.first()

    insertedRowCount = 0
    updatedRowCount = 0
    deletedRowCount = 0
    affectedRowCount = 0

    if firstRow:
        if 'num_affected_rows' in resultDf.columns:
            affectedRowCount += firstRow['num_affected_rows']
        if 'num_inserted_rows' in resultDf.columns:
            insertedRowCount += firstRow['num_inserted_rows']
        if 'num_updated_rows' in resultDf.columns:
            updatedRowCount += firstRow['num_updated_rows']
        if 'num_deleted_rows' in resultDf.columns:
            deletedRowCount += firstRow['num_deleted_rows']

    # do what you want with the counts!
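For context, here is a minimal sketch of how batchFunc might be wired into a streaming write with foreachBatch. The source table name and checkpoint path are hypothetical placeholders, not part of the original post:

(
    spark.readStream
        .table('source_table')                                        # hypothetical streaming source
        .writeStream
        .foreachBatch(batchFunc)                                      # the function defined above
        .option('checkpointLocation', '/tmp/checkpoints/merge_demo')  # hypothetical checkpoint path
        .start()
)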

