DLT cloudfiles trigger interval not working

elifa
New Contributor II

I have the following streaming table definition using the cloudFiles format, with pipelines.trigger.interval set to reduce file discovery costs, but the query is triggering every 12 seconds instead of every 5 minutes.

Is there another configuration I am missing, or does DLT with cloudFiles not work with that setting?

import dlt
from pyspark.sql.functions import input_file_name

@dlt.table(
    spark_conf={"pipelines.trigger.interval": "5 minutes"},
    table_properties={
        "quality": "bronze",
        "pipelines.reset.allowed": "false"  # preserves the data in the Delta table if you do a full refresh
    }
)
def s3_data():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://my-bucket/")
        .withColumn("filePath", input_file_name())
    )

 

3 REPLIES 3

Timothydickers
New Contributor II

@elifa wrote:

I have the following streaming table definition using the cloudFiles format, with pipelines.trigger.interval set to reduce file discovery costs, but the query is triggering every 12 seconds instead of every 5 minutes. Is there another configuration I am missing, or does DLT with cloudFiles not work with that setting?


Hello,

The pipelines.trigger.interval setting controls how often a flow is triggered; for Auto Loader (the cloudFiles source) in a DLT pipeline, this determines how often file discovery runs against the input path. In your case, however, the interval is clearly not being honored.

First, verify the syntax of the value. You could try the shorthand "5m" in place of "5 minutes" by updating your spark_conf as follows:

spark_conf={"pipelines.trigger.interval": "5m"}
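
For context, a minimal sketch of the full table definition with that value (table name and bucket path reused from the original post; whether the "5m" shorthand is accepted is an assumption, the documented long form is "5 minutes"):

import dlt
from pyspark.sql.functions import input_file_name

@dlt.table(
    spark_conf={"pipelines.trigger.interval": "5m"},  # shorthand form (assumption)
    table_properties={"quality": "bronze"}
)
def s3_data():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://my-bucket/")
        .withColumn("filePath", input_file_name())
    )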

If the issue persists, check the Databricks runtime and release channel your pipeline is running on; a bug or regression in that version is possible.

It is also worth searching the Databricks documentation and forums for known issues or workarounds related to pipelines.trigger.interval with the cloudFiles source.

If the problem remains, consider reaching out to Databricks support. They can investigate how pipelines.trigger.interval behaves with cloudFiles in your environment and help you resolve the issue.
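
One more avenue (a sketch based on the DLT pipeline settings documentation, not verified against this specific issue): pipelines.trigger.interval can also be set for the whole pipeline under the configuration object in the pipeline settings JSON. That applies the interval to all flows and can help rule out a per-table spark_conf problem:

{
  "configuration": {
    "pipelines.trigger.interval": "5 minutes"
  }
}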

 

 

Tharun-Kumar
Honored Contributor II

@elifa 

Could you check for this message in the log file? 

INFO EnzymePlanner: Planning for flow: s3_data

According to the pipelines.trigger.interval config, planning should happen once every 5 minutes.

elifa
New Contributor II

The log below is how I can see that it is running every 12 seconds. I am using the same configuration on other tables that do not use the cloudFiles format, and it works fine on them.

23/08/08 04:59:00 INFO MicroBatchExecution: Streaming query made progress: {
  "name" : "s3_data",
  "timestamp" : "2023-08-08T04:59:00.005Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
}

23/08/08 04:59:12 INFO MicroBatchExecution: Streaming query made progress: {
  "name" : "s3_data",
  "timestamp" : "2023-08-08T04:59:12.000Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
}

23/08/08 04:59:36 INFO MicroBatchExecution: Streaming query made progress: {
  "name" : "s3_data",
  "timestamp" : "2023-08-08T04:59:36.000Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
}

23/08/08 04:59:48 INFO MicroBatchExecution: Streaming query made progress: {
  "name" : "s3_data",
  "timestamp" : "2023-08-08T04:59:48.002Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
}

 
