04-01-2024 10:58 AM
Hello, I am trying to write a simple upsert statement following the steps in the tutorials. here is what my code looks like:
from pyspark.sql import functions as F
def upsert_source_one(self😞
df_source = spark.readStream.format("delta").table(self.source_one_bronze_table)
df_targ_state = spark.read.format("delta").table(f"{self.silver_db}.state")
df_delta = (df_source
.join(df_targ_state, df_source.training_state == df_targ_state.id, "inner") # inner join data
.select(
F.col("badgenumber"),
F.col("name"),
F.col(f"{self.silver_db}.state.state_code").alias("training_state_id")
)
.withColumn("badgenumber", F.regexp_replace('badgenumber', '-', '')) # clean up data
.withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
)
ws = (df_delta.writeStream
.format("delta")
.foreachBatch(upsertToDelta)
.outputMode("update")
.option("checkpointLocation", self.checkpointlocation)
)
streaming_query = ws.start()
streaming_query.awaitTermination(self.trigger_time_in_sec)
streaming_query.stop()
def upsertToDelta(microBatchOutputDF, batchId😞
# Set the dataframe to view name
user_profile_df =
DeltaTable.forName(microBatchOutputDF.sparkSession(), f"{self.silver_db}.user_profile")
(
user_profile_df.alias("up")
.merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
.whenMatchedUpdate(
set={"officer_name": "br.name",
"training_state_id": "br.training_state_id"
}
)
.whenNotMatchedInsert(
values={
"badge_number": "br.badge_number",
"officer_name": "br.name",
"training_state_id": "br.training_state_id"
}
)
.execute()
)
I've shrunk the code a bit from what I have and the formatting is a bit off maybe.
I am running on a shared compute with a runtime of "15.0 (includes Apache Spark 3.5.0, Scala 2.12)". When I run this, it gets all the way to the .start(), however fails with SparkConnectGrpcException: (java.io.EOFException) error
I am not able to find what I'm doing wrong so any hints or suggestions would be helpful. My guess is it has something to do with the runtime I'm using, but don't want to just use an old version.
Thanks!
04-02-2024 04:17 AM
Hi @hyedesign,
04-02-2024 07:05 AM
Using sample data sets. Here is the full code. This error does seem to be related to runtime version 15,
df_source = spark.readStream.format("delta").table("`cat1`.`bronze`.`officer_info`")
df_orig_state = spark.read.format("delta").table("`sample-db`.`public`.state")
df_targ_state = spark.read.format("delta").table("`cat1`.`silver`.state")
df_delta = (df_source
.join(df_orig_state, df_source.training_state == df_orig_state.id, "inner")
.join(df_targ_state, df_orig_state.state_code == df_targ_state.state_code, "inner")
.select(F.col("badgenumber"),
F.col("name"),
F.col("`cat1`.`silver`.state.id").alias("training_state_id"),
F.col("isVeteran").cast("boolean"),
F.col("cert_date").cast("date")
)
.withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))
.withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
)
def upsertToDelta(microBatchOutputDF, batchId):
officer_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,
"`cat1`.`silver`.officer_profile")
(
officer_profile_df
.alias("up")
.merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
.whenMatchedUpdate(
set={
"officer_name": "br.name",
"certification_date": "br.cert_date",
"isVeteran": "br.isVeteran",
"training_state_id": "br.training_state_id"
}
)
.whenNotMatchedInsert(
values={
"up.badge_number": "br.badgenumber",
"up.officer_name": "br.name",
"up.training_state_id": "br.training_state_id",
"up.isVeteran": "br.isVeteran",
"up.certification_date": "br.cert_date"
}
).execute()
)
ws = (df_delta.writeStream
.format("delta")
.foreachBatch(upsertToDelta)
.outputMode("update")
.option("checkpointLocation", "s3://somebucket/checkpoint/silver/source_one_test1")
)
streaming_query = ws.trigger(availableNow=True).start() # <~~~~ FAILS HERE on start() with pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException)
print(f"waiting for 30 seconds")
streaming_query.awaitTermination(30)
print("Stopping trigger...")
streaming_query.stop()
print(f"Upsert Complete: {datetime.now()}")
04-02-2024 08:48 PM
One more bit of information that I just figured out. This seems to be happening only when i run this via PyCharm. Not sure why it would be giving me this error though. Any help is greatly appreciated.
Friday
I'm getting this error also, in VS Code.
Friday
Was getting errors trying to include the code. Here is my eighth attempt:
for_each_batch_partial = partial(
for_each_batch,
spark=spark,
environment=config.environment,
kinesis_options=config.kinesis_options,
mongo_options=config.mongo_options,
mock_target=config.mock_target,
collection_schemas=create_collection_schema(spark, EVENT_TO_COLLECTION),
log_level = config.log_level
)
query = (
spark.readStream.format("kinesis")
.options(**config.kinesis_options)
.load()
.writeStream.queryName("datapipe")
.option("checkpointLocation", config.checkpoint_path)
.foreachBatch(for_each_batch_partial)
.start()
)
Friday
I figured out the error is hiding an underlying issue with the code, which you can get to if you deploy the bundle (if you are using asset bundles) and run from a notebook in a browser.
So the issue is more about the debugger not being able to stop on an exception thrown by the user function specified in the foreachbatch property of the streaming query.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group