09-23-2021 08:41 AM
What type of file? Is the file stored in a storage account?
Typically, you would read and write data with something like the following code:
# read a parquet file
df = spark.read.format("parquet").load("/path/to/file")
# write the data as a file
df.write.format("delta").save("/path/to/delta/table")
# write the data as a managed table
df.write.format("delta").saveAsTable("table_name")
Please refer to the documentation for more information.
09-23-2021 10:24 AM
Thank you for your feedback @Ryan Chynoweth
For example, imagine this situation:
time 1 - Some CSV files land in my HDFS landing directory (landing/file1.csv, landing/file2.csv)
time 2 - My PySpark batch job reads the HDFS landing directory and writes to the HDFS bronze directory (bronze/);
time 3 - New CSV files arrive in the HDFS landing directory (landing/file3.csv, landing/file4.csv)
time 4 - At this point the PySpark batch job needs to read only the new files (landing/file3.csv, landing/file4.csv) and append them to the bronze HDFS directory (bronze/)
In a stream (writeStream) there is the 'checkpointLocation' option, but what about in a batch job? Do I need to develop my own Python control for this situation?
Does that make sense?
tsk
09-23-2021 11:03 AM
My apologies, I read your question a little incorrectly at first.
For your use case I would use COPY INTO, which only loads the files you have not processed yet. You could also do this with Structured Streaming or Databricks Auto Loader, but those are a little more complex.
For Structured Streaming you can use ".trigger(once=True)" to run the streaming API as a batch process. The checkpoint location on the write tracks which files have already been processed.
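Roughly, a run-once streaming job could look like the sketch below (the /landing and /bronze paths and the column names are placeholders, not from your environment):
# streaming file sources need an explicit schema; adjust the columns to your CSVs
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType

schema = StructType([
    StructField("key", LongType()),
    StructField("index", IntegerType()),
    StructField("textData", StringType())
])

# treat the landing directory as a streaming CSV source
df = (spark.readStream
      .format("csv")
      .schema(schema)
      .load("/landing"))

# trigger(once=True) processes the current backlog and then stops;
# the checkpoint location records which files have already been ingested
(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/bronze/_checkpoint")
   .trigger(once=True)
   .start("/bronze"))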
With Auto Loader you can use the file listing option to identify which files have already been processed. You will still want the .trigger(once=True) argument here as well.
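A minimal Auto Loader sketch along the same lines (again, the paths are placeholders; cloudFiles.schemaLocation lets Auto Loader infer and track the schema, or you can pass an explicit .schema() as above):
# Auto Loader (cloudFiles) discovers new files in the landing path incrementally;
# directory listing mode is the default, so no notification setup is needed
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", "/bronze/_schema")
      .load("/landing"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/bronze/_autoloader_checkpoint")
   .trigger(once=True)
   .start("/bronze"))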
Here are some examples of how to use the COPY INTO command:
-- copy into delta by providing a file location
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
FROM (
  SELECT _c0::bigint key, _c1::int index, _c2 textData
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
)
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
-- copy into delta by providing a table; it must be an existing Delta table, so create it first
CREATE TABLE target_table
(
  _c0 LONG,
  _c1 INT,
  _c2 STRING
)
USING DELTA;

COPY INTO target_table
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
09-23-2021 11:52 AM
wowwwww that's right @Ryan Chynoweth, I can use 'once=True' in the streaming API 😄
Thank you very much man
09-27-2021 11:38 AM
Happy to help!