Hey guys, I keep getting this error message when trying to call a function with Soda DQ checks:
[PARSE_SYNTAX_ERROR] Syntax error at or near '{'. SQLSTATE: 42601
File <command-81221799516900>, line 4
1 dfBronze.writeStream \
2 .foreachBatch(process_batch) \
3 .trigger(processingTime="1 second") \
----> 4 .start()
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1988, in SparkConnectClient._handle_rpc_error(self, rpc_error)
1985 info = error_details_pb2.ErrorInfo()
1986 d.Unpack(info)
-> 1988 raise convert_exception(
1989 info,
1990 status.message,
1991 self._fetch_enriched_error(info),
1992 self._display_server_stack_trace(),
1993 ) from None
1995 raise SparkConnectGrpcException(status.message) from None
1996 else:
This is my code. In general I'm just reading a stream with Auto Loader; ingestion was going fine until I added the function, but I can't find the error.
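For context, the read side is the standard Auto Loader pattern, roughly like this (the path and schema-location variables here are placeholders, not my exact values):

dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_dir_path_bronze)  # placeholder
    .load(landing_zone_path)  # placeholder
)

And here's the function with the Soda checks: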
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, concat_ws, from_json, lit, when
from pyspark.sql.types import StringType, StructField, StructType, TimestampType
from soda.scan import Scan

def process_batch(df: DataFrame, batch_id):
    # TODO: Soda Cloud configuration (move this to environment variables later)
    soda_config = """
soda_cloud:
  host: cloud.soda.io
  api_key_id: mykey
  api_key_secret: mysecret
"""
    # Define the checks for the batch
    checks = """
checks for df:
  - missing_count(site) = 0:
      name: Ensure there are no null values in the Site column
      fail: 1
      warn: 0
  - missing_count(meter) = 0:
      name: Ensure there are no null values in the Meter column
      fail: 1
      warn: 0
  - missing_count(device_time) = 0:
      name: Ensure there are no null values in the Device Time column
      fail: 1
      warn: 0
  - missing_count(energy_cumulative_active_value) = 0:
      name: Ensure there are no null values in the Cumulative Active Energy Value column
      fail: 1
      warn: 0
  - missing_count(energy_cumulative_active_unit) = 0:
      name: Ensure there are no null values in the Cumulative Active Energy Unit column
      fail: 1
      warn: 0
  - missing_count(power_instantaneous_active_value) = 0:
      name: Ensure there are no null values in the Instantaneous Active Power Value column
      fail: 1
      warn: 0
  - missing_count(power_instantaneous_active_unit) = 0:
      name: Ensure there are no null values in the Instantaneous Active Power Unit column
      fail: 1
      warn: 0
  - missing_count(power_average_active_value) = 0:
      name: Ensure there are no null values in the Average Active Power Value column
      fail: 1
      warn: 0
  - missing_count(power_average_active_unit) = 0:
      name: Ensure there are no null values in the Average Active Power Unit column
      fail: 1
      warn: 0
"""
    df = df.select(
        col("site"),
        col("meter"),
        col("device_time").cast("timestamp"),
        col("data_parsed.energy.cumulative.active.value").cast("decimal(30,14)").alias("energy_cumulative_active_value"),
        col("data_parsed.energy.cumulative.active.unit").alias("energy_cumulative_active_unit"),
        col("data_parsed.energy.cumulative.apparent.value").cast("decimal(30,14)").alias("energy_cumulative_apparent_value"),
        col("data_parsed.energy.cumulative.apparent.unit").alias("energy_cumulative_apparent_unit"),
        col("data_parsed.energy.cumulative.reactive.value").cast("decimal(30,14)").alias("energy_cumulative_reactive_value"),
        col("data_parsed.energy.cumulative.reactive.unit").alias("energy_cumulative_reactive_unit"),
        col("data_parsed.power.instantaneous.active.value").cast("decimal(30,14)").alias("power_instantaneous_active_value"),
        col("data_parsed.power.instantaneous.active.unit").alias("power_instantaneous_active_unit"),
        col("data_parsed.power.instantaneous.apparent.value").cast("decimal(30,14)").alias("power_instantaneous_apparent_value"),
        col("data_parsed.power.instantaneous.apparent.unit").alias("power_instantaneous_apparent_unit"),
        col("data_parsed.power.instantaneous.reactive.value").cast("decimal(30,14)").alias("power_instantaneous_reactive_value"),
        col("data_parsed.power.instantaneous.reactive.unit").alias("power_instantaneous_reactive_unit"),
        col("data_parsed.power.average.active.value").cast("decimal(30,14)").alias("power_average_active_value"),
        col("data_parsed.power.average.active.unit").alias("power_average_active_unit"),
        col("data_parsed.power.average.apparent.value").cast("decimal(30,14)").alias("power_average_apparent_value"),
        col("data_parsed.power.average.apparent.unit").alias("power_average_apparent_unit"),
        col("data_parsed.power.average.reactive.value").cast("decimal(30,14)").alias("power_average_reactive_value"),
        col("data_parsed.power.average.reactive.unit").alias("power_average_reactive_unit"),
        col("ingestion_time")
    )
    # Initialize the Soda scan
    scan = Scan()
    # Add the Soda Cloud configuration
    scan.add_configuration_yaml_str(soda_config)
    # Set the scan name
    scan.set_scan_definition_name("Bronze Ingestion DQs")
    # Add the checks configuration
    scan.add_sodacl_yaml_str(checks)
    # Point the scan at the Spark session behind the DataFrame
    scan.add_spark_session(df.sparkSession)
    # Execute the scan
    scan.execute()
    # end of function
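For reference, the pattern in the Soda docs for scanning Spark DataFrames registers the DataFrame as a temp view and names the data source explicitly; my function skips the temp view, which may or may not matter. This is the docs' shape as I understand it, not my exact code:

# docs-style wiring: the checks reference the temp view name ("df" here)
df.createOrReplaceTempView("df")
scan = Scan()
scan.set_scan_definition_name("Bronze Ingestion DQs")
scan.set_data_source_name("spark_df")
scan.add_spark_session(df.sparkSession, data_source_name="spark_df")
scan.add_sodacl_yaml_str(checks)
scan.execute()

After the function, the rest of the notebook looks like this: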
json_schema_bronze = StructType([
    StructField("site", StringType(), nullable=True),
    StructField("meter", StringType(), nullable=True),
    StructField("device_time", TimestampType(), nullable=True),
    StructField("data", StringType(), nullable=True)
])
dfBronze = dfBronze.withColumn(
    "data_parsed",
    when((col("data").isNotNull()) & (col("data") != ""), from_json(col("data"), json_schema_silver["data"]))
    .otherwise(None)
)
# fill the data quality check log in db
dfBronze = dfBronze.withColumn(
    "quality_check_message",
    concat_ws(",\n",
        # Check for null or missing fields
        when(col("site").isNull(), lit("Missing site")).otherwise(lit("")),
        when(col("meter").isNull(), lit("Missing meter")).otherwise(lit("")),
        when(col("device_time").isNull(), lit("Missing device_time")).otherwise(lit("")),
        when(col("data_parsed").isNull(), lit("Data field is missing")).otherwise(lit("")),
        # Check for missing cumulative energy values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() & col("data_parsed.energy.cumulative.active.value").isNull(), lit("Missing cumulative active energy value")).otherwise(lit("")),
        when(col("data_parsed").isNotNull() & col("data_parsed.energy.cumulative.active.unit").isNull(), lit("Missing cumulative active energy unit")).otherwise(lit("")),
        # Check for missing instantaneous power values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() & col("data_parsed.power.instantaneous.active.value").isNull(), lit("Missing instantaneous active power value")).otherwise(lit("")),
        when(col("data_parsed").isNotNull() & col("data_parsed.power.instantaneous.active.unit").isNull(), lit("Missing instantaneous active power unit")).otherwise(lit("")),
        # Check for missing average power values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() & col("data_parsed.power.average.active.value").isNull(), lit("Missing average active power value")).otherwise(lit("")),
        when(col("data_parsed").isNotNull() & col("data_parsed.power.average.active.unit").isNull(), lit("Missing average active power unit")).otherwise(lit(""))
        # Check for invalid unit value for cumulative energy, but only if data_parsed is not null
        # when(col("data_parsed").isNotNull() & ~col("data_parsed.energy.cumulative.active.unit").isin("kWh"), lit("Invalid active energy unit")).otherwise(lit(""))
    )
)
dfBronze.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime="1 second") \
    .start()
dfBronze = dfBronze.drop("data_parsed")  # drop the parsed data column before the Delta write
query = (dfBronze.writeStream
    .format("delta")  # use the Delta format for writing to Delta tables
    .option("checkpointLocation", checkpoint_dir_path_bronze)  # checkpoint directory
    .outputMode("append")  # use "append" mode to add new data
    .option("mergeSchema", "true")  # allow schema evolution on write
    .trigger(processingTime="1 second")  # trigger every second
    .toTable(bronze_table)  # write to the Delta table
)
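One thing I'm not sure about is the bare fail: 1 / warn: 0 values in my checks. The threshold examples I can find in the SodaCL docs use when conditions instead, something like this (docs-style, not what I currently have):

checks for df:
  - missing_count(site):
      name: Ensure there are no null values in the Site column
      warn: when > 0
      fail: when > 0

Could that be what's triggering the parse error, or is it something in how I'm wiring the scan into foreachBatch? Any pointers appreciated.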