Dan_Z
Databricks Employee

So for instance:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
 
file = [Row(file="/dbfs/tmp/comm_q_json.json")]
df = spark.createDataFrame(file, ("file: String"))
 
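def read_process_json(iterator):
  # Imported inside the function so it resolves on the executors.
  import pandas as pd
  
  def process_from_path(path):
    # Read one JSON file, then flatten the records under "value" into columns.
    rawJSON = pd.read_json(path)
    return pd.json_normalize(rawJSON['value'])
  
  # Each pdf is a pandas DataFrame holding a batch of file paths.
  for pdf in iterator:
    DFseries = pdf["file"].apply(process_from_path).tolist()
    # The yielded columns need to line up with outSchema below.
    yield pd.concat(DFseries)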
    
outSchema = StructType([
  StructField("COL1",StringType(),True),
  StructField("COL2",StringType(),True),
  StructField("COL3",StringType(),True),
  StructField("COL4",StringType(),True),
  StructField("COL5",StringType(),True),
  StructField("COL6",StringType(),True),
  StructField("COL8",StringType(),True),
  StructField("COL9",StringType(),True)
])
 
display(df.mapInPandas(read_process_json, schema=outSchema))

Here we rely on the pandas API to do the JSON wrangling, with mapInPandas running it on the executors. I haven't tested this end to end, so treat it as a starting point rather than a verified solution. You might have to tweak some configs if the results are too large to serialize.
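If you want to check the pandas half without a cluster, here is a rough local sketch. The sample payload, the `meta` field, and the `align_to_schema` helper are all made up for illustration; I'm assuming your file has a top-level "value" array like the code above expects. It also uses plain `json.load` instead of `pd.read_json`, just to keep the example simple. The point is to see which column names `json_normalize` produces, since those need to match the schema you hand to mapInPandas.

```python
import json
import os
import tempfile

import pandas as pd

# Hypothetical payload: a top-level "value" array of records (an assumption
# about the file's shape, not taken from the real comm_q_json.json).
sample = {
    "value": [
        {"COL1": "a", "COL2": "b", "meta": {"id": "1"}},
        {"COL1": "c", "COL2": "d", "meta": {"id": "2"}},
    ]
}

def process_from_path(path):
    # Load the file and flatten the records under "value" into columns.
    # Nested keys come out dot-separated, e.g. "meta.id".
    with open(path) as fh:
        raw = json.load(fh)
    return pd.json_normalize(raw["value"])

def align_to_schema(pdf, columns):
    # Hypothetical helper: keep only the listed columns, in that order,
    # and add any missing ones filled with NaN.
    return pdf.reindex(columns=columns)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.json")
    with open(path, "w") as fh:
        json.dump(sample, fh)
    flat = process_from_path(path)

aligned = align_to_schema(flat, ["COL1", "COL2", "COL3"])
print(flat.columns.tolist())
print(aligned)
```

If the flattened names don't match your schema fields, a reindex like this (or a rename) before the yield is one way to line them up.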
