Thank you for the detailed explanation. It helped me understand the common situations that can trigger AMBIGUOUS_REFERENCE_TO_FIELDS.

Based on your suggestions, I reviewed my code again, but I'm still confused because I don't believe any of those cases apply here.

Specifically:

  • I'm not using .select("json.*") or .select("*").
  • I'm not expanding k.*.
  • I'm not performing any joins or unions before this error occurs.
  • The exception is raised while parsing the trade stream, before any processing of the kline stream begins.

The parsing code is simply:

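from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DecimalType

# Raw Kafka source for the trade stream. spark, KAFKA_BROKER, TRADE_STREAM_NAME,
# and trade_schema are defined elsewhere in the script.
trade_raw_df = (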
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", TRADE_STREAM_NAME)
    .option("startingOffsets", "latest")
    .load()
)

# ========================================================================
# TRADE PARSING
# ========================================================================
parsed_trade_df = (
    trade_raw_df
    .select(
        from_json(
            col("value").cast("string"),
            trade_schema
        ).alias("json")
    )
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.t").alias("trade_id"),
        col("json.p").cast(DecimalType(18, 2)).alias("price"),
        col("json.q").cast(DecimalType(18, 6)).alias("quantity"),
        col("json.T").alias("trade_time_ms"),
        col("json.m").alias("is_buyer_maker")
    )
    .filter(col("event_type") == "trade")
    .filter(col("trade_time_ms").isNotNull())
    .filter(col("price").isNotNull())
    .filter(col("quantity").isNotNull())
)
parsed_trade_df.printSchema()

# ========================================================================
# TRADE TRANSFORMATION
# ========================================================================
trade_df = (
    parsed_trade_df
    .withColumn(
        "event_time",
        (col("trade_time_ms") / 1000).cast("timestamp")
    )
    .withColumn(
        "total_value_usd",
        col("price") * col("quantity")
    )
)


The traceback points to the .select(...) call in the trade parsing step above, which is why I'm struggling to understand where Spark is finding two fields named t.
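
To narrow this down, here is a minimal sketch of a check I could run on the field names. It assumes the top-level names in trade_schema are the ones referenced in the select above (e, s, t, p, q, T, m), and the helper name find_case_insensitive_collisions is hypothetical. It relies on the fact that Spark resolves field names case-insensitively unless spark.sql.caseSensitive is set to true.

```python
from collections import defaultdict


def find_case_insensitive_collisions(field_names):
    """Group names that become identical once case is ignored."""
    groups = defaultdict(list)
    for name in field_names:
        groups[name.lower()].append(name)
    # Keep only the groups where more than one original name collapsed together.
    return {key: names for key, names in groups.items() if len(names) > 1}


# Assumption: these mirror the fields referenced in the select above.
# With the real schema this list could come from trade_schema.fieldNames().
referenced_fields = ["e", "s", "t", "p", "q", "T", "m"]

collisions = find_case_insensitive_collisions(referenced_fields)
print(collisions)  # {'t': ['t', 'T']}
```

If this reports t and T as a colliding pair for the real schema too, that might explain the two fields named t without any join, union, or star expansion being involved.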

Would you mind taking a look at this parsing code? If nothing stands out immediately, I'm happy to share the rest of the streaming script in case that helps identify where the duplicate field is coming from.

Thank you again for your guidance.