Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.

How to ingest files from volume using autoloader

pshuk
New Contributor III

I am doing a test run. I am uploading files to a volume and then using Auto Loader to ingest the files and create a table. I am getting this error message:

-----------------------------------------------------------

com.databricks.sql.cloudfiles.errors.CloudFilesIllegalStateException: The container in the file event `{"backfill":{"bucket":"root@dbstoragepdarecwn6h6go","key":"7019658555662308/FileStore/LiveDataUpload/wgs_hpo_test/2127020.HPO.txt","size":77,"eventTime":1703107647000}}` is different from expected by the source: `unitycatalog@bgdatabricksstoragev2`.

-------------------------------------

Here is the code to ingest the file from the specified location.

-------------------------------------------------------------------------------------------------------------------

import dlt
from pyspark.sql.functions import col, current_timestamp, split

# Define variables used in code below
file_path = "/Volumes/bgem_dev/wgs_live/hpo/"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"bgem_dev.wgs_live.hpo_test"
checkpoint_path = f"/tmp/{username}/_checkpoint/Live"

# Clear out data from previous demo execution
#spark.sql(f"DROP TABLE IF EXISTS {table_name}")
#dbutils.fs.rm(checkpoint_path, True)

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "text")
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name)
)

1 REPLY

Wojciech_BUK
Valued Contributor III

Hey, I think you are mixing DLT syntax with PySpark syntax:

  • In DLT you should use:
    CREATE OR REFRESH STREAMING TABLE
      <table-name>
    AS SELECT
      *
    FROM
      STREAM read_files(
        '<path-to-source-data>',
        format => '<file-format>'
      )
    or, in Python:
    @dlt.table(table_properties={'quality': 'bronze'})
    def <table-name>():
      return (
        spark.readStream.format('cloudFiles')
        .option('cloudFiles.format', '<file-format>')
        .load('<path-to-source-data>')
      )
    You don't specify a checkpoint for DLT in the notebook;
    checkpoints are stored under the storage location specified in the DLT pipeline settings.

  • In PySpark, the way you wrote it is fine.

    I would avoid writing your checkpoint to DBFS; create a path in your own storage and save it there.
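
To illustrate the checkpoint advice for the plain PySpark route, here is a minimal sketch that builds the checkpoint path inside a Unity Catalog volume instead of under /tmp on DBFS. The helper names `volume_checkpoint_path` and `ingest_from_volume`, the `_checkpoints` subfolder, and the `checkpoints` volume are all assumptions made for this example, not part of the original post; the volume would need to exist already. A streaming checkpoint records state for one particular source, so if the /tmp checkpoint was left over from an earlier run against a different path, starting from a fresh location like this may also be worth trying, though the thread does not confirm that this is the cause of the error.

```python
def volume_checkpoint_path(catalog: str, schema: str, volume: str, stream_name: str) -> str:
    """Build a checkpoint directory inside a Unity Catalog volume (hypothetical layout)."""
    for part in (catalog, schema, volume, stream_name):
        # Reject empty components and stray slashes so the path stays well-formed.
        if not part or "/" in part:
            raise ValueError(f"invalid path component: {part!r}")
    return f"/Volumes/{catalog}/{schema}/{volume}/_checkpoints/{stream_name}"


def ingest_from_volume(spark, source_path: str, table_name: str, checkpoint_path: str):
    """Run Auto Loader once over source_path and write the rows to table_name."""
    # Imported lazily so the path helper above works without pyspark installed.
    from pyspark.sql.functions import col, current_timestamp

    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "text")
        .load(source_path)
        .select(
            "*",
            col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time"),
        )
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable(table_name)
    )


# Assumed names: the "checkpoints" volume is hypothetical and must already exist.
checkpoint_path = volume_checkpoint_path("bgem_dev", "wgs_live", "checkpoints", "hpo_test")
```

In a Databricks notebook this would be called as `ingest_from_volume(spark, "/Volumes/bgem_dev/wgs_live/hpo/", "bgem_dev.wgs_live.hpo_test", checkpoint_path)`. Keeping the checkpoint in a separate volume from the source files means Auto Loader never sees the checkpoint files as input.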

Please let me know if that helps, or clarify whether you are using DLT or PySpark.
