10-20-2021 10:46 AM
So for instance:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

# One row per JSON file; the /dbfs/ prefix lets pandas on the workers read DBFS as a local path
file = [Row(file="/dbfs/tmp/comm_q_json.json")]
df = spark.createDataFrame(file, "file string")

def read_process_json(iterator):
    import pandas as pd

    def process_from_path(path):
        # Read one JSON file and flatten the records under its top-level "value" key
        rawJSON = pd.read_json(path)
        return pd.json_normalize(rawJSON["value"])

    # mapInPandas passes in an iterator of pandas DataFrames, one per batch of rows
    for pdf in iterator:
        DFseries = pdf["file"].apply(process_from_path).tolist()
        yield pd.concat(DFseries)

# The schema should describe the columns json_normalize produces; everything is read as a string here
outSchema = StructType([
    StructField("COL1", StringType(), True),
    StructField("COL2", StringType(), True),
    StructField("COL3", StringType(), True),
    StructField("COL4", StringType(), True),
    StructField("COL5", StringType(), True),
    StructField("COL6", StringType(), True),
    StructField("COL8", StringType(), True),
    StructField("COL9", StringType(), True)
])

display(df.mapInPandas(read_process_json, schema=outSchema))

Here we rely on the pandas API to do the JSON wrangling. I'm not 100% sure this will work as-is, but it's worth a try. If the results are too large to serialize, you might have to tweak some Spark configs.
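If you want to check the per-batch function before running it on a cluster, you can call it locally with plain pandas, since mapInPandas just hands it an iterator of pandas DataFrames. This is a minimal sketch, assuming a JSON file whose top-level "value" key holds an array of flat records; the sample data and file name are made up for illustration:

```python
import json
import os
import tempfile

import pandas as pd


def read_process_json(iterator):
    # Mirrors the function passed to mapInPandas: for each batch of file paths,
    # read every JSON file and flatten the records under its "value" key.
    def process_from_path(path):
        raw_json = pd.read_json(path)
        return pd.json_normalize(raw_json["value"])

    for pdf in iterator:
        frames = pdf["file"].apply(process_from_path).tolist()
        yield pd.concat(frames)


# Hypothetical sample file: a top-level "value" array of flat records.
sample = {"value": [{"COL1": "a", "COL2": "b"}, {"COL1": "c", "COL2": "d"}]}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.json")
    with open(path, "w") as f:
        json.dump(sample, f)

    # Simulate a single batch, shaped like what Spark would pass to the function.
    batches = [pd.DataFrame({"file": [path]})]
    result = pd.concat(read_process_json(iter(batches)), ignore_index=True)

print(result)
```

If the local output has columns your outSchema doesn't list (or vice versa), that mismatch is worth fixing before calling mapInPandas.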