mtajmouati

Hi!

 

Given the issues you're facing with schema inference and long processing times, it may help to preprocess the JSON files so that each file is reasonably small and has a predictable structure. Here's a step-by-step approach: split the large JSON files into smaller chunks, then read those chunks with Databricks Auto Loader.
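The split script in Step 1 only works if each input file is a single top-level JSON array. Before splitting, you can check the layout by peeking at the start of the file. This is a minimal heuristic sketch, not a full parse; the helper name `detect_json_layout` is hypothetical:

```python
import json

def detect_json_layout(path, probe_bytes=65536):
    """Guess the top-level layout of a JSON file from its first characters.

    Returns "array" (one top-level JSON array), "jsonl" (first line is a
    complete JSON object, i.e. likely newline-delimited JSON), "object"
    (a multi-line object), or "unknown". Heuristic only: a single object
    longer than probe_bytes on one line is reported as "object".
    """
    with open(path, "r", encoding="utf-8") as f:
        head = f.read(probe_bytes)
    # Drop a UTF-8 byte-order mark and leading whitespace before inspecting.
    head = head.lstrip("\ufeff").lstrip()
    if head.startswith("["):
        return "array"
    if head.startswith("{"):
        first_line = head.split("\n", 1)[0]
        try:
            json.loads(first_line)  # a complete object on line 1
            return "jsonl"
        except json.JSONDecodeError:
            return "object"
    return "unknown"
```

If it reports "jsonl", the file is already newline-delimited; Spark's JSON reader expects that layout by default, so Auto Loader can usually read it directly without `multiline`.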

Step 1: Split the JSON Files

You can use a Python script to split the JSON files into smaller chunks. Here's an example of how you can do this:

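import json
import os

def split_json_file(input_path, output_dir, chunk_size=1000):
    # json.load reads the whole file into memory, so the input must fit in driver memory
    with open(input_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # This approach only works when the file is a single top-level JSON array
    if not isinstance(data, list):
        raise ValueError("Expected a top-level JSON array of records")

    # Create the output directory if it does not exist yet
    os.makedirs(output_dir, exist_ok=True)

    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        output_path = os.path.join(output_dir, f"chunk_{i // chunk_size}.json")
        with open(output_path, 'w', encoding='utf-8') as chunk_file:
            json.dump(chunk, chunk_file, indent=4)

# Example usage (/dbfs/mnt/... is the local-file view of the mount Spark reads as /mnt/...)
input_path = "/dbfs/mnt/abc/Testing/input.json"
output_dir = "/dbfs/mnt/abc/Testing/split"
# chunk_size=1 writes one file per record; many tiny files slow Auto Loader down,
# so a larger chunk size is usually a better starting point
split_json_file(input_path, output_dir, chunk_size=1000)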
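As an alternative, the chunks can be written as JSON Lines (one compact object per line) instead of indented arrays. Spark's JSON reader expects this layout by default, and line-delimited files can be split across tasks, whereas `multiline` files cannot. This sketch makes the same assumption as `split_json_file`: the input is one top-level JSON array small enough to load with `json.load`. The function name `split_json_to_jsonl` is hypothetical:

```python
import json
import os

def split_json_to_jsonl(input_path, output_dir, chunk_size=1000):
    """Split a top-level JSON array into newline-delimited JSON files.

    Each output file holds up to chunk_size records, one per line.
    Returns the list of files written, in order.
    """
    with open(input_path, "r", encoding="utf-8") as f:
        data = json.load(f)  # loads the whole array into memory
    if not isinstance(data, list):
        raise ValueError("expected a top-level JSON array")

    os.makedirs(output_dir, exist_ok=True)
    written = []
    for i in range(0, len(data), chunk_size):
        out_path = os.path.join(output_dir, f"chunk_{i // chunk_size:05d}.json")
        with open(out_path, "w", encoding="utf-8") as out:
            for record in data[i:i + chunk_size]:
                # One compact JSON object per line, no indentation.
                out.write(json.dumps(record) + "\n")
        written.append(out_path)
    return written
```

With files written this way, the `multiline` option in the Auto Loader reader would be dropped.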

 

Step 2: Use Databricks Auto Loader to Read the Split JSON Files

Now you can use Databricks Auto Loader to read the split JSON files into a streaming DataFrame.

  1. Ensure the directory structure is correct: the split JSON files should be in a directory that Auto Loader can access. Spark sees the /dbfs/mnt/abc/Testing/split directory from Step 1 as /mnt/abc/Testing/split/.

  2. Update the path in your Auto Loader code: Point to the directory where the split JSON files are stored.
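Before pointing Auto Loader at the directory, it can help to confirm the split step produced what you expect. A minimal sketch, run against the local `/dbfs/...` view of the mount; the helper `summarize_split_dir` is hypothetical and assumes the chunk format written by `split_json_file` (one JSON array per `.json` file):

```python
import json
import os

def summarize_split_dir(output_dir):
    """Count chunk files and records, and list chunk files that fail to parse."""
    n_files, n_records, bad_files = 0, 0, []
    for name in sorted(os.listdir(output_dir)):
        if not name.endswith(".json"):
            continue  # ignore anything that is not a chunk file
        path = os.path.join(output_dir, name)
        n_files += 1
        try:
            with open(path, "r", encoding="utf-8") as f:
                chunk = json.load(f)
        except json.JSONDecodeError:
            bad_files.append(name)
            continue
        # Each chunk is expected to be a JSON array of records.
        n_records += len(chunk) if isinstance(chunk, list) else 1
    return {"files": n_files, "records": n_records, "bad_files": bad_files}
```

The record count should match the length of the original array; any entry in `bad_files` points to a chunk that Auto Loader would also fail to parse.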

Example Code to Read Split JSON Files

Here's the updated code to read the split JSON files using Auto Loader:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
from pyspark.sql import SparkSession

# Define the schema
schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("documentTime", TimestampType(), True),
    StructField("radonShortTermAvg", DoubleType(), True),
    StructField("temp", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("co2", DoubleType(), True),
    StructField("voc", DoubleType(), True),
    StructField("pressure", DoubleType(), True),
    StructField("light", DoubleType(), True)
])

# Get the Spark session (on Databricks, `spark` already exists and getOrCreate() returns it)
spark = SparkSession.builder \
    .appName("Auto Loader Example") \
    .getOrCreate()

# Define paths
path = '/mnt/abc/Testing/split/'  # Update to the directory with split JSON files
checkpoint_path = '/mnt/deskoccupancy-historical/Testing/Autoloader/'
table_name = 'bronze.occupancy_table_bronze'

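# Read stream using Auto Loader
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Required when Auto Loader infers the schema; optional here because a schema is given
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.includeExistingFiles", "true")  # true is already the default
    .option("encoding", "utf-8")
    # Each chunk is an indented JSON array spanning many lines
    .option("multiLine", "true")
    # If documentTime is not in Spark's default timestamp format, set it explicitly, e.g.
    # .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
    .schema(schema)
    .load(path)
)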

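# Write stream to the Delta table.
# start() treats its argument as a path, so use toTable() to write to a table by name
query = (
    df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkpoint_path)
    .toTable(table_name)
)
query.awaitTermination()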
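Because the stream uses an explicit schema, Spark silently ignores JSON fields that are not in it, and values that cannot be converted to the declared type can come back as null under the default permissive parsing mode. A pure-Python spot check on a few chunk files can surface such mismatches before ingestion. This sketch mirrors the field names in `schema` above; the `EXPECTED` mapping and `check_record` helper are hypothetical, and the ISO-8601 check is an assumption about how `documentTime` is formatted:

```python
from datetime import datetime

# Expected kinds per field, mirroring the Spark schema above (assumption).
EXPECTED = {
    "device_id": str,
    "documentTime": "timestamp",
    "radonShortTermAvg": float,
    "temp": float,
    "humidity": float,
    "co2": float,
    "voc": float,
    "pressure": float,
    "light": float,
}

def check_record(record):
    """Return a list of problems found in one JSON record (empty if none)."""
    problems = []
    for field, kind in EXPECTED.items():
        value = record.get(field)
        if value is None:
            continue  # every field in the schema is nullable
        if kind == "timestamp":
            try:
                # Assumes ISO-8601 strings; Python < 3.11 rejects a trailing "Z".
                if not isinstance(value, str):
                    raise ValueError
                datetime.fromisoformat(value)
            except ValueError:
                problems.append(f"{field}: unparseable timestamp {value!r}")
        elif kind is float:
            # DoubleType accepts JSON ints or floats; bool is a subclass of int, so exclude it.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                problems.append(f"{field}: expected number, got {value!r}")
        elif not isinstance(value, kind):
            # Spark would store a non-string as text, but this usually signals inconsistent data.
            problems.append(f"{field}: expected {kind.__name__}, got {value!r}")
    # Spark drops fields that are not in the schema without any warning.
    unknown = sorted(set(record) - set(EXPECTED))
    if unknown:
        problems.append(f"fields not in schema: {unknown}")
    return problems
```

For example, you could `json.load` one chunk file and print `check_record(r)` for every record that returns a non-empty list.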


Best regards,
Mehdi Tajmouati
 mehdi.tajmouati@wytasoft.com
 06 68 23 18 42
 www.wytasoft.com