I keep on getting Parse_syntax_error on autoloader run foreachbatch

sakuraDev
New Contributor II

Hey guys, I keep on getting this error message when trying to call a function with soda DQ's:

 [PARSE_SYNTAX_ERROR] Syntax error at or near '{'. SQLSTATE: 42601
File <command-81221799516900>, line 4
      1 dfBronze.writeStream \
      2     .foreachBatch(process_batch) \
      3     .trigger(processingTime="1 second") \
----> 4     .start()
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1988, in SparkConnectClient._handle_rpc_error(self, rpc_error)
   1985             info = error_details_pb2.ErrorInfo()
   1986             d.Unpack(info)
-> 1988             raise convert_exception(
   1989                 info,
   1990                 status.message,
   1991                 self._fetch_enriched_error(info),
   1992                 self._display_server_stack_trace(),
   1993             ) from None
   1995     raise SparkConnectGrpcException(status.message) from None
   1996 else:

this is my code, in general I'm just reading a stream with autoloader, the ingestion was going great until i added the function, but I can't find the error:

def process_batch(df: DataFrame, batch_id):
    #! Soda Cloud configuration (move this to environment variables later)
    soda_config = """
    soda_cloud:
      host: cloud.soda.io
      api_key_id: mykey
      api_key_secret: mysecret
    """

    # Define the checks for the batch
    checks = """
    checks for df:
      - missing_count(site) = 0:
          name: Ensure there are no null values in the Site column
          fail: 1
          warn: 0
      - missing_count(meter) = 0:
          name: Ensure there are no null values in the Meter column
          fail: 1
          warn: 0
      - missing_count(device_time) = 0:
          name: Ensure there are no null values in the Device Time column
          fail: 1
          warn: 0
      - missing_count(energy_cumulative_active_value) = 0:
          name: Ensure there are no null values in the Cumulative Active Energy Value column
          fail: 1
          warn: 0
      - missing_count(energy_cumulative_active_unit) = 0:
          name: Ensure there are no null values in the Cumulative Active Energy Unit column
          fail: 1
          warn: 0
      - missing_count(power_instantaneous_active_value) = 0:
          name: Ensure there are no null values in the Instantaneous Active Power Value column
          fail: 1
          warn: 0
      - missing_count(power_instantaneous_active_unit) = 0:
          name: Ensure there are no null values in the Instantaneous Active Power Unit column
          fail: 1
          warn: 0
      - missing_count(power_average_active_value) = 0:
          name: Ensure there are no null values in the Average Active Power Value column
          fail: 1
          warn: 0
      - missing_count(power_average_active_unit) = 0:
          name: Ensure there are no null values in the Average Active Power Unit column
          fail: 1
          warn: 0
    """
    df=df.select(
    col("site"),
    col("meter"),
    col("device_time").cast("timestamp"),
    
    col("data_parsed.energy.cumulative.active.value").alias("energy_cumulative_active_value").cast("decimal(30,14)"),
    col("data_parsed.energy.cumulative.active.unit").alias("energy_cumulative_active_unit"),

    col("data_parsed.energy.cumulative.apparent.value").alias("energy_cumulative_apparent_value").cast("decimal(30,14)"),
    col("data_parsed.energy.cumulative.apparent.unit").alias("energy_cumulative_apparent_unit"),

    col("data_parsed.energy.cumulative.reactive.value").alias("energy_cumulative_reactive_value").cast("decimal(30,14)"),
    col("data_parsed.energy.cumulative.reactive.unit").alias("energy_cumulative_reactive_unit"),

    col("data_parsed.power.instantaneous.active.value").alias("power_instantaneous_active_value").cast("decimal(30,14)"),
    col("data_parsed.power.instantaneous.active.unit").alias("power_instantaneous_active_unit"),

    col("data_parsed.power.instantaneous.apparent.value").alias("power_instantaneous_apparent_value").cast("decimal(30,14)"),
    col("data_parsed.power.instantaneous.apparent.unit").alias("power_instantaneous_apparent_unit"),

    col("data_parsed.power.instantaneous.reactive.value").alias("power_instantaneous_reactive_value").cast("decimal(30,14)"),
    col("data_parsed.power.instantaneous.reactive.unit").alias("power_instantaneous_reactive_unit"),

    col("data_parsed.power.average.active.value").alias("power_average_active_value").cast("decimal(30,14)"),
    col("data_parsed.power.average.active.unit").alias("power_average_active_unit"),

    col("data_parsed.power.average.apparent.value").alias("power_average_apparent_value").cast("decimal(30,14)"),
    col("data_parsed.power.average.apparent.unit").alias("power_average_apparent_unit"),

    col("data_parsed.power.average.reactive.value").alias("power_average_reactive_value").cast("decimal(30,14)"),
    col("data_parsed.power.average.reactive.unit").alias("power_average_reactive_unit"),

    col("ingestion_time")
    )

    # Initialize the Soda scan
    scan = Scan()

    # Add the Soda Cloud configuration
    scan.add_configuration_yaml_str(soda_config)

    # Set the scan name
    scan.set_scan_definition_name("Bronze Ingestion DQs")

    # Add the checks configuration
    scan.add_sodacl_yaml_str(checks)

    # Add the PySpark DataFrame as the data source for the scan
    scan.add_spark_session(df.sparkSession)

    # Execute the scan
    scan.execute()
