3 weeks ago - last edited 3 weeks ago
I am using the Feature Engineering client to write to a time series feature table. I then created two Databricks jobs with the code below and run them with different run_dates (e.g. '2016-01-07' and '2016-01-08'). When they run concurrently, one of the jobs throws:
ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
I think this is happening because of the merge operation that the Feature Engineering client uses. One solution I have read about is "Avoid conflicts using partitioning and disjoint command conditions". However, I cannot partition the table because it is a time series feature table, which must not be partitioned according to this doc:
A time series feature table must have one timestamp key and cannot have any partition columns. The timestamp key column must be of TimestampType or DateType.
In addition, the suggested fix of adding disjoint conditions to the merge, shown below, cannot be used either, since write_table handles the merge operation internally and it is abstracted away:
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
How can I write concurrently to a time series feature table then?
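Just to illustrate what I mean by the abstraction: if the merge were done directly with the DeltaTable API, a disjoint condition scoped to my run_date would look roughly like the hypothetical sketch below, but write_table does not let me pass a merge condition.

from delta.tables import DeltaTable

# Hypothetical sketch only: write_table abstracts the merge, so this condition
# cannot actually be supplied. Scoping on run_date would make concurrent jobs disjoint.
target = DeltaTable.forName(spark, "dbc_mlops_premium.taxi_example.trip_pickup_features")
(
    target.alias("t")
    .merge(
        df_pickupzip_features.alias("s"),
        f"t.zip = s.zip AND t.hour_slot = s.hour_slot AND t.run_date = s.run_date "
        f"AND t.run_date = '{run_date}'"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)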
# Databricks notebook source
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType, StringType
from delta.tables import DeltaTable
from databricks.feature_engineering import FeatureEngineeringClient
def get_inputs(spark, run_date: str) -> dict[str, DataFrame]:
    # Read the raw taxi trips and keep only data up to the run date
    return {
        "silver_taxi_trips": (
            spark.read.load(
                path="/databricks-datasets/nyctaxi-with-zipcodes/subsampled",
                format="delta"
            )
            .filter(
                F.col("tpep_pickup_datetime").cast("date") <= run_date
            )
        ),
    }

def main(spark, run_date: str) -> None:
    inputs = get_inputs(spark, run_date)
    df_taxi_trips = inputs["silver_taxi_trips"]

    # Hourly pickup-zip features
    df_pickupzip_features = (
        df_taxi_trips.groupBy(
            "pickup_zip", F.hour("tpep_pickup_datetime").alias("hour_slot")
        )
        .agg(
            F.mean("fare_amount").alias("mean_fare_hourly_pickup_zip"),
            F.count("*").alias("count_trips_hourly_pickup_zip"),
        )
        .select(
            F.col("pickup_zip").alias("zip"),
            F.col("hour_slot"),
            F.col("mean_fare_hourly_pickup_zip").cast(FloatType()),
            F.col("count_trips_hourly_pickup_zip").cast(IntegerType()),
        )
    )
    df_pickupzip_features = df_pickupzip_features.withColumn('run_date', F.to_date(F.lit(run_date)))
    df_pickupzip_features = df_pickupzip_features.withColumn('writing_ts', F.current_timestamp())

    fe = FeatureEngineeringClient()
    fe.write_table(
        name="dbc_mlops_premium.taxi_example.trip_pickup_features",
        df=df_pickupzip_features
    )
the table was created with:
%sql
CREATE TABLE IF NOT EXISTS dbc_mlops_premium.taxi_example.trip_pickup_features(
-- entity
zip INT NOT NULL,
hour_slot INT NOT NULL,
-- features
mean_fare_hourly_pickup_zip FLOAT,
count_trips_hourly_pickup_zip INT,
-- time metadata
run_date DATE NOT NULL,
writing_ts TIMESTAMP NOT NULL,
CONSTRAINT trip_pickup_features_pk PRIMARY KEY (zip, hour_slot, run_date TIMESERIES)
)
COMMENT "Taxi pick up features";
3 weeks ago
Modify your write_table operations to ensure they are as specific as possible about the data being written. This might involve adding more granular conditions to your data filtering and writing logic.
Here is an example adjustment to your code to handle concurrent writes more effectively:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType, StringType
from delta.tables import DeltaTable
from databricks.feature_engineering import FeatureEngineeringClient
def get_inputs(spark, run_date: str) -> dict[str, DataFrame]:
    return {
        "silver_taxi_trips": (
            spark.read.load(
                path="/databricks-datasets/nyctaxi-with-zipcodes/subsampled",
                format="delta"
            )
            .filter(
                F.col("tpep_pickup_datetime").cast("date") <= run_date
            )
        ),
    }

def main(spark, run_date: str) -> None:
    inputs = get_inputs(spark, run_date)
    df_taxi_trips = inputs["silver_taxi_trips"]

    df_pickupzip_features = (
        df_taxi_trips.groupBy(
            "pickup_zip", F.hour("tpep_pickup_datetime").alias("hour_slot")
        )
        .agg(
            F.mean("fare_amount").alias("mean_fare_hourly_pickup_zip"),
            F.count("*").alias("count_trips_hourly_pickup_zip"),
        )
        .select(
            F.col("pickup_zip").alias("zip"),
            F.col("hour_slot"),
            F.col("mean_fare_hourly_pickup_zip").cast(FloatType()),
            F.col("count_trips_hourly_pickup_zip").cast(IntegerType()),
        )
    )
    df_pickupzip_features = df_pickupzip_features.withColumn('run_date', F.to_date(F.lit(run_date)))
    df_pickupzip_features = df_pickupzip_features.withColumn('writing_ts', F.current_timestamp())

    fe = FeatureEngineeringClient()
    fe.write_table(
        name="dbc_mlops_premium.taxi_example.trip_pickup_features",
        df=df_pickupzip_features,
        mode="merge"
    )
# Ensure the table creation script is run beforehand
%sql
CREATE TABLE IF NOT EXISTS dbc_mlops_premium.taxi_example.trip_pickup_features(
zip INT NOT NULL,
hour_slot INT NOT NULL,
mean_fare_hourly_pickup_zip FLOAT,
count_trips_hourly_pickup_zip INT,
run_date DATE NOT NULL,
writing_ts TIMESTAMP NOT NULL,
CONSTRAINT trip_pickup_features_pk PRIMARY KEY (zip, hour_slot, run_date TIMESERIES)
)
COMMENT "Taxi pick up features";
3 weeks ago
Hi Walter,
Your code snippet just adds the mode argument to write_table, which is the default value and is already what my code uses. The error I get happens during that merge.
fe.write_table(
    name="dbc_mlops_premium.taxi_example.trip_pickup_features",
    df=df_pickupzip_features,
    mode="merge"  # the only new line in your code
)
Thursday
Still no response on this thread...
Friday - last edited Friday
@zed I went through these document links, and I believe you're spot on here with respect to the limitations and mutually exclusive setups. I don't see any option other than implementing retry logic or reducing the chances of such conflicts, something like:
import time
import random

from delta.exceptions import ConcurrentAppendException

retry = 5
while retry > 0:
    try:
        # Your update statement on the Delta table
        break
    except ConcurrentAppendException as e:
        retry -= 1
        delay = random.randrange(0, 20)
        time.sleep(delay)
        print(f"{retry} retries left, added delay {delay} seconds")
else:
    raise Exception("Update failed after multiple retries")
Concurrency control is complex to handle. I'm not fully sure, but given the capabilities of Liquid Clustering, it could help reduce the likelihood of conflicts. In the end, though, it comes down to two conflicting operations running concurrently, so all you can do is reduce the chances of ending up with inconsistent data. Batching updates together is another option, but it is also only a mitigation. I don't think there's a golden solution for this.
In your case (make sure to review it first):
fe = FeatureEngineeringClient()

retries = 5
while retries > 0:
    try:
        fe.write_table(
            name="dbc_mlops_premium.taxi_example.trip_pickup_features",
            df=df_pickupzip_features,
            mode="merge"
        )
        break
    except Exception as e:
        retries -= 1
        if retries == 0:
            raise
        delay = 20  # You can adjust the delay as needed
        time.sleep(delay)
        print(f"Retrying... {retries} attempts left")
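Regarding the batching option mentioned above, a rough sketch could look like the following (build_features is a hypothetical helper that wraps the aggregation logic from your main function):

from functools import reduce
from pyspark.sql import DataFrame

# Hypothetical sketch: compute features for several run_dates in one job and
# write them with a single merge instead of running concurrent jobs.
run_dates = ["2016-01-07", "2016-01-08"]
dfs = [build_features(spark, d) for d in run_dates]  # build_features is an assumed helper
df_all = reduce(DataFrame.unionByName, dfs)

fe.write_table(
    name="dbc_mlops_premium.taxi_example.trip_pickup_features",
    df=df_all,
    mode="merge"
)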
Friday - last edited Friday
Thank you for your response. Indeed, a retry approach can solve this issue.
I noticed that if I create the table with USING DELTA CLUSTER BY on the date column, as in the code snippet below, the ConcurrentAppendException does not happen. However, if I add more columns to the CLUSTER BY clause, like the entity dropoff zip, the exception re-appears. In terms of performance, I haven't done a stress test with large enough data, but it seems reasonable to CLUSTER BY a date column since this table is also queried by date.
In addition, when I write to it, a warning about suboptimal point-in-time join performance shows up in the log.
From what I saw in the Feature Engineering client's code, I think the OPTIMIZE step is skipped and this warning is thrown instead. If that step is mandatory to guarantee good performance for consumer reads, I can have a separate process perform the OPTIMIZE (a sketch of such a job is after the table definition below). What do you think?
%sql
CREATE TABLE IF NOT EXISTS dbc_mlops_premium.taxi_example.trip_pickup_features(
-- entity
dropoff_zip INT NOT NULL,
tpep_dropoff_hour INT NOT NULL,
-- features
count_trips_hourly_dropoff_zip INT,
-- time metadata
tpep_dropoff_run_date DATE NOT NULL,
CONSTRAINT trip_dropoff_time_series_features_pk PRIMARY KEY (dropoff_zip, tpep_dropoff_hour, tpep_dropoff_run_date TIMESERIES)
)
USING DELTA CLUSTER BY (tpep_dropoff_run_date)
COMMENT "Section 1.2 Taxi Dropoff Features";
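The separate OPTIMIZE process could be as simple as a small scheduled job along these lines (a sketch, assuming it runs somewhere with access to the table):

# Hypothetical standalone maintenance job: periodically compact and re-cluster
# the feature table so the write path does not have to do it.
spark.sql("OPTIMIZE dbc_mlops_premium.taxi_example.trip_pickup_features")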
Let me know if you think this clustering solution is not efficient enough. I am new to Liquid Clustering and might be missing some of its intricacies.
Friday
@zed Clustering by your date column can indeed help avoid the ConcurrentAppendException without incurring the strict partitioning constraints that a time series feature table normally disallows. Unlike partitioning, CLUSTER BY does not create physical partitions on disk for each date value; it simply organizes data files by that column, improving data skipping and reducing merge conflicts.
As for performance and the warning about "suboptimal point in time join performance", Databricks is suggesting that you use liquid clustering (an automatic layout-optimization feature). If your update frequency is high, implementing a periodic OPTIMIZE (or using liquid clustering if feasible) is usually a good idea to keep the data layout efficient for point-in-time lookups and to reduce concurrency issues.
If clustering alone resolves your concurrency problems, and you have a periodic job or process that runs OPTIMIZE on this table, that should be sufficient. You may still see occasional conflicts, so it's still worth considering adding a lightweight retry mechanism around your writes.
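A minimal sketch of such a lightweight retry around the write, catching only the conflict exception (fe and df_pickupzip_features as defined in your code earlier), might look like:

import time
import random

from delta.exceptions import ConcurrentAppendException

# Retry only on the specific conflict, with jitter to de-synchronize concurrent jobs.
for attempt in range(5):
    try:
        fe.write_table(
            name="dbc_mlops_premium.taxi_example.trip_pickup_features",
            df=df_pickupzip_features,
            mode="merge"
        )
        break
    except ConcurrentAppendException:
        if attempt == 4:
            raise
        time.sleep(random.uniform(1, 20))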