Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read just the new files?

William_Scardua
Valued Contributor

Hi guys,

How can I read just the new files in a batch process?

Can you help me, please?

Thank you


5 REPLIES

Ryan_Chynoweth
Esteemed Contributor

What type of file? Is the file stored in a storage account?

Typically, you would read and write data with something like the following code:

# read a parquet file
df = spark.read.format("parquet").load("/path/to/file")
 
# write the data as a file
df.write.format("delta").save("/path/to/delta/table")
 
# write the data as a managed table
df.write.format("delta").saveAsTable("table_name")
 
 

Please reference this documentation for some more information.

Thank you for your feedback @Ryan Chynoweth

For example, imagine this situation:

time 1 - I have some CSV files landing in my HDFS directory (landing/file1.csv, landing/file2.csv)

time 2 - My batch PySpark job reads the HDFS landing directory and writes to the HDFS bronze directory (bronze/)

time 3 - New CSV files arrive in the HDFS landing directory (landing/file3.csv, landing/file4.csv)

time 4 - At this point the batch PySpark job needs to read only the new files (landing/file3.csv, landing/file4.csv) and append them to the bronze HDFS directory (bronze/)

A stream (writeStream) has the 'checkpointLocation' option, but what about a batch? Do I need to develop my own Python control for this situation?

Does that make sense?

Thanks

My apologies, I misread your question originally.

For your use case I would use COPY INTO, which only loads the files you have not processed yet. You could also do this with Structured Streaming or the Databricks Auto Loader, but those are a little more complex.

For Structured Streaming you can use ".trigger(once=True)" to run the streaming API as a batch process. The checkpoint location on the write tracks which files have already been processed.
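For example, a minimal trigger-once sketch in PySpark (the /landing and /bronze paths and the schema are just placeholders for your own locations and columns):

# Structured Streaming used as a batch job: the checkpoint remembers which
# landing files were already processed, and trigger(once=True) stops the
# query after a single batch. Paths and schema below are placeholders.
df = (spark.readStream
      .format("csv")
      .schema("_c0 BIGINT, _c1 INT, _c2 STRING")  # streaming file sources need an explicit schema
      .load("/landing"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/bronze/_checkpoint")  # tracks processed files between runs
   .trigger(once=True)                                   # process only the new files, then stop
   .start("/bronze"))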

With Auto Loader you can use the file listing (directory listing) mode to detect which files are new since the last run. You will still want to use the .trigger(once=True) argument here as well.
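A rough Auto Loader sketch looks almost the same, just switching the source format to cloudFiles (again, paths and schema are placeholders):

# Auto Loader (cloudFiles) in file listing mode: it keeps its own record of
# which files were already ingested; trigger(once=True) runs it as a batch.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .schema("_c0 BIGINT, _c1 INT, _c2 STRING")
      .load("/landing"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/bronze/_checkpoint")
   .trigger(once=True)
   .start("/bronze"))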

Here are some examples of how to use the COPY INTO command:

-- Copy into a Delta table by providing a file location
 
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
FROM (
  SELECT _c0::bigint key, _c1::int index, _c2 textData
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
)
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
 
-- Copy into a Delta table by name; the target must be an existing Delta table, so create it first
 
CREATE TABLE target (
  _c0 BIGINT,
  _c1 INT,
  _c2 STRING
)
USING DELTA;
 
COPY INTO target
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
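If you would rather keep everything in a PySpark batch job, a small sketch is to run the same COPY INTO through spark.sql; re-running it is safe because files that were already loaded are skipped:

# Run the COPY INTO from a PySpark batch job; already-loaded files are skipped on re-runs.
spark.sql("""
  COPY INTO target
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
""")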

Wowwww, that's right @Ryan Chynoweth, I can use 'once=True' in the streaming API 😄

โ€‹

Thank you very much man

Happy to help!
