Read just the new files?

William_Scardua
Valued Contributor

Hi guys,

How can I read just the new files in a batch process?

Can you help me, please?

Thank you


5 REPLIES

Ryan_Chynoweth
Honored Contributor III

What type of file? Is the file stored in a storage account?

Typically, you would read and write data with something like the following code:

# read a parquet file
df = spark.read.format("parquet").load("/path/to/file")
 
# write the data out as a Delta table at a path
df.write.format("delta").save("/path/to/delta/table")
 
# write the data as a managed table
df.write.format("delta").saveAsTable("table_name")
 
 

Please reference this documentation for some more information.

Thank you for your feedback @Ryan Chynoweth

For example, imagine this situation:

time1 - Some CSV files land in my HDFS landing directory (landing/file1.csv, landing/file2.csv)

time2 - My PySpark batch job reads the HDFS landing directory and writes to the HDFS bronze directory (bronze/); see the sketch after this list

time3 - New CSV files arrive in the HDFS landing directory (landing/file3.csv, landing/file4.csv)

time4 - At this point the PySpark batch job needs to read only the new files (landing/file3.csv, landing/file4.csv) and append them to the bronze HDFS directory (bronze/)

A stream (writeStream) has the 'checkpointLocation' option, but what about a batch job? Do I need to develop a Python control for this situation myself?

Does that make sense?
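
To illustrate, here is a minimal sketch of what the time2 batch job looks like (the paths are placeholders, not my actual layout). The problem is that a plain batch read has no memory of what was already loaded:

# Minimal sketch of the time2 batch job; paths are placeholders.
# A plain batch read like this reprocesses EVERY file under landing/
# on each run -- it keeps no record of which files were already loaded.
df = spark.read.format("csv").load("/landing")
df.write.format("delta").mode("append").save("/bronze")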

Thanks

My apologies, I read your question a little incorrectly originally.

For your use case I would use COPY INTO, which loads only the files you have not processed yet. You could also use Structured Streaming or Databricks Auto Loader, but those are a little more complex.

For Structured Streaming you can use .trigger(once=True) to run the streaming API as a batch process. Use the checkpoint location on the write to track which files have been processed.
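
As a minimal sketch (the paths and schema below are placeholders; spark is the SparkSession that Databricks notebooks provide):

# Minimal sketch: run the streaming API as a batch job with trigger(once=True).
# The checkpoint directory records which files have already been processed,
# so each run picks up only the new files. Paths/schema are placeholders.
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType
 
schema = StructType([
    StructField("key", LongType()),
    StructField("index", IntegerType()),
    StructField("textData", StringType()),
])
 
(spark.readStream
    .schema(schema)                # streaming file sources require an explicit schema
    .format("csv")
    .load("/landing")              # directory where new CSV files arrive
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/bronze/_checkpoint")  # tracks processed files
    .trigger(once=True)            # process all new files, then stop
    .start("/bronze"))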

With Auto Loader you can use the file listing mode to identify which files have already been loaded. You will still want the .trigger(once=True) argument here as well.
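
A rough sketch of the Auto Loader version (again, the paths are placeholders; file listing is the default discovery mode):

# Minimal Auto Loader sketch in file-listing mode; paths are placeholders.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")              # format of the incoming files
    .option("cloudFiles.useNotifications", "false")  # directory-listing mode (default)
    .schema(schema)                                  # same placeholder schema as above
    .load("/landing")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/bronze/_autoloader_checkpoint")
    .trigger(once=True)                              # batch-style run, then stop
    .start("/bronze"))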

Here are some examples of how to use the COPY INTO command:

-- copy into delta by providing a file location
 
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
FROM (
  SELECT _c0::bigint key, _c1::int index, _c2 textData
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
)
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
 
-- copy into delta by providing a table name; the target must be an
-- existing Delta table, so create it first
 
CREATE TABLE target_table
(
  _c0 BIGINT,
  _c1 INT,
  _c2 STRING
)
USING DELTA
 
COPY INTO target_table
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'

wowwwww that's right @Ryan Chynoweth, I can use 'once=True' with the streaming API 😄

Thank you very much man

Happy to help!
