cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

I keep on getting Parse_syntax_error on autoloader run foreachbatch

sakuraDev
New Contributor II

Hey guys, I keep getting this error message when trying to call a function that runs Soda data-quality (DQ) checks:

 [PARSE_SYNTAX_ERROR] Syntax error at or near '{'. SQLSTATE: 42601
File <command-81221799516900>, line 4
      1 dfBronze.writeStream \
      2     .foreachBatch(process_batch) \
      3     .trigger(processingTime="1 second") \
----> 4     .start()
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1988, in SparkConnectClient._handle_rpc_error(self, rpc_error)
   1985             info = error_details_pb2.ErrorInfo()
   1986             d.Unpack(info)
-> 1988             raise convert_exception(
   1989                 info,
   1990                 status.message,
   1991                 self._fetch_enriched_error(info),
   1992                 self._display_server_stack_trace(),
   1993             ) from None
   1995     raise SparkConnectGrpcException(status.message) from None
   1996 else:

This is my code. In general, I'm just reading a stream with Auto Loader; the ingestion was going great until I added the function, but I can't find the error:

def process_batch(df: DataFrame, batch_id):
    """Run the Soda data-quality scan on one streaming micro-batch.

    Intended as the callback for ``DataStreamWriter.foreachBatch``. The batch is
    flattened (nested ``data_parsed`` measurements become top-level columns),
    registered as the temp view ``df`` so the ``checks for df`` section below
    has a dataset to run against, and then scanned with Soda.

    Args:
        df: The micro-batch DataFrame. Must contain ``site``, ``meter``,
            ``device_time``, ``ingestion_time`` and the ``data_parsed`` struct.
        batch_id: Micro-batch identifier supplied by ``foreachBatch`` (unused).

    Returns:
        None. The scan result is not inspected here.
    """
    # TODO: Soda Cloud configuration (move this to environment variables later)
    soda_config = """
    soda_cloud:
      host: cloud.soda.io
      api_key_id: mykey
      api_key_secret: mysecret
    """

    # Define the checks for the batch
    checks = """
    checks for df:
      - missing_count(site) = 0:
          name: Ensure there are no null values in the Site column
          fail: 1
          warn: 0
      - missing_count(meter) = 0:
          name: Ensure there are no null values in the Meter column
          fail: 1
          warn: 0
      - missing_count(device_time) = 0:
          name: Ensure there are no null values in the Device Time column
          fail: 1
          warn: 0
      - missing_count(energy_cumulative_active_value) = 0:
          name: Ensure there are no null values in the Cumulative Active Energy Value column
          fail: 1
          warn: 0
      - missing_count(energy_cumulative_active_unit) = 0:
          name: Ensure there are no null values in the Cumulative Active Energy Unit column
          fail: 1
          warn: 0
      - missing_count(power_instantaneous_active_value) = 0:
          name: Ensure there are no null values in the Instantaneous Active Power Value column
          fail: 1
          warn: 0
      - missing_count(power_instantaneous_active_unit) = 0:
          name: Ensure there are no null values in the Instantaneous Active Power Unit column
          fail: 1
          warn: 0
      - missing_count(power_average_active_value) = 0:
          name: Ensure there are no null values in the Average Active Power Value column
          fail: 1
          warn: 0
      - missing_count(power_average_active_unit) = 0:
          name: Ensure there are no null values in the Average Active Power Unit column
          fail: 1
          warn: 0
    """

    # Flatten data_parsed.<group>.<kind>.{value,unit} into top-level columns
    # named <group>_<kind>_{value,unit} (dots replaced by underscores).
    # Values are cast to decimal(30,14); units are kept as-is.
    measurement_columns = []
    for group in ("energy.cumulative", "power.instantaneous", "power.average"):
        for kind in ("active", "apparent", "reactive"):
            path = f"{group}.{kind}"
            flat_name = path.replace(".", "_")
            measurement_columns.append(
                col(f"data_parsed.{path}.value")
                .alias(f"{flat_name}_value")
                .cast("decimal(30,14)")
            )
            measurement_columns.append(
                col(f"data_parsed.{path}.unit").alias(f"{flat_name}_unit")
            )

    df = df.select(
        col("site"),
        col("meter"),
        col("device_time").cast("timestamp"),
        *measurement_columns,
        col("ingestion_time"),
    )

    # Soda resolves the dataset name in "checks for df" against the Spark
    # catalog, so the flattened batch has to be exposed as a view called "df".
    df.createOrReplaceTempView("df")

    # Initialize the Soda scan
    scan = Scan()

    # Add the Soda Cloud configuration
    scan.add_configuration_yaml_str(soda_config)

    # Set the scan name
    scan.set_scan_definition_name("Bronze Ingestion DQs")

    # Tell the scan which data source the checks run against; it must match
    # the name the Spark session is registered under below.
    scan.set_data_source_name("spark_df")

    # Add the checks configuration
    scan.add_sodacl_yaml_str(checks)

    # Register the batch's own Spark session as the data source for the scan
    scan.add_spark_session(df.sparkSession, data_source_name="spark_df")

    # Execute the scan
    scan.execute()
