4 weeks ago
I am facing an issue when using Databricks: when I set a specific type in my schema and read a JSON file, the values are fine, but after saving my df and loading it again, a value is gone.
This sample code reproduces the issue:
from pyspark.sql.types import StructType, StructField, StringType, MapType, DoubleType, ArrayType, LongType, BooleanType
import os
from pyspark.sql.functions import lit, current_date, date_sub, input_file_name, regexp_extract, split, expr, to_date, date_format, from_json, col
from pyspark.sql.utils import AnalysisException
from datetime import datetime, timedelta
from delta.tables import *
import logging
import tempfile
import inspect
import collections
import json
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
mockschema = StructType([
    StructField("bot", BooleanType(), True),
    StructField("channel", StringType(), True),
    StructField("chills", StringType(), True),
    StructField("cookies", MapType(StringType(), StringType()), True),
    StructField("geo", StructType([
        StructField("city", StringType(), True),
        StructField("country", StringType(), True),
        StructField("region", StringType(), True),
        # StructField("latitude", StringType(), True),  # this works, user id is shown
        # StructField("longitude", StringType(), True)  # this works, user id is shown
        StructField("latitude", DoubleType(), True),  # this does not work, user id is null
        StructField("longitude", DoubleType(), True)  # this does not work, user id is null
    ]), True),
    StructField("ip", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("ua", StringType(), True),
    StructField("url", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("vibes", StringType(), True)
])
mock_data = {
    "bot": False,
    "channel": "YES",
    "chills": "VERY_COLD",
    "cookies": {
        "1111": "aa",
        "2222": "bb",
        "3333": "cc",
        "4444": "dd",
        "5555": "ee",
        "6666": "ff",
    },
    "geo": {
        "city": "Such City",
        "country": "AA",
        "region": "WOWW",
        "latitude": -1.924868,
        "longitude": 25.424055
    },
    "ip": "123.45.678.90",
    "timestamp": 1711711760169,
    "ua": "SOME_BROWSER_USER_AGENT",
    "url": "www.some.url.com",
    "user_id": "123456789"
}
output_format = 'delta'
with tempfile.TemporaryDirectory() as d:
    spark.createDataFrame(
        [mock_data]
    ).write.mode("overwrite").format("json").save(d)
    df = spark.read.schema(mockschema).json(d)
    df = df.withColumn("date_partition", regexp_extract(input_file_name(), r'.*\/(\d{4}-\d{1,2}-\d{1,2})\/.*', 1))
    df = df.withColumn("date_partition", date_format(to_date("date_partition", "yyyy-M-d"), "yyyy-MM-dd"))  # Format with leading zeros for better sorting
    df = df.withColumn("filename", expr("regexp_extract(input_file_name(), '[^/]+$', 0)"))
    display(df[['user_id', 'filename']])  # df before saving: user_id is present
    df_repartitioned = df.repartition("date_partition")
    df_repartitioned.write.format(output_format).mode("overwrite").save(f"{d}/df")
    logger.info("dumped df to temp folder")
    df = spark.read.format(output_format).load(f"{d}/df")
    display(df[['user_id', 'filename']])  # df after saving: user_id is missing when geo uses DoubleType
In mockschema, declaring geo.latitude and geo.longitude as DoubleType makes user_id come back as null in the saved df, and declaring them as StringType "solves" the issue.
I need to understand why this happens, as I have other code with the same issue of fields going missing after saving, but I couldn't track down why this schema causes an error. Has anyone faced something like this and can explain it?
Thanks in advance ❤️
This "issue" is solved by using other schema types, but while the symptom shows up in the "user_id" field, the schema problem was in "geo". We have other schemas and couldn't track down which schema field interferes with which.
First I want to understand what's causing this behavior.
The value of user_id should be the same after saving as before, with no loss of data.
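One way to test whether the JSON records actually fit the declared schema is to read them in FAILFAST mode and force every column to be parsed. The sketch below is only a diagnostic idea, not a confirmed explanation of the behavior above: `read_json_strict` is a hypothetical helper name, `spark` is assumed to be the existing SparkSession, and `schema` would be `mockschema`.

```python
def read_json_strict(spark, schema, path):
    """Read JSON under `schema`, raising on records that do not fit it.

    Hypothetical helper (not a Spark API): `spark` is an existing
    SparkSession and `schema` a StructType such as `mockschema`.
    """
    reader = (
        spark.read
        .schema(schema)
        # FAILFAST throws on a corrupted record; the default PERMISSIVE
        # mode sets malformed fields to null instead.
        .option("mode", "FAILFAST")
    )
    df = reader.json(path)
    # collect() materialises every column of every row, whereas a narrow
    # select such as df[['user_id']] may not need to parse every field.
    return df.collect()
```

If `read_json_strict(spark, mockschema, d)` raises while the same read with StringType latitude/longitude does not, that would point to the written JSON not matching the DoubleType fields.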
4 weeks ago
@Henrique_Lino, where are you saving your df?
4 weeks ago
My original df is saved to a Google Cloud Storage bucket, but it behaves the same as this example that uses a temp dir. I use something like this:
output_path = "gs://bucket/folder"
df.write.save(output_path)
then spark.read.load(output_path)
In the post I used a temp dir, since both give the same results for me.
4 weeks ago
Can you check if there is any issue on the storage side? I mean are you able to successfully store the file there? We should also check if the file is getting corrupted while writing.
4 weeks ago
Yes, the file does exist there. Downloading the output, xxx.snappy.parquet, and opening it in a parquet viewer also shows the value as null, so the value is lost during saving.
I also don't think the issue is the bucket itself or the path, since using tempfile yields the same result. Changing the type of the geo fields from double to string solves it, but we couldn't understand why the types of geo.longitude and geo.latitude corrupt the user_id field 😞
We're facing this same issue with more complex data structures, but this one should be "simpler" to use as an example.
4 weeks ago
Can you try to read the data without applying schema and check the datatype of longitude and latitude?
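The same check can also be done outside Spark by looking at the raw JSON that was written. The following is a minimal sketch using only the standard library; `raw_json_types` and `describe` are hypothetical helper names, and the input is assumed to be one of the line-delimited part files that the JSON write produces.

```python
import json
from collections import defaultdict


def describe(value, name, out):
    """Record the Python type seen for `name`, descending into nested objects."""
    if isinstance(value, dict):
        for key, child in value.items():
            describe(child, f"{name}.{key}" if name else key, out)
    else:
        out[name].add(type(value).__name__)


def raw_json_types(path):
    """Map each dotted field name to the set of value types found in a JSON-lines file."""
    seen = defaultdict(set)
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            if line.strip():  # skip blank lines
                describe(json.loads(line), "", seen)
    return dict(seen)
```

A `str` reported for `geo.latitude` or `geo.longitude` would mean the file holds quoted values where the schema declares DoubleType, which is worth ruling out before looking elsewhere.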
4 weeks ago - last edited 4 weeks ago
Reading without schema struct will read lat/lng as string:
root
|-- bot: boolean (nullable = true)
|-- channel: string (nullable = true)
|-- chills: string (nullable = true)
|-- cookies: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- geo: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- country: string (nullable = true)
| |-- region: string (nullable = true)
| |-- latitude: float (nullable = true)
| |-- longitude: float (nullable = true)
|-- ip: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- ua: string (nullable = true)
|-- url: string (nullable = true)
|-- user_id: string (nullable = true)
|-- vibes: string (nullable = true)
|-- date_partition: string (nullable = true)
|-- filename: string (nullable = true)
They are defined in the data as:
"latitude": -1.924868, "longitude": 25.424055
Also, reading without a schema does not nullify the user_id field.