
Autoloader not ingesting all file data into Delta Table from Azure Blob Container

KristiLogos
New Contributor III
I have done the following, i.e. created a Delta Table where I plan to load the Azure Blob Container files, which are .json.gz files:
 
# Infer the schema from the .json.gz files already in the container.
df = spark.read.option("multiline", "true").json(f"{container_location}/*.json.gz")

from delta.tables import DeltaTable

# Create the Delta table using the inferred schema.
DeltaTable.create(spark) \
    .addColumns(df.schema) \
    .property("delta.minReaderVersion", "2") \
    .property("delta.minWriterVersion", "5") \
    .property("delta.columnMapping.mode", "name") \
    .tableName('tablename') \
    .execute()
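 
A quick sanity check at this point (my suggestion, not part of the original setup) is to confirm the inferred schema actually landed on the new table before wiring up the stream:

# Compare the schema inferred from the files with the created table's schema.
df.printSchema()
spark.table("tablename").printSchema()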
 
Then I set up the Autoloader:
df_autoloader = (spark.readStream
                 .format("cloudFiles")
                 # Service principal details used for file notification setup
                 .option("cloudFiles.resourceGroup", "resourcename")
                 .option("cloudFiles.subscriptionId", "12345")
                 .option("cloudFiles.tenantId", "12345")
                 .option("cloudFiles.clientId", "12345")
                 .option("cloudFiles.clientSecret", "12345")
                 .option("cloudFiles.format", "json")
                 .option("multiline", "true")
                 .option("cloudFiles.useNotifications", "true")
                 .schema(schema)  # schema inferred earlier
                 .load(AMP_LOC)   # path to the Blob container
)
 
(df_autoloader.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)  
    .table("tablename")
)
 
I see things happening in the cell, but when I go to query the table I only see 200 rows of data, when there should be millions.
2 REPLIES

filipniziol
Contributor

Hi @KristiLogos ,

First, try adding .trigger(availableNow=True). This ensures all the available data is processed before the query stops.

Without this option, per the documentation, the query runs as fast as possible, which is equivalent to setting the trigger to processingTime='0 seconds'.

When you run the streaming query in a notebook, the cell execution can terminate before all data is processed, so the query may not have enough time to ingest all your files. This could result in only a fraction of your data (e.g., 200 rows) being written to your Delta table.

 

(df_autoloader.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)  # process everything available, then stop
    .table("tablename")
)
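
Since .table() returns a StreamingQuery handle, you can also capture it and block the notebook cell until the availableNow batch completes (a sketch reusing df_autoloader and checkpoint_dir from above):

query = (df_autoloader.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .table("tablename")
)

# Blocks until every file available at query start has been processed.
query.awaitTermination()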

 

Check this setting and let us know if it works. 

gchandra
Databricks Employee

If it's continuously arriving streaming data, space out the micro-batches with a 10-second trigger:

.trigger(processingTime="10 seconds")
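
In the chain from the original post, that trigger slots in like this (a sketch of the same writeStream shown above):

(df_autoloader.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(processingTime="10 seconds")  # start a micro-batch at most every 10 seconds
    .table("tablename")
)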

 

Do all the JSON files have the same schema? Since your table creation is dynamic (df.schema is inferred from a sample read), files that don't match that schema may be skipped.
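
If schema drift is the suspect, one way to surface it is the rescued data column described in the Databricks Auto Loader docs (a sketch; verify the option name against those docs), which captures non-conforming fields in a column rather than dropping them:

df_autoloader = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("multiline", "true")
    # Collect fields that don't fit the declared schema instead of losing them.
    .option("rescuedDataColumn", "_rescued_data")
    .schema(schema)
    .load(AMP_LOC)
)

# After a batch runs, check for rows that carried rescued data
# (the target table needs a matching _rescued_data column for the append):
# spark.table("tablename").where("_rescued_data IS NOT NULL").count()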