#end of function
# Schema of the raw bronze records. Every field is nullable, and `data` is
# declared as a plain string (it is parsed with from_json further down).
json_schema_bronze = (
    StructType()
    .add("site", StringType(), nullable=True)
    .add("meter", StringType(), nullable=True)
    .add("device_time", TimestampType(), nullable=True)
    .add("data", StringType(), nullable=True)
)

# Parse the raw JSON string in `data` into a struct column `data_parsed`.
#
# `from_json` expects a DataType (or a schema string). Indexing a StructType by
# field name returns the StructField wrapper (name + type + nullability), not
# the field's type; passing that wrapper makes Spark try to parse its JSON form
# as a schema, which is a likely source of
# "[PARSE_SYNTAX_ERROR] Syntax error at or near '{'" when the stream starts.
# NOTE(review): json_schema_silver is defined outside this snippet and is
# presumably a StructType - the isinstance guard keeps this working if the
# lookup already yields a DataType.
_data_field = json_schema_silver["data"]
_data_schema = (
    _data_field.dataType if isinstance(_data_field, StructField) else _data_field
)

dfBronze = dfBronze.withColumn(
    "data_parsed",
    # Only parse non-null, non-empty payloads; everything else becomes NULL.
    when(
        col("data").isNotNull() & (col("data") != ""),
        from_json(col("data"), _data_schema),
    ).otherwise(None),
)

# Fill the data quality check log in db.
#
# Each entry is (failure condition, message). A `when(...)` with no
# `.otherwise(...)` evaluates to NULL for rows that pass the check, and
# `concat_ws` skips NULL inputs, so the column holds only the messages of the
# failed checks joined by ",\n" (an empty string when every check passes).
# Using `.otherwise(lit(""))` instead would keep an empty string per passing
# check and leave dangling separators in the message.
_has_parsed_data = col("data_parsed").isNotNull()
_quality_checks = [
    # Check for null or missing fields
    (col("site").isNull(), "Missing site"),
    (col("meter").isNull(), "Missing meter"),
    (col("device_time").isNull(), "Missing device_time"),
    (col("data_parsed").isNull(), "Data field is missing"),

    # Check for missing cumulative energy values, but only if data_parsed is not null
    (
        _has_parsed_data & col("data_parsed.energy.cumulative.active.value").isNull(),
        "Missing cumulative active energy value",
    ),
    (
        _has_parsed_data & col("data_parsed.energy.cumulative.active.unit").isNull(),
        "Missing cumulative active energy unit",
    ),

    # Check for missing instantaneous power values, but only if data_parsed is not null
    (
        _has_parsed_data & col("data_parsed.power.instantaneous.active.value").isNull(),
        "Missing instantaneous active power value",
    ),
    (
        _has_parsed_data & col("data_parsed.power.instantaneous.active.unit").isNull(),
        "Missing instantaneous active power unit",
    ),

    # Check for missing average power values, but only if data_parsed is not null
    (
        _has_parsed_data & col("data_parsed.power.average.active.value").isNull(),
        "Missing average active power value",
    ),
    (
        _has_parsed_data & col("data_parsed.power.average.active.unit").isNull(),
        "Missing average active power unit",
    ),

    # Disabled: check for invalid unit value for cumulative energy, but only if data_parsed is not null
    # (
    #     _has_parsed_data & ~col("data_parsed.energy.cumulative.active.unit").isin("kWh"),
    #     "Invalid active energy unit",
    # ),
]

dfBronze = dfBronze.withColumn(
    "quality_check_message",
    concat_ws(
        ",\n",
        *[when(failed, lit(message)) for failed, message in _quality_checks],
    ),
)
# Start a streaming query that hands every micro-batch to process_batch,
# triggering once per second.
(
    dfBronze.writeStream
    .foreachBatch(process_batch)
    .trigger(processingTime="1 second")
    .start()
)
# Remove the parsed-JSON helper column before persisting to the bronze table.
dfBronze = dfBronze.drop("data_parsed")

# Append the stream to the bronze Delta table, triggering every second.
# NOTE(review): `.table(...)` is kept as written; the documented PySpark
# DataStreamWriter method is `toTable(...)` - confirm `.table` is available
# on the target runtime.
query = (
    dfBronze.writeStream
    .format("delta")  # Use the Delta format for writing to Delta tables
    .outputMode("append")  # Use "append" mode to add new data
    .option("checkpointLocation", checkpoint_dir_path_bronze)  # Checkpoint directory
    .option("mergeSchema", "true")  # Allow new columns to be merged into the table schema
    .trigger(processingTime="1 second")  # Trigger every second
    .table(bronze_table)  # Write to the Delta table
)

 

0 REPLIES 0

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group