
How to transform a json-stat 2 file to a Spark DataFrame? How to keep order in a MapType structure?

NTRT
New Contributor

Hi,

I am working with several JSON files in the json-stat2 format. This format is commonly used by national statistics bureaus. It is multidimensional, with multiple arrays. In a Python environment we can use the pyjstat package to easily transform the JSON into a DataFrame. I tried pyjstat on Databricks and it did not work as expected: after a while the cluster kept failing (most probably due to performance bottlenecks).

So I tried to write a script that does it myself.

Here is my code:

from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, IntegerType, MapType, StringType, StructField, StructType
)

file_path = "/mnt/makro/bronze/json_ssb/13472_20240514.json"

# Define the schema to match the JSON structure
schema = StructType([
    StructField("version", StringType(), True),
    StructField("class", StringType(), True),
    StructField("label", StringType(), True),
    StructField("source", StringType(), True),
    StructField("updated", StringType(), True),
    StructField("note", ArrayType(StringType()), True),
    StructField("role", StructType([
        StructField("time", ArrayType(StringType()), True),
        StructField("geo", ArrayType(StringType()), True),
        StructField("metric", ArrayType(StringType()), True),
    ]), True),
    StructField("id", ArrayType(StringType()), True),
    StructField("size", ArrayType(IntegerType()), True),
    StructField("dimension", StructType([
        StructField("Region", StructType([
            StructField("label", StringType(), True),
            StructField("note", ArrayType(StringType()), True),
            StructField("category", StructType([
                StructField("index", MapType(StringType(), IntegerType()), True),
                StructField("label", MapType(StringType(), StringType()), True),
                StructField("note", MapType(StringType(), ArrayType(StringType())), True),
            ]), True),
            StructField("extension", StructType([
                StructField("elimination", StringType(), True),
                StructField("show", StringType(), True),
            ]), True),
            StructField("link", StructType([
                StructField("describedby", ArrayType(StructType([
                    StructField("extension", StructType([
                        StructField("Region", StringType(), True)
                    ]), True)
                ])), True),
            ]), True),
        ]), True),
        StructField("Sektor", StructType([
            StructField("label", StringType(), True),
            StructField("category", StructType([
                StructField("index", MapType(StringType(), IntegerType()), True),
                StructField("label", MapType(StringType(), StringType()), True)
            ]), True),
            StructField("extension", StructType([
                StructField("elimination", StringType(), True),
                StructField("eliminationValueCode", StringType(), True),
                StructField("show", StringType(), True),
            ]), True),
            StructField("link", StructType([
                StructField("describedby", ArrayType(StructType([
                    StructField("extension", StructType([
                        StructField("Sektor", StringType(), True)
                    ]), True)
                ])), True),
            ]), True),
        ]), True),
        StructField("ContentsCode", StructType([
            StructField("label", StringType(), True),
            StructField("category", StructType([
                StructField("index", MapType(StringType(), IntegerType()), True),
                StructField("label", MapType(StringType(), StringType()), True),
                StructField("unit", MapType(StringType(), StructType([
                    StructField("base", StringType(), True),
                    StructField("decimals", IntegerType(), True)
                ])), True)
            ]), True),
            StructField("extension", StructType([
                StructField("elimination", StringType(), True),
                StructField("refperiod", MapType(StringType(), StringType()), True),
                StructField("show", StringType(), True),
            ]), True),
            StructField("link", StructType([
                StructField("describedby", ArrayType(StructType([
                    StructField("extension", StructType([
                        StructField("ContentsCode", StringType(), True)
                    ]), True)
                ])), True),
            ]), True),
        ]), True),
        StructField("Tid", StructType([
            StructField("label", StringType(), True),
            StructField("category", StructType([
                StructField("index", MapType(StringType(), IntegerType()), True),
                StructField("label", MapType(StringType(), StringType()), True)
            ]), True),
            StructField("extension", StructType([
                StructField("elimination", StringType(), True),
                StructField("show", StringType(), True),
            ]), True),
        ]), True),
    ]), True),
    StructField("extension", StructType([
        StructField("px", StructType([
            StructField("infofile", StringType(), True),
            StructField("tableid", StringType(), True),
            StructField("decimals", IntegerType(), True),
            StructField("official-statistics", StringType(), True),
            StructField("aggregallowed", StringType(), True),
            StructField("language", StringType(), True),
            StructField("matrix", StringType(), True),
            StructField("subject-code", StringType(), True)
        ]), True),
        StructField("contact", ArrayType(StructType([
            StructField("name", StringType(), True),
            StructField("phone", StringType(), True),
            StructField("mail", StringType(), True),
            StructField("raw", StringType(), True)
        ])), True)
    ]), True),
    StructField("value", ArrayType(IntegerType()), True)
])

# Read the JSON file with the explicit schema
df = spark.read.option("multiline", "true").schema(schema).json(file_path)

dimensions = df.select("dimension.*").collect()[0].asDict()
updated= df.select("updated")
source = df.select("source")

def create_dimension_df(dim_name, dim_data):
    # A MapType column is collected as a plain Python dict, so no .asDict() is needed here
    categories = dim_data['category']['label']
    # enumerate() assumes the map items come back in their original file order
    data = [(idx, k, v) for idx, (k, v) in enumerate(categories.items())]
    # data = [(k, v) for k, v in categories.items()]
    # return spark.createDataFrame(data, schema=['code', dim_name])
    return spark.createDataFrame(data, schema=['index', 'code', dim_name])

region_df = create_dimension_df("Region", dimensions["Region"])
sektor_df = create_dimension_df("sektor", dimensions["Sektor"])
contents_code_df = create_dimension_df("statistikkvariabel", dimensions["ContentsCode"])
time_df = create_dimension_df("år", dimensions["Tid"])
values = df.select("value").collect()[0].value
sizes = df.select("size").collect()[0].size
value_df = spark.createDataFrame([(i, values[i]) for i in range(len(values))], schema=['index', 'value'])

import itertools
indices = list(itertools.product(range(sizes[0]), range(sizes[1]), range(sizes[2]), range(sizes[3])))
index_df = spark.createDataFrame(indices, schema=["Region_index", "sektor_index", "statistikkvariabel_index", "år_index"])
index_df = index_df.withColumn("index", col("Region_index") * sizes[1] * sizes[2] * sizes[3] + col("sektor_index") * sizes[2] * sizes[3] + col("statistikkvariabel_index") * sizes[3] + col("år_index"))
index_df = index_df.join(value_df, on="index").drop("index")

region_df = region_df.withColumnRenamed('index', 'Region_index')
sektor_df = sektor_df.withColumnRenamed('index', 'sektor_index')
contents_code_df = contents_code_df.withColumnRenamed('index', 'statistikkvariabel_index')
time_df = time_df.withColumnRenamed('index', 'år_index')

# Join with dimension DataFrames
final_df = index_df \
    .join(region_df, on="Region_index") \
    .join(sektor_df, on="sektor_index") \
    .join(contents_code_df, on="statistikkvariabel_index") \
    .join(time_df, on="år_index") \
    .select("Region", "sektor", "statistikkvariabel", "år", "value")

This code runs but gives wrong data, because the original key order of the MapType fields is not preserved. With this logic I need to maintain that order to get correct data. How can I do that? Is there a better way to transform a json-stat 2 file to a Spark DataFrame? I would like code that adapts to the response we get from the API, since column names can vary.

 

If you want to try it, you can send a POST request to:

url -> https://data.ssb.no/api/v0/no/table/13472/

body-> 

{"query":[{"code":"Region","selection":{"filter":"agg:KommSummer","values":["K-1875","K-5501","K-5503","K-5510","K-5512","K-5514","K-5516","K-5518","K-5520","K-5522","K-5524","K-5526","K-5528","K-5530","K-5532","K-5534","K-5536","K-5538","K-5540","K-5542","K-5544","K-5546","K-5601","K-5603","K-5605","K-5607","K-5610","K-5612","K-5614","K-5616","K-5618","K-5620","K-5622","K-5624","K-5626","K-5628","K-5630","K-5632","K-5634","K-5636","K-21-22","K-23","K-Rest"]}},{"code":"Sektor","selection":{"filter":"item","values":["ALLE"]}},{"code":"ContentsCode","selection":{"filter":"item","values":["SysselEtterArbste"]}}],"response":{"format":"json-stat2"}}
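A minimal sketch of building that request from Python with only the standard library. The helper name `build_jsonstat_request` is hypothetical, the `Content-Type` header is an assumption about what the API expects, and the body is trimmed (the Region selection is left out for brevity), so it may return a different selection than the full body above. The actual call is left commented out so the sketch runs without network access.

```python
import json
import urllib.request


def build_jsonstat_request(url, body):
    """Build (but do not send) a POST request carrying a JSON query body."""
    payload = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},  # assumed header
        method="POST",
    )


# Trimmed body: the Region selection from the post is omitted here.
body = {
    "query": [
        {"code": "Sektor",
         "selection": {"filter": "item", "values": ["ALLE"]}},
        {"code": "ContentsCode",
         "selection": {"filter": "item", "values": ["SysselEtterArbste"]}},
    ],
    "response": {"format": "json-stat2"},
}
request = build_jsonstat_request(
    "https://data.ssb.no/api/v0/no/table/13472/", body
)

# Sending is commented out so this runs offline:
# with urllib.request.urlopen(request) as response:
#     dataset = json.load(response)
```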
 
Thanks in advance.
Nuno
1 REPLY

-werners-
Esteemed Contributor III

MapType does not maintain key order (and neither does a JSON object itself).
Can you apply the ordering yourself afterwards?
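One way to apply the ordering afterwards is to stop relying on key order and use the positions stored in each dimension's `category.index`, together with the top-level `id` list for the dimension order. The sketch below is plain Python, not the only way to do it. It assumes the dataset has already been parsed into a dict, that `value` is a dense array in row-major order (last dimension varying fastest, as in the index formula in the question), and it uses made-up sample data. The function names `category_codes` and `jsonstat_rows` are hypothetical.

```python
import itertools
import json


def category_codes(category):
    """Return a dimension's category codes in their declared position order."""
    index = category.get("index")
    if index is None:
        # Assumption: a single-category dimension may carry only "label".
        return list(category["label"])
    if isinstance(index, dict):
        # Object form {"code": position}: sort by position, not by key order.
        return sorted(index, key=index.get)
    return list(index)  # Array form: already in position order.


def jsonstat_rows(dataset):
    """Flatten a JSON-stat 2 dataset into (column names, list of row tuples)."""
    dim_ids = dataset["id"]  # dimension order comes from "id", not from key order
    codes, labels = [], []
    for dim_id in dim_ids:
        category = dataset["dimension"][dim_id]["category"]
        codes.append(category_codes(category))
        labels.append(category.get("label", {}))
    if [len(c) for c in codes] != list(dataset["size"]):
        raise ValueError("category counts do not match 'size'")
    values = dataset["value"]  # assumed to be a dense array
    rows = []
    # itertools.product varies the last dimension fastest (row-major order).
    for position, combo in enumerate(itertools.product(*codes)):
        row = tuple(lab.get(code, code) for lab, code in zip(labels, combo))
        rows.append(row + (values[position],))
    return dim_ids + ["value"], rows


# Made-up sample: index keys are deliberately listed out of order.
sample = json.loads("""
{
  "id": ["Region", "Tid"],
  "size": [2, 3],
  "dimension": {
    "Region": {"category": {"index": {"B": 1, "A": 0},
                            "label": {"B": "Region B", "A": "Region A"}}},
    "Tid": {"category": {"index": {"2023": 1, "2022": 0, "2024": 2},
                         "label": {"2022": "2022", "2023": "2023", "2024": "2024"}}}
  },
  "value": [10, 11, 12, 20, 21, 22]
}
""")
columns, rows = jsonstat_rows(sample)
```

Because the column names come from `id`, this adapts to whatever dimensions the API returns. The result can then be handed to Spark with `spark.createDataFrame(rows, schema=columns)`. Building the rows on the driver is only reasonable while the dataset is small enough to collect, which the code in the question already assumes.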