
Autoloader error "Failed to infer schema for format json from existing files in input"

hpant
New Contributor III

I have two JSON files in a location in Azure Gen2 storage, e.g. '/mnt/abc/Testing/'. When I try to read the files using Auto Loader, I get this error: "Failed to infer schema for format json from existing files in input path /mnt/abc/Testing/. Please ensure you configured the options properly or explicitly specify the schema."

5 REPLIES

hpant
New Contributor III

Hey @Retired_mod ,

Thanks for your response. I have tried both without a schema and with a schema; neither works.

1. Without schema:

[Screenshot: ss1.PNG]

2. With schema: it takes forever to read the data, even with only two rows. (It kept running for an hour before I interrupted it.)

[Screenshot: ss2.PNG]

For your reference, I have attached a sample JSON file:

[Screenshot: ss3.PNG]

hpant
New Contributor III

Hey @Kaniz_Fatma ,

Any comment on the above message?

Thanks

mtajmouati
Contributor

Hi!

Given the issues you're facing with schema inference and long processing times, it might be helpful to preprocess the JSON files to ensure they are properly formatted and manageable. Here's a step-by-step approach to split the JSON files into smaller chunks and then use Databricks Auto Loader to read them.

Step 1: Split the JSON Files

You can use a Python script to split the JSON files into smaller chunks. Here's an example of how you can do this:

import json
import os

def split_json_file(input_path, output_dir, chunk_size=1):
    # Assumes the top-level JSON document is an array of records
    with open(input_path, 'r') as file:
        data = json.load(file)

    # Make sure the output directory exists before writing chunks
    os.makedirs(output_dir, exist_ok=True)

    # Write each chunk of records to its own file
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        output_path = f"{output_dir}/chunk_{i//chunk_size}.json"
        with open(output_path, 'w') as chunk_file:
            json.dump(chunk, chunk_file, indent=4)

# Example usage
input_path = "/dbfs/mnt/abc/Testing/input.json"
output_dir = "/dbfs/mnt/abc/Testing/split"
split_json_file(input_path, output_dir, chunk_size=1)

 

Step 2: Use Databricks Auto Loader to Read the Split JSON Files

Now, you can use Databricks Auto Loader to read the split JSON files into a DataFrame.

  1. Ensure the directory structure is correct: The split JSON files should be located in a directory that Auto Loader can access.

  2. Update the path in your Auto Loader code: Point to the directory where the split JSON files are stored.

Example Code to Read Split JSON Files

Here's the updated code to read the split JSON files using Auto Loader:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
from pyspark.sql import SparkSession

# Define the schema
schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("documentTime", TimestampType(), True),
    StructField("radonShortTermAvg", DoubleType(), True),
    StructField("temp", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("co2", DoubleType(), True),
    StructField("voc", DoubleType(), True),
    StructField("pressure", DoubleType(), True),
    StructField("light", DoubleType(), True)
])

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Auto Loader Example") \
    .getOrCreate()

# Define paths
path = '/mnt/abc/Testing/split/'  # Update to the directory with split JSON files
checkpoint_path = '/mnt/deskoccupancy-historical/Testing/Autoloader/'
table_name = 'bronze.occupancy_table_bronze'

# Read stream using Auto Loader
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.includeExistingFiles", "true") \
    .option("encoding", "utf-8") \
    .option("multiLine", "true") \
    .schema(schema) \
    .load(path)

# Write stream to the Delta table (toTable targets a table name rather than a path)
df.writeStream \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", checkpoint_path) \
    .toTable(table_name) \
    .awaitTermination()

 M.T

hpant
New Contributor III

Hi @mtajmouati ,

Thanks for your response. Your solution might work, but the problem is that we receive data every hour, and Auto Loader can handle new data with its checkpoint system. However, if we keep the function that splits the JSON files, we need to add a mechanism that only splits files which have not been read before, which would defeat the purpose of the Auto Loader checkpoint system. Is there any way to achieve this using only Auto Loader?

Thanks,

Himanshu

holly
Databricks Employee

Hi @hpant, would you consider testing the new VARIANT type for your JSON data? I appreciate it will require rewriting the next step in your pipeline, but it should be more robust with respect to errors.

Disclaimer: I haven't personally tested VARIANT with Auto Loader. It should speed up some reads, but I don't think it creates file statistics yet.
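Roughly, an untested sketch of the VARIANT route with Auto Loader could look like the following (assuming a runtime where VARIANT and the JSON singleVariantColumn option are available, i.e. DBR 15.3 or later; the path and checkpoint location are just the placeholders from the earlier snippet, and "data" is an arbitrary column name):

# Untested sketch: land each JSON record in a single VARIANT column.
# Assumes DBR 15.3+; path and checkpoint_path are the placeholders used earlier.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("singleVariantColumn", "data")   # whole record goes into one VARIANT column
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(path)
)

# Fields can then be pulled out downstream, e.g. data:device_id::string in SQL.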

Alternatively, you can partially define a schema and put the remainder of the columns in a 'rescued' column, but it is a hassle splitting it out afterwards.
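For reference, one way to do the partial-schema approach with Auto Loader is the cloudFiles.schemaHints option; a minimal sketch, reusing the path and checkpoint location from the earlier snippet and a couple of the column names from your schema, with anything that doesn't match landing in the default _rescued_data column:

# Sketch: hint the types of a few known columns and let Auto Loader infer the rest.
# Values that don't match the inferred/hinted schema end up in _rescued_data.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.schemaHints",
            "documentTime TIMESTAMP, radonShortTermAvg DOUBLE, temp DOUBLE")
    .option("multiLine", "true")
    .load(path)
)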
