Data Engineering

ConcurrentAppendException in Feature Engineering write_table

zed
New Contributor III

I am using the Feature Engineering client to write to a time series feature table. I have created two Databricks jobs with the code below, running with different run_dates (e.g. '2016-01-07' and '2016-01-08'). When they run concurrently, one of the jobs throws:

ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.

I think this is happening because of the merge operation that the Feature Engineering client uses. One solution I have read about is "Avoid conflicts using partitioning and disjoint command conditions". However, I cannot partition the table, because my table is a Time Series Feature table, which should not be partitioned according to this doc:

A time series feature table must have one timestamp key and cannot have any partition columns. The timestamp key column must be of TimestampType or DateType.

https://docs.databricks.com/en/machine-learning/feature-store/time-series.html#create-a-time-series-...

In addition, the suggested code snippet solution of adding disjoint conditions to the merge (shown below) cannot be used either, since write_table handles the merge operation internally and it is abstracted away from me.

"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")

How can I write concurrently to a time series feature table then?
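For reference, on a plain Delta table the disjoint-condition pattern from the Delta docs would look roughly like the sketch below (table and column names are illustrative). Since write_table builds this merge internally, I have no way to add the extra date predicate myself:

from delta.tables import DeltaTable

def upsert_for_date(spark, df_updates, run_date: str) -> None:
    # Hypothetical manual merge on a plain Delta table (not a feature table).
    # The extra "t.run_date = '<run_date>'" predicate keeps concurrent jobs
    # disjoint, which is what the Delta docs recommend -- but the feature
    # engineering client owns its merge, so this predicate cannot be injected.
    target = DeltaTable.forName(spark, "catalog.schema.some_delta_table")
    (
        target.alias("t")
        .merge(
            df_updates.alias("s"),
            f"s.zip = t.zip AND s.hour_slot = t.hour_slot "
            f"AND s.run_date = t.run_date AND t.run_date = '{run_date}'",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

For context, here is the job code: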

# Databricks notebook source
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType, StringType

from delta.tables import DeltaTable

from databricks.feature_engineering import FeatureEngineeringClient

def get_inputs(spark, run_date: str) -> dict[str, DataFrame]:
    return {
        "silver_taxi_trips": (
            spark.read.load(
                path="/databricks-datasets/nyctaxi-with-zipcodes/subsampled", 
                format="delta"
            )
            .filter(
                F.col("tpep_pickup_datetime").cast("date") <= run_date
            )
        ),
    }


def main(spark, run_date: str) -> None:
    inputs = get_inputs(spark, run_date)

    df_taxi_trips = inputs["silver_taxi_trips"]

    df_pickupzip_features = (
        df_taxi_trips.groupBy(
            "pickup_zip", F.hour("tpep_pickup_datetime").alias("hour_slot")
        )
        .agg(
            F.mean("fare_amount").alias("mean_fare_hourly_pickup_zip"),
            F.count("*").alias("count_trips_hourly_pickup_zip"),
        )
        .select(
            F.col("pickup_zip").alias("zip"),
            F.col("hour_slot"),
            F.col("mean_fare_hourly_pickup_zip").cast(FloatType()),
            F.col("count_trips_hourly_pickup_zip").cast(IntegerType()),
        )
    )

    df_pickupzip_features = df_pickupzip_features.withColumn('run_date', F.to_date(F.lit(run_date)))
    df_pickupzip_features = df_pickupzip_features.withColumn('writing_ts', F.current_timestamp())

    fe = FeatureEngineeringClient()
    fe.write_table(
        name="dbc_mlops_premium.taxi_example.trip_pickup_features",
        df=df_pickupzip_features
    )
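
Each job calls main roughly like this (a sketch; the widget name and how the run_date parameter is passed are just illustrative, each job supplies its own run_date such as '2016-01-07' or '2016-01-08'):

# COMMAND ----------

# Hypothetical entry point: each job passes its own run_date as a job parameter.
dbutils.widgets.text("run_date", "")
run_date = dbutils.widgets.get("run_date")

spark = SparkSession.builder.getOrCreate()
main(spark, run_date)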

the table was created with:

%sql
CREATE TABLE IF NOT EXISTS dbc_mlops_premium.taxi_example.trip_pickup_features(
  -- entity
  zip INT NOT NULL,
  hour_slot INT NOT NULL,
  -- features
  mean_fare_hourly_pickup_zip FLOAT,
  count_trips_hourly_pickup_zip INT,
  -- time metadata
  run_date DATE NOT NULL,
  writing_ts TIMESTAMP NOT NULL,
  CONSTRAINT trip_pickup_features_pk PRIMARY KEY (zip, hour_slot, run_date TIMESERIES)
)
COMMENT "Taxi pick up features";


6 REPLIES

Walter_C
Databricks Employee

Modify your write_table operations to ensure they are as specific as possible about the data being written. This might involve adding more granular conditions to your data filtering and writing logic.

Here is an example adjustment to your code to handle concurrent writes more effectively:

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType, StringType
from delta.tables import DeltaTable
from databricks.feature_engineering import FeatureEngineeringClient

def get_inputs(spark, run_date: str) -> dict[str, DataFrame]:
    return {
        "silver_taxi_trips": (
            spark.read.load(
                path="/databricks-datasets/nyctaxi-with-zipcodes/subsampled", 
                format="delta"
            )
            .filter(
                F.col("tpep_pickup_datetime").cast("date") <= run_date
            )
        ),
    }

def main(spark, run_date: str) -> None:
    inputs = get_inputs(spark, run_date)
    df_taxi_trips = inputs["silver_taxi_trips"]

    df_pickupzip_features = (
        df_taxi_trips.groupBy(
            "pickup_zip", F.hour("tpep_pickup_datetime").alias("hour_slot")
        )
        .agg(
            F.mean("fare_amount").alias("mean_fare_hourly_pickup_zip"),
            F.count("*").alias("count_trips_hourly_pickup_zip"),
        )
        .select(
            F.col("pickup_zip").alias("zip"),
            F.col("hour_slot"),
            F.col("mean_fare_hourly_pickup_zip").cast(FloatType()),
            F.col("count_trips_hourly_pickup_zip").cast(IntegerType()),
        )
    )

    df_pickupzip_features = df_pickupzip_features.withColumn('run_date', F.to_date(F.lit(run_date)))
    df_pickupzip_features = df_pickupzip_features.withColumn('writing_ts', F.current_timestamp())

    fe = FeatureEngineeringClient()
    fe.write_table(
        name="dbc_mlops_premium.taxi_example.trip_pickup_features",
        df=df_pickupzip_features,
        mode="merge"
    )

# Ensure the table creation script is run beforehand
%sql
CREATE TABLE IF NOT EXISTS dbc_mlops_premium.taxi_example.trip_pickup_features(
  zip INT NOT NULL,
  hour_slot INT NOT NULL,
  mean_fare_hourly_pickup_zip FLOAT,
  count_trips_hourly_pickup_zip INT,
  run_date DATE NOT NULL,
  writing_ts TIMESTAMP NOT NULL,
  CONSTRAINT trip_pickup_features_pk PRIMARY KEY (zip, hour_slot, run_date TIMESERIES)
)
COMMENT "Taxi pick up features";

zed
New Contributor III

Hi Walter, 

Your code snippet just adds the mode argument to write_table, which is the default value and is therefore also what my code uses. The error I get happens during that merge.

fe.write_table(
    name="dbc_mlops_premium.taxi_example.trip_pickup_features",
    df=df_pickupzip_features,
    mode="merge" # newline in your code
)

 

zed
New Contributor III

Still no response on this thread...

VZLA
Databricks Employee

@zed I went through these documentation links, and I believe you're spot on here with respect to the limitation and the mutually exclusive setups. I don't see any option other than implementing retry logic or reducing the likelihood of such conflicts, something like:

import time
import random
from delta.exceptions import ConcurrentAppendException

retry = 5
while retry > 0:
    try:
        # Your update statement on the delta table
        break
    except ConcurrentAppendException as e:
        retry -= 1
        delay = random.randrange(0, 20)
        time.sleep(delay)
        print(f"{retry} retries left, added delay {delay} seconds")
else:
    raise Exception("Update failed after multiple retries")

Concurrency control is very complex to handle. I'm not fully sure, but given the capabilities of Liquid Clustering it could potentially help reduce the likelihood of conflicts. In the end, though, it all comes down to two conflicting operations running concurrently and to reducing the chances of the resulting data inconsistencies. Batching updates together is another option, but that is also only a mitigation; I don't think there's a golden solution for this.
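
If you want to experiment with Liquid Clustering on the existing table, a minimal sketch would be something like the following (assuming a recent Databricks Runtime that supports enabling liquid clustering on an existing table; double-check the clustering column against your access patterns):

# Hypothetical example: enable liquid clustering on the date column of the
# existing feature table. Existing files are only re-laid-out when OPTIMIZE runs.
spark.sql("""
    ALTER TABLE dbc_mlops_premium.taxi_example.trip_pickup_features
    CLUSTER BY (run_date)
""")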

In your case (make sure to review it first):

    import time  # required for time.sleep below (ideally imported at the top of the notebook)

    fe = FeatureEngineeringClient()
    retries = 5
    while retries > 0:
        try:
            fe.write_table(
                name="dbc_mlops_premium.taxi_example.trip_pickup_features",
                df=df_pickupzip_features,
                mode="merge"
            )
            break
        except Exception as e:
            retries -= 1
            if retries == 0:
                raise
            delay = 20  # You can adjust the delay as needed
            time.sleep(delay)
            print(f"Retrying... {retries} attempts left")

zed
New Contributor III

Thank you for your response. Indeed, a retry approach can solve this issue.

I noticed that if I create the table with USING DELTA CLUSTER BY on the date column, as in the code snippet below, the ConcurrentAppendException does not happen. However, if I add more columns to the clustering strategy, such as the entity dropoff zip, the exception reappears. In terms of performance, I haven't done a stress test with large enough data, but it seems reasonable to CLUSTER BY a date column since this table is also queried by date.

In addition, when I write to it, the following warning appears in the logs:

Feature table ... has its own partition strategy that may yield suboptimal point in time join performance. Consider exploring liquid clustering on this table for better performance. See https://docs.databricks.com/en/delta/clustering.html

From what I saw in the Feature Engineering client code, I think the OPTIMIZE step is skipped and this warning is thrown instead. If that step is needed to guarantee good read performance for consumers, I can have a separate process perform the OPTIMIZE. What do you think?

%sql
CREATE TABLE IF NOT EXISTS dbc_mlops_premium.taxi_example.trip_pickup_features(
  -- entity
  dropoff_zip INT NOT NULL,
  tpep_dropoff_hour INT NOT NULL,
  -- features
  count_trips_hourly_dropoff_zip INT,
  -- time metadata
  tpep_dropoff_run_date DATE NOT NULL,
  CONSTRAINT trip_dropoff_time_series_features_pk PRIMARY KEY (dropoff_zip, tpep_dropoff_hour, tpep_dropoff_run_date TIMESERIES)
)
USING DELTA CLUSTER BY (tpep_dropoff_run_date)
COMMENT "Section 1.2 Taxi Dropoff Features";

 

Let me know if you think this clustering solution is not efficient enough. I am new to the intricacies of Liquid clustering, and might be missing some intricacy 

VZLA
Databricks Employee (Accepted Solution)

@zed Clustering by your date column can indeed help avoid the ConcurrentAppendException without incurring the strict partitioning constraints that a "time series feature table" normally disallows. Unlike partitioning, CLUSTER BY does not create physical partitions on disk for each date value; it simply organizes data files by that column, improving data skipping and reducing merge conflicts.

As for performance and the warning about "suboptimal point in time join performance", Databricks is suggesting that you use liquid clustering (an automatic layout-optimization feature). If your update frequency is high, implementing a periodic OPTIMIZE (or using liquid clustering if feasible) is usually a good idea to keep the data layout efficient for point-in-time lookups and to reduce concurrency issues.

If clustering alone resolves your concurrency problems, and you have a periodic job or process that runs OPTIMIZE on this table, that should be sufficient. You may still see occasional conflicts, so it is still worth considering a lightweight retry mechanism around your writes.
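
As a minimal sketch of such a maintenance process (assuming a separate scheduled notebook or job, run e.g. daily; adjust the schedule to your write frequency), it can simply be:

# Hypothetical standalone maintenance notebook, scheduled as its own
# Databricks job, separate from the feature-writing jobs.
spark.sql("OPTIMIZE dbc_mlops_premium.taxi_example.trip_pickup_features")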