#end of function
json_schema_bronze= StructType([
    StructField("site", StringType(), nullable=True),
    StructField("meter", StringType(), nullable=True),
     StructField("device_time", TimestampType(), nullable=True),
          StructField("data", StringType(), nullable=True)
    ])

dfBronze = dfBronze.withColumn(
    "data_parsed",
    when((col("data").isNotNull()) & (col("data") != ""), from_json(col("data"), json_schema_silver["data"]))
    .otherwise(None)
)

# fill the data quality check log in db
dfBronze = dfBronze.withColumn(
    "quality_check_message",
    concat_ws(",\n",
        # Check for null or missing fields
        when(col("site").isNull(), lit("Missing site")).otherwise(lit("")),
        when(col("meter").isNull(), lit("Missing meter")).otherwise(lit("")),
        when(col("device_time").isNull(), lit("Missing device_time")).otherwise(lit("")),
        when(col("data_parsed").isNull(), lit("Data field is missing")).otherwise(lit("")),
        
        # Check for missing cumulative energy values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() & col("data_parsed.energy.cumulative.active.value").isNull(), lit("Missing cumulative active energy value")).otherwise(lit("")),
        when(col("data_parsed").isNotNull() & col("data_parsed.energy.cumulative.active.unit").isNull(), lit("Missing cumulative active energy unit")).otherwise(lit("")),
        
        # Check for missing instantaneous power values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() & col("data_parsed.power.instantaneous.active.value").isNull(), lit("Missing instantaneous active power value")).otherwise(lit("")),
        when(col("data_parsed").isNotNull() & col("data_parsed.power.instantaneous.active.unit").isNull(), lit("Missing instantaneous active power unit")).otherwise(lit("")),
        
        # Check for missing average power values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() & col("data_parsed.power.average.active.value").isNull(), lit("Missing average active power value")).otherwise(lit("")),
        when(col("data_parsed").isNotNull() & col("data_parsed.power.average.active.unit").isNull(), lit("Missing average active power unit")).otherwise(lit(""))
        
        # # Check for invalid unit value for cumulative energy, but only if data_parsed is not null
        # when(col("data_parsed").isNotNull() & ~col("data_parsed.energy.cumulative.active.unit").isin("kWh"), lit("Invalid active energy unit")).otherwise(lit(""))
    )
)
dfBronze.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime="1 second") \
    .start()
dfBronze=dfBronze.drop("data_parsed") #remove formatted data column
query = (dfBronze.writeStream
    .format("delta")  # Use the Delta format for writing to Delta tables
    .option("checkpointLocation", checkpoint_dir_path_bronze)  # Checkpoint directory
    .outputMode("append")\
    .option("mergeSchema", "true")  # Use "append" mode to add new data
    .trigger(processingTime="1 second")  # Trigger every second
    .table(bronze_table)  # Write to the Delta table
)

 

VZLA
Databricks Employee
Databricks Employee

Hi @sakuraDev , this looks like a Soda syntax issue.

Try fixing the "fail" and "warn" fields in your Soda checks. For example, instead of writing:

 

- missing_count(site) = 0:
    name: Ensure no null values
    fail: 1
    warn: 0

 

Use Soda's threshold syntax like this:

 

checks for df:
  missing_count(site):
    name: Ensure there are no null values in the Site column
    warn: when > 0
    fail: when >= 1

 

Follow Soda’s documented syntax for thresholds and conditions, and ensure the table name in your checks matches a registered temporary view. This should help resolving the parse error.