<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160505#M54893</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;H3&gt;&lt;FONT size="3"&gt;1. &lt;SPAN&gt;What situations typically trigger AMBIGUOUS_REFERENCE_TO_FIELDS?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;P&gt;&lt;FONT size="3"&gt;It occurs when Spark finds more than one field with the same name at the same nesting level in a DataFrame. The most common causes are:&lt;/FONT&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Wildcard expansion: using .select("json.*") followed by .select("*", "k.*") keeps the struct field k (containing nested t) and also creates a flat field t at the top level&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Union/join collisions: combining DataFrames that both have fields named t without proper aliasing&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Duplicate schema definitions: defining the same field twice in a StructType&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;FONT size="3"&gt;2. &lt;SPAN&gt;Can nested fields in a separate schema cause this error?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;P&gt;&lt;FONT size="3"&gt;Not by themselves. The trade schema with t and the kline schema with k.t are fine independently.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;The problem arises when you:&lt;/FONT&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Expand struct fields with wildcards (select("k.*") promotes nested k.t to top-level t)&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Combine both streams without distinct column names&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;FONT size="3"&gt;3. &lt;SPAN&gt;Is this usually related to schema definitions, column expansion (select("*"), select("json.*")), joins, or something else?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Wildcard expansion (most common)&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Column expansion with select("*") or select("struct_field.*")&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Union/join operations without explicit column selection&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;FONT size="3"&gt;4. &lt;SPAN&gt;What debugging steps would you recommend to identify which DataFrame contains the duplicate field?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;P&gt;&lt;FONT size="3"&gt;Check the parsing code:&lt;/FONT&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Print df.columns after each transformation to spot duplicates&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Call df.printSchema() to see whether t appears at multiple levels&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Check for .select("json.*") or .select("*", "k.*") patterns&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;FONT size="3"&gt;Use explicit nested field paths with proper aliasing for the kline stream, as you already do for the trade stream.&lt;/FONT&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 25 Jun 2026 09:49:50 GMT</pubDate>
    <dc:creator>balajij8</dc:creator>
    <dc:date>2026-06-25T09:49:50Z</dc:date>
    <item>
      <title>PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160498#M54891</link>
      <description>&lt;P&gt;I'm working on a personal data engineering project using Kafka, Spark Structured Streaming, and Docker.&lt;/P&gt;&lt;P&gt;The application consumes two Kafka topics that originate from an external market-data websocket source:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;a trade stream&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;a candlestick (kline/OHLCV) stream&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I'm using the following schemas in my Spark job:&lt;/P&gt;&lt;PRE&gt;trade_schema = StructType([
    StructField("e", StringType(), True),
    StructField("s", StringType(), True),
    StructField("t", LongType(), True),
    StructField("p", StringType(), True),
    StructField("q", StringType(), True),
    StructField("T", LongType(), True),
    StructField("m", BooleanType(), True)
])&lt;/PRE&gt;&lt;PRE&gt;parsed_trade_df = (
    trade_raw_df
    .select(
        from_json(
            col("value").cast("string"),
            trade_schema
        ).alias("json")
    )
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.t").alias("trade_id"),
        col("json.p").cast(DecimalType(18, 2)).alias("price"),
        col("json.q").cast(DecimalType(18, 6)).alias("quantity"),
        col("json.T").alias("trade_time_ms"),
        col("json.m").alias("is_buyer_maker")
    )
)&lt;/PRE&gt;&lt;P&gt;The Spark application fails during parsing with:&lt;/P&gt;&lt;PRE&gt;AnalysisException:
[AMBIGUOUS_REFERENCE_TO_FIELDS]
Ambiguous reference to the field `t`.
It appears 2 times in the schema.&lt;/PRE&gt;&lt;P&gt;The traceback points to a .select(...) operation.&lt;/P&gt;&lt;P&gt;I also consume a second stream containing nested structures with fields such as:&lt;/P&gt;&lt;PRE&gt;{
  "e": "kline",
  "k": {
    "t": 1782371940000,
    "T": 1782371999999
  }
}&lt;/PRE&gt;&lt;P&gt;What I'm trying to understand is the root cause of Spark reporting an ambiguous reference to t.&lt;/P&gt;&lt;P&gt;My understanding is that Spark should distinguish between:&lt;/P&gt;&lt;PRE&gt;col("json.t")&lt;/PRE&gt;&lt;P&gt;and&lt;/P&gt;&lt;PRE&gt;col("json.k.t")&lt;/PRE&gt;&lt;P&gt;Questions:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;What situations typically trigger AMBIGUOUS_REFERENCE_TO_FIELDS?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Can nested fields in a separate schema cause this error?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Is this usually related to schema definitions, column expansion (select("*"), select("json.*")), joins, or something else?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;What debugging steps would you recommend to identify which DataFrame contains the duplicate field?&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;I'm mainly interested in understanding the cause so I can debug it myself.&lt;/P&gt;</description>
      <pubDate>Thu, 25 Jun 2026 08:18:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160498#M54891</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-25T08:18:08Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160505#M54893</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;H3&gt;&lt;FONT size="3"&gt;1. &lt;SPAN&gt;What situations typically trigger AMBIGUOUS_REFERENCE_TO_FIELDS?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;P&gt;&lt;FONT size="3"&gt;It occurs when Spark finds more than one field with the same name at the same nesting level in a DataFrame. The most common causes are:&lt;/FONT&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Wildcard expansion: using .select("json.*") followed by .select("*", "k.*") keeps the struct field k (containing nested t) and also creates a flat field t at the top level&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Union/join collisions: combining DataFrames that both have fields named t without proper aliasing&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Duplicate schema definitions: defining the same field twice in a StructType&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;FONT size="3"&gt;2. &lt;SPAN&gt;Can nested fields in a separate schema cause this error?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;P&gt;&lt;FONT size="3"&gt;Not by themselves. The trade schema with t and the kline schema with k.t are fine independently.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;The problem arises when you:&lt;/FONT&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Expand struct fields with wildcards (select("k.*") promotes nested k.t to top-level t)&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Combine both streams without distinct column names&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;FONT size="3"&gt;3. &lt;SPAN&gt;Is this usually related to schema definitions, column expansion (select("*"), select("json.*")), joins, or something else?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Wildcard expansion (most common)&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Column expansion with select("*") or select("struct_field.*")&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Union/join operations without explicit column selection&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;&lt;FONT size="3"&gt;4. &lt;SPAN&gt;What debugging steps would you recommend to identify which DataFrame contains the duplicate field?&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/H3&gt;&lt;P&gt;&lt;FONT size="3"&gt;Check the parsing code:&lt;/FONT&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Print df.columns after each transformation to spot duplicates&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Call df.printSchema() to see whether t appears at multiple levels&lt;/FONT&gt;&lt;/LI&gt;&lt;LI&gt;&lt;FONT size="3"&gt;Check for .select("json.*") or .select("*", "k.*") patterns&lt;/FONT&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;FONT size="3"&gt;Use explicit nested field paths with proper aliasing for the kline stream, as you already do for the trade stream.&lt;/FONT&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 25 Jun 2026 09:49:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160505#M54893</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-06-25T09:49:50Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160630#M54916</link>
      <description>&lt;P&gt;Thank you for the detailed explanation. It helped me understand the common situations that can trigger AMBIGUOUS_REFERENCE_TO_FIELDS.&lt;/P&gt;&lt;P&gt;Based on your suggestions, I reviewed my code again, but I'm still confused because I don't believe any of those cases apply here.&lt;/P&gt;&lt;P&gt;Specifically:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;I'm not using .select("json.*") or .select("*").&lt;/LI&gt;&lt;LI&gt;I'm not expanding k.*.&lt;/LI&gt;&lt;LI&gt;I'm not performing any joins or unions before this error occurs.&lt;/LI&gt;&lt;LI&gt;The exception is raised while parsing the trade stream, before any processing of the kline stream begins.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;The parsing code is simply:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;trade_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", TRADE_STREAM_NAME)
    .option("startingOffsets", "latest")
    .load()
)

# ========================================================================
# TRADE PARSING
# ========================================================================
parsed_trade_df = (
    trade_raw_df
    .select(
        from_json(
            col("value").cast("string"),
            trade_schema
        ).alias("json")
    )
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.t").alias("trade_id"),
        col("json.p").cast(DecimalType(18, 2)).alias("price"),
        col("json.q").cast(DecimalType(18, 6)).alias("quantity"),
        col("json.T").alias("trade_time_ms"),
        col("json.m").alias("is_buyer_maker")
    )
    .filter(col("event_type") == "trade")
    .filter(col("trade_time_ms").isNotNull())
    .filter(col("price").isNotNull())
    .filter(col("quantity").isNotNull())
)
parsed_trade_df.printSchema()
# ========================================================================
# TRADE TRANSFORMATION
# ========================================================================
trade_df = (
    parsed_trade_df
    .withColumn(
        "event_time",
        (col("trade_time_ms") / 1000).cast("timestamp")
    )
    .withColumn(
        "total_value_usd",
        col("price") * col("quantity")
    )
)&lt;/LI-CODE&gt;&lt;P&gt;&lt;BR /&gt;The traceback points to this .select(...) call, which is why I'm struggling to understand where Spark is finding two fields named t.&lt;/P&gt;&lt;P&gt;Would you mind taking a look at this parsing code? If it doesn't immediately stand out, I'm also happy to share the rest of the streaming script if that would help identify where the duplicate field might be coming from.&lt;/P&gt;&lt;P&gt;Thank you again for your guidance.&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 09:15:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160630#M54916</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-26T09:15:26Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160632#M54918</link>
      <description>&lt;P&gt;Hi Vikas,&lt;/P&gt;&lt;P&gt;You can share the code directly in a message.&lt;/P&gt;&lt;P&gt;In the meantime, try the following:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Enable case-sensitive mode with spark.conf.set("spark.sql.caseSensitive", "true") and rerun the code. By default Spark resolves field names case-insensitively, so t and T in the same struct both match a reference to t.&lt;/LI&gt;&lt;LI&gt;Rename the fields immediately after parsing and rerun the code.&lt;/LI&gt;&lt;/UL&gt;&lt;LI-CODE lang="python"&gt;parsed_trade_df = (
    trade_raw_df
    .select(from_json(col("value").cast("string"), trade_schema).alias("json"))
)
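
# A minimal sketch of the first suggestion above, assuming the active SparkSession
# is bound to the name `spark` (as in the original poster's script).
# PySpark resolves column references when select() is called, so the flag has to be
# set before the select below that references json.t.
spark.conf.set("spark.sql.caseSensitive", "true")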

changed_df = (
    parsed_trade_df
    .select(
        col("json.t").alias("trade_id"),
        col("json.T").alias("trade_time_ms"),
        col("json.e").alias("event_type"),
    )
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 09:29:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160632#M54918</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-06-26T09:29:18Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160635#M54919</link>
      <description>&lt;P&gt;Hi Vikas, you can also resolve the ambiguity up front by renaming the fields in the schema.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Rename fields in the schema so that no two names differ only by case
trade_schema = StructType([
    StructField("e", StringType(), True),
    StructField("s", StringType(), True),
    StructField("trade_id", LongType(), True),        # Renamed from "t"
    StructField("p", StringType(), True),
    StructField("q", StringType(), True),
    StructField("trade_time_ms", LongType(), True),   # Renamed from "T"
    StructField("m", BooleanType(), True)
])

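# Caveat: from_json matches schema field names to JSON keys, so "trade_id" and "trade_time_ms"
# are populated only if the payload uses those keys; with keys "t" and "T" they parse as null.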
parsed_trade_df = (
    trade_raw_df
    .select(from_json(col("value").cast("string"), trade_schema).alias("json"))
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.trade_id").alias("trade_id"),
        col("json.p").cast(DecimalType(18, 2)).alias("price"),
        col("json.q").cast(DecimalType(18, 6)).alias("quantity"),
        col("json.trade_time_ms").alias("trade_time_ms"),
        col("json.m").alias("is_buyer_maker")
    )
    .filter(col("event_type") == "trade")
    .filter(col("trade_time_ms").isNotNull())
    .filter(col("price").isNotNull())
    .filter(col("quantity").isNotNull())
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 09:54:38 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160635#M54919</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-06-26T09:54:38Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160637#M54921</link>
      <description>&lt;P&gt;I haven't tried enabling case-sensitive mode yet, but I'll test it and report back with the results.&lt;/P&gt;&lt;P&gt;In the meantime, I'm also attaching my 'spark_streaming.py' script. If you have a chance to review it, I'd really appreciate it. I'm wondering if there's something in my parsing or transformation logic that's causing Spark to report the ambiguous field reference.&lt;/P&gt;&lt;P&gt;Thank you again for your guidance.&lt;/P&gt;&lt;P&gt;spark_streaming script&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DecimalType,
    LongType,
    BooleanType
)

# ========================================================================
# CONFIG
# ========================================================================
RUN_MODE = os.getenv("RUN_MODE", "LOCAL")
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka:29092")

TRADE_STREAM_NAME = "crypto-trade-stream"
KLINE_STREAM_NAME = "crypto-kline-stream"

WHALE_THRESHOLD_USD = "50000"

print(f"⚙️ Starting Spark Streaming [{RUN_MODE}]")

# ========================================================================
# TRADE STREAM SCHEMA (MICRO VIEW)
# ========================================================================
trade_schema = StructType([
    StructField("e", StringType(), True),
    StructField("s", StringType(), True),
    StructField("trade_id", LongType(), True),
    StructField("p", StringType(), True),
    StructField("q", StringType(), True),
    StructField("trade_time_ms", LongType(), True),
    StructField("m", BooleanType(), True)
])

# ========================================================================
# KLINE STREAM SCHEMA (MACRO VIEW)
# ========================================================================
kline_schema = StructType([
    StructField("e", StringType(), True),
    StructField("s", StringType(), True),
    StructField("E", LongType(), True),
    StructField(
        "k",
        StructType([
            StructField("t", LongType(), True),
            StructField("T", LongType(), True),
            StructField("i", StringType(), True),
            StructField("o", StringType(), True),
            StructField("h", StringType(), True),
            StructField("l", StringType(), True),
            StructField("c", StringType(), True),
            StructField("v", StringType(), True),
            StructField("q", StringType(), True),
            StructField("n", LongType(), True),
            StructField("x", BooleanType(), True)
        ])
    )
])

# ========================================================================
# SPARK SESSION
# ========================================================================
builder = (
    SparkSession.builder
    .appName("CryptoWhaleStreamingEngine")
)

if RUN_MODE == "LOCAL":
    builder = builder.master("spark://spark-master:7077")

spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# ========================================================================
# TRADE STREAM SOURCE
# ========================================================================
trade_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", TRADE_STREAM_NAME)
    .option("startingOffsets", "latest")
    .load()
)

# ========================================================================
# TRADE PARSING
# ========================================================================
parsed_trade_df = (
    trade_raw_df
    .select(
        from_json(
            col("value").cast("string"),
            trade_schema
        ).alias("json")
    )
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.t").alias("trade_id"),
        col("json.p").cast(DecimalType(18, 2)).alias("price"),
        col("json.q").cast(DecimalType(18, 6)).alias("quantity"),
        col("json.T").alias("trade_time_ms"),
        col("json.m").alias("is_buyer_maker")
    )
    .filter(col("event_type") == "trade")
    .filter(col("trade_time_ms").isNotNull())
    .filter(col("price").isNotNull())
    .filter(col("quantity").isNotNull())
)
parsed_trade_df.printSchema()
# ========================================================================
# TRADE TRANSFORMATION
# ========================================================================
trade_df = (
    parsed_trade_df
    .withColumn(
        "event_time",
        (col("trade_time_ms") / 1000).cast("timestamp")
    )
    .withColumn(
        "total_value_usd",
        col("price") * col("quantity")
    )
)

# ========================================================================
# WATERMARK
# ========================================================================
watermarked_trade_df = (
    trade_df.withWatermark(
        "event_time",
        "10 minutes"
    )
)

# ========================================================================
# WHALE DETECTION
# ========================================================================
whale_df = (
    watermarked_trade_df
    .filter(col("total_value_usd") &amp;gt;= WHALE_THRESHOLD_USD)
)

print("✅ Starting KLINE source")

# ========================================================================
# KLINE STREAM SOURCE
# ========================================================================
kline_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KLINE_STREAM_NAME)
    .option("startingOffsets", "latest")
    .load()
)

# ========================================================================
# KLINE PARSING
# ========================================================================
parsed_kline_df = (
    kline_raw_df
    .select(
        from_json(
            col("value").cast("string"),
            kline_schema
        ).alias("json")
    )
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.E").alias("event_time_ms"),

        col("json.k.t").alias("candle_start_ms"),
        col("json.k.T").alias("candle_close_ms"),
        col("json.k.i").alias("interval"),

        col("json.k.o").cast(DecimalType(18, 2)).alias("open_price"),
        col("json.k.h").cast(DecimalType(18, 2)).alias("high_price"),
        col("json.k.l").cast(DecimalType(18, 2)).alias("low_price"),
        col("json.k.c").cast(DecimalType(18, 2)).alias("close_price"),

        col("json.k.v").cast(DecimalType(18, 6)).alias("base_volume"),
        col("json.k.q").cast(DecimalType(18, 2)).alias("quote_volume"),

        col("json.k.n").alias("trades_count"),
        col("json.k.x").alias("is_candle_closed")
    )
    .filter(col("event_type") == "kline")
    .filter(col("event_time_ms").isNotNull())
)
parsed_kline_df.printSchema()
# ========================================================================
# KLINE TRANSFORMATION
# ========================================================================
kline_df = (
    parsed_kline_df
    .withColumn(
        "event_time",
        (col("event_time_ms") / 1000).cast("timestamp")
    )
)

# ========================================================================
# DEBUG FUNCTIONS
# ========================================================================
def debug_whales(df, batch_id):
    count = df.count()
    if count &amp;gt; 0:
        print(
            f"🐋 Batch {batch_id}: "
            f"{count} whale trades detected"
        )

def debug_klines(df, batch_id):
    count = df.count()
    if count &amp;gt; 0:
        print(
            f"📈 Batch {batch_id}: "
            f"{count} kline records processed"
        )

# ========================================================================
# DEBUG STREAMS
# ========================================================================
trade_debug_query = (
    whale_df.writeStream
    .queryName("whale_debug")
    .foreachBatch(debug_whales)
    .start()
)

kline_debug_query = (
    kline_df.writeStream
    .queryName("kline_debug")
    .foreachBatch(debug_klines)
    .start()
)

# ========================================================================
# WHALE PARQUET SINK
# ========================================================================
whale_query = (
    whale_df.writeStream
    .queryName("whale_alerts")
    .format("parquet")
    .outputMode("append")
    .option(
        "path",
        "/opt/spark/app/data/whale_alerts"
    )
    .option(
        "checkpointLocation",
        "/opt/spark/app/checkpoints/whale_alerts"
    )
    .trigger(processingTime="10 seconds")
    .start()
)

# ========================================================================
# KLINE PARQUET SINK
# ========================================================================
kline_query = (
    kline_df.writeStream
    .queryName("candlestick_history")
    .format("parquet")
    .outputMode("append")
    .option(
        "path",
        "/opt/spark/app/data/candlesticks"
    )
    .option(
        "checkpointLocation",
        "/opt/spark/app/checkpoints/candlesticks"
    )
    .trigger(processingTime="10 seconds")
    .start()
)

print("🚀 Whale detection pipeline running")
print("🚀 Candlestick pipeline running")


spark.streams.awaitAnyTermination()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 10:56:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160637#M54921</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-26T10:56:52Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160642#M54922</link>
      <description>&lt;P&gt;Also check the other two options listed above: up-front schema setup and field renaming.&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 11:03:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160642#M54922</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-06-26T11:03:07Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160653#M54927</link>
      <description>&lt;P&gt;Thank you, balajij8, for your suggestion about enabling case-sensitive mode. It worked! The process now moves past the previous error, and Spark is successfully consuming data from Kafka.&lt;/P&gt;&lt;P&gt;However, it looks like I've run into another issue. Although the streaming job is consuming the data, it doesn't appear to be writing any Parquet files as expected.&lt;/P&gt;&lt;P&gt;I do see the checkpoint directories being created correctly, both inside the Spark container and on my local machine through the mounted volume, so it seems the streaming queries are running. The only thing missing is the Parquet output.&lt;/P&gt;&lt;P&gt;I'll investigate this next, but if you have any suggestions about what might cause Spark Structured Streaming to create checkpoints without writing any output files, I'd really appreciate your guidance.&lt;/P&gt;&lt;P&gt;The following is my Parquet sink:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;whale_query = (
    whale_df.writeStream
    .queryName("whale_alerts")
    .format("parquet")
    .outputMode("append")
    .option(
        "path",
        "/opt/spark/app/data/whale_alerts"
    )
    .option(
        "checkpointLocation",
        "/opt/spark/app/checkpoints/whale_alerts"
    )
    .trigger(processingTime="10 seconds")
    .start()
)

# ========================================================================
# KLINE PARQUET SINK
# ========================================================================
kline_query = (
    kline_df.writeStream
    .queryName("candlestick_history")
    .format("parquet")
    .outputMode("append")
    .option(
        "path",
        "/opt/spark/app/data/candlesticks"
    )
    .option(
        "checkpointLocation",
        "/opt/spark/app/checkpoints/candlesticks"
    )
    .trigger(processingTime="10 seconds")
    .start()
)

print("🚀 Whale detection pipeline running")
print("🚀 Candlestick pipeline running")

spark.streams.awaitAnyTermination()&lt;/LI-CODE&gt;&lt;P&gt;Thank you again for your help!&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 11:54:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160653#M54927</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-26T11:54:44Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160657#M54930</link>
      <description>&lt;P&gt;I have sent a reply to this message five times already; I don't know what is going on here.&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 11:59:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160657#M54930</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-26T11:59:37Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160660#M54933</link>
      <description>&lt;P&gt;&lt;SPAN&gt;The sink configuration is correct, so the issue is most likely upstream.&amp;nbsp;The Parquet sink can only write files when it receives data from upstream.&amp;nbsp;&lt;/SPAN&gt;You can validate the two key settings below:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;&lt;SPAN class=""&gt;startingOffsets - latest&lt;/SPAN&gt;&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;- With this setting, the query skips all historical Kafka data and only processes messages that arrive after the stream starts. You can set it to&amp;nbsp;&lt;STRONG&gt;&lt;SPAN class=""&gt;earliest&lt;/SPAN&gt;&lt;/STRONG&gt;&lt;SPAN class=""&gt;&amp;nbsp;and check whether files appear. Note that this option only takes effect when the query starts without an existing checkpoint.&lt;/SPAN&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;WHALE_THRESHOLD_USD 50000&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;- Typical values can be 5 - 10. You can temporarily lower the threshold to validate, then set it back to 50000 later.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Even if Kafka has messages, these settings can cause the pipeline to skip or filter them out.&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 12:12:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160660#M54933</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-06-26T12:12:10Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160662#M54934</link>
      <description>&lt;P&gt;Thank you, balajij8, for your suggestions. I really appreciate your time and guidance.&lt;/P&gt;&lt;P&gt;I'll try the different configurations you recommended and investigate further. Once I've tested them, I'll come back and share the results.&lt;/P&gt;&lt;P&gt;Thanks again for your help!&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;P.S.&amp;nbsp;"Did you see the messages I have already sent... I still don't see them above?"&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 12:33:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160662#M54934</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-26T12:33:26Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160671#M54935</link>
      <description>&lt;P&gt;Hello balajij8,&lt;/P&gt;&lt;P&gt;Before trying your suggestions, I decided to inspect the filesystem inside my Spark container once more.&lt;/P&gt;&lt;P&gt;I found something that has changed my understanding of the problem. There are no errors being reported by the streaming job, and the checkpoint and _spark_metadata directories are being updated continuously. I also found metadata entries that indicate Spark believes it has successfully written Parquet files.&lt;/P&gt;&lt;P&gt;However, I cannot find the actual part-*.snappy.parquet files in the output directory, even though the metadata references them. For example:&lt;/P&gt;&lt;P&gt;$ cd _spark_metadata&lt;BR /&gt;$ ls&lt;BR /&gt;0 1 2 3&lt;BR /&gt;$ cat 1&lt;BR /&gt;v1&lt;BR /&gt;{"path":"file:///opt/spark/app/data/whale_alerts/part-00000-ac552411-0fa6-47c8-b120-4dfcc9227b09-c000.snappy.parquet","size":1125,"isDir":false,"modificationTime":1782477948968,"blockReplication":1,"blockSize":33554432,"action":"add"}&lt;/P&gt;&lt;P&gt;But when I run:&lt;/P&gt;&lt;P&gt;find /opt/spark/app/data -name "*.parquet"&lt;/P&gt;&lt;P&gt;no Parquet files are found, either inside the container or on my host machine. Only the _spark_metadata files exist.&lt;/P&gt;&lt;P&gt;Since the streaming job is processing records successfully and the metadata is being written, I'm now wondering whether this is related to the file sink, filesystem, or Docker volume configuration rather than the upstream pipeline.&lt;/P&gt;&lt;P&gt;Before I start changing the Kafka configuration or thresholds, do you have any thoughts on why Spark would generate metadata entries without the corresponding Parquet files?&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 13:18:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160671#M54935</guid>
      <dc:creator>VikasM</dc:creator>
      <dc:date>2026-06-26T13:18:25Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark AnalysisException: Ambiguous reference to field t when parsing nested JSON</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160675#M54936</link>
      <description>&lt;P&gt;When Spark Structured Streaming writes to file sinks, it generally uses a phased commit: it writes temporary files to the output directory, then writes metadata that references them, and finally commits by moving/renaming the temp files to their final names.&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;You can check for Docker volume mount misconfigurations, as some Docker configurations use temporary filesystems that get cleaned up, or a background process may remove the files. In that case, the files are written but immediately deleted.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;You can also verify that&amp;nbsp;&lt;SPAN class=""&gt;/opt/spark/app/data&lt;/SPAN&gt;&amp;nbsp;is actually mounted to the host, and ensure that the&amp;nbsp;&lt;SPAN class=""&gt;_spark_metadata&lt;/SPAN&gt;&amp;nbsp;directory and the other output directories all have the same read/write permissions, so that Spark can perform every operation it needs.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;To confirm, you can change the code to write to a different path where Spark has read/write access and check whether the files appear there.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 26 Jun 2026 13:37:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-analysisexception-ambiguous-reference-to-field-t-when/m-p/160675#M54936</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-06-26T13:37:55Z</dc:date>
    </item>
  </channel>
</rss>

