<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: I keep getting PARSE_SYNTAX_ERROR on Auto Loader run with foreachBatch in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/i-keep-on-getting-parse-syntax-error-on-autoloader-run/m-p/102100#M40965</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/119002"&gt;@sakuraDev&lt;/a&gt;, this looks like a Soda syntax issue.&lt;/P&gt;
&lt;P&gt;Try fixing the "fail" and "warn" fields in your Soda checks. In SodaCL they take threshold conditions such as "when &amp;gt; 0", not bare numbers. For example, instead of writing:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;- missing_count(site) = 0:
    name: Ensure no null values
    fail: 1
    warn: 0&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Use Soda's threshold syntax like this:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;checks for df:
  - missing_count(site):
      name: Ensure there are no null values in the Site column
      warn: when &amp;gt; 0
      fail: when &amp;gt;= 1&lt;/LI-CODE&gt;
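&lt;P&gt;The nine checks in the question differ only in the column name and label, so the SodaCL text can also be generated in Python. The sketch below uses a hypothetical helper, build_missing_checks, which is not part of Soda. It emits each check as a list item under "checks for df" with a single "fail: when &amp;gt; 0" threshold. For an integer count, "when &amp;gt; 0" and "when &amp;gt;= 1" select the same values. If a lower-severity alert is needed, add a "warn" line with its own threshold.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;
```python
# Hypothetical helper (not part of Soda): builds a SodaCL checks block with
# one missing_count check per column, using alert-configuration thresholds.
def build_missing_checks(table: str, columns: dict[str, str]) -> str:
    """Return SodaCL YAML text; `columns` maps column name to display label."""
    lines = [f"checks for {table}:"]
    for column, label in columns.items():
        # Each check is a YAML list item under the "checks for" header.
        lines.append(f"  - missing_count({column}):")
        lines.append(f"      name: Ensure there are no null values in the {label} column")
        # Fail the check as soon as the column contains any NULL.
        lines.append("      fail: when > 0")
    return "\n".join(lines) + "\n"


# Column names and labels taken from the question's checks.
checks = build_missing_checks(
    "df",
    {
        "site": "Site",
        "meter": "Meter",
        "device_time": "Device Time",
        "energy_cumulative_active_value": "Cumulative Active Energy Value",
        "energy_cumulative_active_unit": "Cumulative Active Energy Unit",
        "power_instantaneous_active_value": "Instantaneous Active Power Value",
        "power_instantaneous_active_unit": "Instantaneous Active Power Unit",
        "power_average_active_value": "Average Active Power Value",
        "power_average_active_unit": "Average Active Power Unit",
    },
)
```
&lt;/LI-CODE&gt;
&lt;P&gt;The resulting string can then be passed to scan.add_sodacl_yaml_str(checks) in place of the hand-written block.&lt;/P&gt;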
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
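&lt;P&gt;Follow Soda’s documented syntax for thresholds and conditions. Also make sure the table name in your checks ("df" in "checks for df") matches a temporary view registered on the batch's Spark session, for example by calling df.createOrReplaceTempView("df") in process_batch before scan.execute(). This should help resolve the parse error.&lt;/P&gt;</description>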
    <pubDate>Fri, 13 Dec 2024 16:28:33 GMT</pubDate>
    <dc:creator>VZLA</dc:creator>
    <dc:date>2024-12-13T16:28:33Z</dc:date>
    <item>
      <title>I keep getting PARSE_SYNTAX_ERROR on Auto Loader run with foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/i-keep-on-getting-parse-syntax-error-on-autoloader-run/m-p/89183#M37720</link>
      <description>&lt;P&gt;Hey guys, I keep getting this error message when calling a function that runs Soda data quality (DQ) checks:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;[PARSE_SYNTAX_ERROR] Syntax error at or near '{'. SQLSTATE: 42601
File &amp;lt;command-81221799516900&amp;gt;, line 4
      1 dfBronze.writeStream \
      2     .foreachBatch(process_batch) \
      3     .trigger(processingTime="1 second") \
----&amp;gt; 4     .start()
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1988, in SparkConnectClient._handle_rpc_error(self, rpc_error)
   1985             info = error_details_pb2.ErrorInfo()
   1986             d.Unpack(info)
-&amp;gt; 1988             raise convert_exception(
   1989                 info,
   1990                 status.message,
   1991                 self._fetch_enriched_error(info),
   1992                 self._display_server_stack_trace(),
   1993             ) from None
   1995     raise SparkConnectGrpcException(status.message) from None
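   1996 else:&lt;/LI-CODE&gt;&lt;P&gt;This is my code. In general, I'm just reading a stream with Auto Loader. The ingestion was working fine until I added the function, but I can't find the error:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import DataFrame
from pyspark.sql.functions import col, concat_ws, from_json, lit, when
from pyspark.sql.types import StringType, StructField, StructType, TimestampType
from soda.scan import Scan

def process_batch(df: DataFrame, batch_id: int):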
    #! Soda Cloud configuration (move this to environment variables later)
    soda_config = """
    soda_cloud:
      host: cloud.soda.io
      api_key_id: mykey
      api_key_secret: mysecret
    """

    # Define the checks for the batch
    checks = """
    checks for df:
      - missing_count(site) = 0:
          name: Ensure there are no null values in the Site column
          fail: 1
          warn: 0
      - missing_count(meter) = 0:
          name: Ensure there are no null values in the Meter column
          fail: 1
          warn: 0
      - missing_count(device_time) = 0:
          name: Ensure there are no null values in the Device Time column
          fail: 1
          warn: 0
      - missing_count(energy_cumulative_active_value) = 0:
          name: Ensure there are no null values in the Cumulative Active Energy Value column
          fail: 1
          warn: 0
      - missing_count(energy_cumulative_active_unit) = 0:
          name: Ensure there are no null values in the Cumulative Active Energy Unit column
          fail: 1
          warn: 0
      - missing_count(power_instantaneous_active_value) = 0:
          name: Ensure there are no null values in the Instantaneous Active Power Value column
          fail: 1
          warn: 0
      - missing_count(power_instantaneous_active_unit) = 0:
          name: Ensure there are no null values in the Instantaneous Active Power Unit column
          fail: 1
          warn: 0
      - missing_count(power_average_active_value) = 0:
          name: Ensure there are no null values in the Average Active Power Value column
          fail: 1
          warn: 0
      - missing_count(power_average_active_unit) = 0:
          name: Ensure there are no null values in the Average Active Power Unit column
          fail: 1
          warn: 0
    """
    # Flatten the parsed JSON into typed top-level columns
    df = df.select(
        col("site"),
        col("meter"),
        col("device_time").cast("timestamp"),

        col("data_parsed.energy.cumulative.active.value").cast("decimal(30,14)").alias("energy_cumulative_active_value"),
        col("data_parsed.energy.cumulative.active.unit").alias("energy_cumulative_active_unit"),

        col("data_parsed.energy.cumulative.apparent.value").cast("decimal(30,14)").alias("energy_cumulative_apparent_value"),
        col("data_parsed.energy.cumulative.apparent.unit").alias("energy_cumulative_apparent_unit"),

        col("data_parsed.energy.cumulative.reactive.value").cast("decimal(30,14)").alias("energy_cumulative_reactive_value"),
        col("data_parsed.energy.cumulative.reactive.unit").alias("energy_cumulative_reactive_unit"),

        col("data_parsed.power.instantaneous.active.value").cast("decimal(30,14)").alias("power_instantaneous_active_value"),
        col("data_parsed.power.instantaneous.active.unit").alias("power_instantaneous_active_unit"),

        col("data_parsed.power.instantaneous.apparent.value").cast("decimal(30,14)").alias("power_instantaneous_apparent_value"),
        col("data_parsed.power.instantaneous.apparent.unit").alias("power_instantaneous_apparent_unit"),

        col("data_parsed.power.instantaneous.reactive.value").cast("decimal(30,14)").alias("power_instantaneous_reactive_value"),
        col("data_parsed.power.instantaneous.reactive.unit").alias("power_instantaneous_reactive_unit"),

        col("data_parsed.power.average.active.value").cast("decimal(30,14)").alias("power_average_active_value"),
        col("data_parsed.power.average.active.unit").alias("power_average_active_unit"),

        col("data_parsed.power.average.apparent.value").cast("decimal(30,14)").alias("power_average_apparent_value"),
        col("data_parsed.power.average.apparent.unit").alias("power_average_apparent_unit"),

        col("data_parsed.power.average.reactive.value").cast("decimal(30,14)").alias("power_average_reactive_value"),
        col("data_parsed.power.average.reactive.unit").alias("power_average_reactive_unit"),

        col("ingestion_time"),
    )

    # Initialize the Soda scan
    scan = Scan()

    # Add the Soda Cloud configuration
    scan.add_configuration_yaml_str(soda_config)

    # Set the scan name
    scan.set_scan_definition_name("Bronze Ingestion DQs")

    # Add the checks configuration
    scan.add_sodacl_yaml_str(checks)

    # Add the PySpark DataFrame as the data source for the scan
    scan.add_spark_session(df.sparkSession)

    # Execute the scan
    scan.execute()
# end of process_batch

json_schema_bronze = StructType([
    StructField("site", StringType(), nullable=True),
    StructField("meter", StringType(), nullable=True),
    StructField("device_time", TimestampType(), nullable=True),
    StructField("data", StringType(), nullable=True),
])

dfBronze = dfBronze.withColumn(
    "data_parsed",
    when((col("data").isNotNull()) &amp;amp; (col("data") != ""), from_json(col("data"), json_schema_silver["data"]))
    .otherwise(None)
)

# Build a data quality message for each row. when() without otherwise() returns NULL
# for a passing check, and concat_ws skips NULLs, so only the failures are listed.
dfBronze = dfBronze.withColumn(
    "quality_check_message",
    concat_ws(",\n",
        # Check for null or missing fields
        when(col("site").isNull(), lit("Missing site")),
        when(col("meter").isNull(), lit("Missing meter")),
        when(col("device_time").isNull(), lit("Missing device_time")),
        when(col("data_parsed").isNull(), lit("Data field is missing")),

        # Check for missing cumulative energy values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() &amp;amp; col("data_parsed.energy.cumulative.active.value").isNull(), lit("Missing cumulative active energy value")),
        when(col("data_parsed").isNotNull() &amp;amp; col("data_parsed.energy.cumulative.active.unit").isNull(), lit("Missing cumulative active energy unit")),

        # Check for missing instantaneous power values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() &amp;amp; col("data_parsed.power.instantaneous.active.value").isNull(), lit("Missing instantaneous active power value")),
        when(col("data_parsed").isNotNull() &amp;amp; col("data_parsed.power.instantaneous.active.unit").isNull(), lit("Missing instantaneous active power unit")),

        # Check for missing average power values, but only if data_parsed is not null
        when(col("data_parsed").isNotNull() &amp;amp; col("data_parsed.power.average.active.value").isNull(), lit("Missing average active power value")),
        when(col("data_parsed").isNotNull() &amp;amp; col("data_parsed.power.average.active.unit").isNull(), lit("Missing average active power unit")),

        # # Check for invalid unit value for cumulative energy, but only if data_parsed is not null
        # when(col("data_parsed").isNotNull() &amp;amp; ~col("data_parsed.energy.cumulative.active.unit").isin("kWh"), lit("Invalid active energy unit"))
    )
)
dfBronze.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime="1 second") \
    .start()
dfBronze = dfBronze.drop("data_parsed")  # Remove the parsed JSON column before writing
query = (dfBronze.writeStream
    .format("delta")  # Write in Delta format
    .option("checkpointLocation", checkpoint_dir_path_bronze)  # Checkpoint directory
    .outputMode("append")  # Only add new rows to the table
    .option("mergeSchema", "true")  # Let new columns evolve the table schema
    .trigger(processingTime="1 second")  # Trigger every second
    .toTable(bronze_table)  # Start the stream into the Delta table
)

&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 09 Sep 2024 13:02:40 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/i-keep-on-getting-parse-syntax-error-on-autoloader-run/m-p/89183#M37720</guid>
      <dc:creator>sakuraDev</dc:creator>
      <dc:date>2024-09-09T13:02:40Z</dc:date>
    </item>
  </channel>
</rss>

