Hi Vikas, you can resolve the ambiguity by setting the schema up front, which is a good approach in general.
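```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    BooleanType, DecimalType, LongType, StringType, StructField, StructType,
)

# from_json matches JSON keys to schema fields by exact name, so the schema
# must use the payload's own keys "t" and "T". Naming a field "trade_id"
# here would leave it null for every row.
trade_schema = StructType([
    StructField("e", StringType(), True),
    StructField("s", StringType(), True),
    StructField("t", LongType(), True),   # trade ID
    StructField("p", StringType(), True),
    StructField("q", StringType(), True),
    StructField("T", LongType(), True),   # trade time, epoch milliseconds
    StructField("m", BooleanType(), True),
])

# Referencing json.t or json.T is ambiguous while spark.sql.caseSensitive is
# false (the default). A struct-to-struct cast matches fields by position, so
# casting to a copy of the schema with new names renames them safely.
renames = {"t": "trade_id", "T": "trade_time_ms"}
renamed_schema = StructType([
    StructField(renames.get(f.name, f.name), f.dataType, f.nullable)
    for f in trade_schema.fields
])

# trade_raw_df is your raw DataFrame with the JSON payload in a "value" column
parsed_trade_df = (
    trade_raw_df
    .select(
        from_json(col("value").cast("string"), trade_schema)
        .cast(renamed_schema)
        .alias("json")
    )
    .filter(col("json").isNotNull())
    .select(
        col("json.e").alias("event_type"),
        col("json.s").alias("symbol"),
        col("json.trade_id").alias("trade_id"),
        col("json.p").cast(DecimalType(18, 2)).alias("price"),
        col("json.q").cast(DecimalType(18, 6)).alias("quantity"),
        col("json.trade_time_ms").alias("trade_time_ms"),
        col("json.m").alias("is_buyer_maker"),
    )
    .filter(col("event_type") == "trade")
    .filter(col("trade_time_ms").isNotNull())
    .filter(col("price").isNotNull())
    .filter(col("quantity").isNotNull())
)
```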
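A toy model in plain Python (no Spark required) may make the two behaviors behind this easier to see: from_json fills each schema field from the JSON key with exactly the same name, while a case-insensitive lookup of "t" matches both "t" and "T". The helper names (`parse_with_schema`, `resolve_case_insensitive`, `rename_by_position`) and the sample record are hypothetical and only mimic Spark's behavior; they are not Spark's implementation.

```python
import json

# Toy model, NOT Spark code: the helpers below only imitate Spark's behavior.


def parse_with_schema(raw: str, field_names: list) -> dict:
    """Fill each schema field from the JSON key with the exact same name."""
    record = json.loads(raw)
    return {name: record.get(name) for name in field_names}


def resolve_case_insensitive(row: dict, name: str):
    """Look up a field the way a case-insensitive resolver would."""
    matches = [key for key in row if key.lower() == name.lower()]
    if len(matches) > 1:
        raise ValueError(f"ambiguous reference to {name!r}: matches {matches}")
    if not matches:
        raise KeyError(name)
    return row[matches[0]]


def rename_by_position(row: dict, new_names: list) -> dict:
    """Like a struct-to-struct cast: keep values in order, take new names."""
    if len(new_names) != len(row):
        raise ValueError("field counts must match")
    return dict(zip(new_names, row.values()))


# Hypothetical sample record with both "t" and "T" keys
RAW = '{"e":"trade","s":"BTCUSDT","t":12345,"p":"0.001","q":"100","T":1700000000000,"m":true}'
ORIGINAL_FIELDS = ["e", "s", "t", "p", "q", "T", "m"]
RENAMED_FIELDS = ["e", "s", "trade_id", "p", "q", "trade_time_ms", "m"]

# Renaming inside the schema: no JSON key is called "trade_id", so it is None
lost = parse_with_schema(RAW, RENAMED_FIELDS)

# Parsing with the real keys keeps the data, but "t" is now ambiguous
row = parse_with_schema(RAW, ORIGINAL_FIELDS)

# Renaming by position never looks "t" or "T" up by name
fixed = rename_by_position(row, RENAMED_FIELDS)
```

In Spark, casting one struct type to another plays the role of `rename_by_position`: fields are paired by position and the names come from the target type, so the rename never has to resolve `t` or `T` by name.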