04-05-2024 07:53 AM
I am facing an issue when using databricks, when I set a specific type in my schema and read a json, its values are fine, but after saving my df and loading again, the value is gone.
I have this sample code that shows this issue:
from pyspark.sql.types import StructType, StructField, StringType, MapType, DoubleType, ArrayType, LongType, BooleanType
import os
from pyspark.sql.functions import lit, current_date, date_sub, input_file_name, regexp_extract, split,expr, to_date, date_format,from_json, col
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.utils import AnalysisException
from datetime import datetime, timedelta
from delta.tables import *
import logging
import tempfile
import inspect
import collections
import json
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
mockschema = StructType([
StructField("bot", BooleanType(), True),
StructField("channel", StringType(), True),
StructField("chills", StringType(), True),
StructField("cookies", MapType(StringType(), StringType()), True),
StructField("geo", StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("region", StringType(), True),
# StructField("latitude", StringType(), True), # this works, user id is shown
# StructField("longitude", StringType(), True) # this works, user id is shown
StructField("latitude", DoubleType(), True), # this does not work, user id is null
StructField("longitude", DoubleType(), True) # this does not work, user id is null
]), True),
StructField("ip", StringType(), True),
StructField("timestamp", LongType(), True),
StructField("ua", StringType(), True),
StructField("url", StringType(), True),
StructField("user_id", StringType(), True),
StructField("vibes", StringType(), True)
])
mock_data = {
"bot": False,
"channel": "YES",
"chills": "VERY_COLD",
"cookies": {
"1111": "aa",
"2222": "bb",
"3333": "cc",
"4444": "dd",
"5555": "ee",
"6666": "ff",
},
"geo": {
"city": "Such City",
"country": "AA",
"region": "WOWW",
"latitude": -1.924868,
"longitude": 25.424055
},
"ip": "123.45.678.90",
"timestamp": 1711711760169,
"ua": "SOME_BROWSER_USER_AGENT",
"url": "www.some.url.com",
"user_id": "123456789"
}
output_format = 'delta'
with tempfile.TemporaryDirectory() as d:
spark.createDataFrame(
[mock_data]
).write.mode("overwrite").format("json").save(d)
df = spark.read.schema(mockschema).json(d)
df = df.withColumn("date_partition", regexp_extract(input_file_name(), r'.*\/(\d{4}-\d{1,2}-\d{1,2})\/.*', 1))
df = df.withColumn("date_partition", date_format(to_date("date_partition", "yyyy-M-d"), "yyyy-MM-dd")) #Fortmat with leading zeros for better sorting
df = df.withColumn("filename", expr("regexp_extract(input_file_name(), '[^/]+$', 0)"))
display(df[['user_id', 'filename']]) # df before saving, have user id
df_repartitioned = df.repartition("date_partition")
df_repartitioned.write.format(output_format).mode("overwrite").save(f"{d}/df")
logger.info(f"dumped df to temp folder")
df = spark.read.format(output_format).load(f"{d}/df")
display(df[['user_id', 'filename']]) # df after saving, missing user id when double type in geo
in mockschema, setting geo as doubletype makes so the saved df user_id becomes none, and setting this as string "solves" the issue.
I need to understand why this happens as I have some other codes with same issue of fields missing after saving, but couldn't track down why this schema causes an error. Does anyone ever faced something like that and can explain?
Thanks in advance ❤️
This "issue" is solved by using another schema types, but the issue being on "user_id" field the schema problem was in "geo", we have other schemas and couldnt track which schema interfere with another one.
I want first understand whats causing this behavior.
The value of user_id should be the same after saving than before, not having any loss of data
04-05-2024 09:51 AM
@Henrique_Lino , Where are you saving your df?
04-05-2024 09:54 AM
my original df is being saved in a google storage bucket, but behave the same as this one that uses temp dir, I use something like this:
output_path = "gs:/bucket/folder"
df.save(output_path)
then spark.read(output_path)
in post I used temp dir as they have the same results for me
04-05-2024 10:01 AM
Can you check if there is any issue on the storage side? I mean are you able to successfully store the file there? We should also check if the file is getting corrupted while writing.
04-05-2024 10:10 AM
Yes, the file does exists there, and downloading the output, xxx.snappy.parquet and reading using a parquet viewer, the data after loading with the viewer is also null. So the value is lost during the saving.
I also dont think the issue is the bucket itself or the path as using tempfile yields the same, changing the type of geo from double to string solves, but we couldnt understand why geo.longitude and geo.latitude types corrupts user_id field 😞
We're facing this same issue with more complex data structures, but this one should be "simpler" to use as example
04-05-2024 10:40 AM
Can you try to read the data without applying schema and check the datatype of longitude and latitude?
04-05-2024 10:50 AM - edited 04-05-2024 10:51 AM
Reading without schema struct will read lat/lng as string:
root
|-- bot: boolean (nullable = true)
|-- channel: string (nullable = true)
|-- chills: string (nullable = true)
|-- cookies: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- geo: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- country: string (nullable = true)
| |-- region: string (nullable = true)
| |-- latitude: float (nullable = true)
| |-- longitude: float (nullable = true)
|-- ip: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- ua: string (nullable = true)
|-- url: string (nullable = true)
|-- user_id: string (nullable = true)
|-- vibes: string (nullable = true)
|-- date_partition: string (nullable = true)
|-- filename: string (nullable = true)
they are defined as:
"latitude": -1.924868, "longitude": 25.424055
Also reading without schema will not nullify the user_id field
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group