cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

I want to perform interpolation on a streaming table in delta live tables.

Eelke
New Contributor II

I have the following code:

from pyspark.sql.functions import * 
!pip install dbl-tempo
from tempo import TSDF
 
from pyspark.sql.functions import *
 
# interpolate target_cols column linearly for tsdf dataframe
def interpolate_tsdf(tsdf_data, target_cols=["value"]):
    return tsdf_data.interpolate(
        freq="1 minute",
        func="mean",
        target_cols=target_cols,
        method="linear",
        show_interpolated=False
    ).df
 
def interpolate_sensor_measurements(data):
    # get the data that is not binary
    df_float = data \
        .filter(~col("is_binary")) \
        .withColumn("value", col("value").cast("float"))
 
    # create tempo DataFrame
    input_tsdf = TSDF(
        df_float,
        partition_cols=["b_sensor_id"],
        ts_col="measurement_time"
    )
 
    # first create 1 minute range and then interpolate on these values
    # this is required to avoid tempo taking the mean instead of interpolating
    # then filter to have only every 15 minutes
    interpolated_tsdf_float = interpolate_tsdf(input_tsdf).filter(minute(col("measurement_time")) % 15 == 0)
 
    # get the data that is binary, make sure any values that have categories (active, inactive) are set to 1 and 0
    df_bin = data \
        .filter(col("is_binary")) \
        .filter(col("value").isNotNull()) \
        .withColumn("value", when(col("value") == "active", 1)
                            .when(col("value") == "inactive", 0)
                            .otherwise(col("value")).cast("float"))
 
    # create tempo DataFrame
    input_tsdf_bin = TSDF(
        df_bin,
        partition_cols=["b_sensor_id"],
        ts_col="measurement_time"
    )
 
    # first create 1 minute range and then interpolate on these values
    # this is required to avoid tempo taking the mean instead of interpolating
    # then filter to have only every 15 minutes
    interpolated_tsdf_bin = interpolate_tsdf(input_tsdf_bin) \
        .filter(minute(col("measurement_time")) % 15 == 0) \
        .withColumn("value", round(col("value"))) # round value to 0 or 1, this way the closest category is taken
 
    # take the union of both dataframes
    interpolated_df = interpolated_tsdf_float.union(interpolated_tsdf_bin)
 
    return interpolated_df
import dlt
 
@dlt.table(
  name="interpolated_sensordata"
)
def get_interpolated_sensor_data():
    return (
        interpolate_sensor_measurements(dlt.read_stream("valid_sensordata")))

If I start a delta live tables pipeline including this notebook, I get the following error:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/dlt/helpers.py", line 28, in call
    res = self.func()
  File "<command--1>", line 8, in get_interpolated_sensor_data
  File "<command--1>", line 33, in interpolate_sensor_measurements
  File "<command--1>", line 9, in interpolate_tsdf
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/tsdf.py", line 1224, in interpolate
    interpolated_df: DataFrame = interpolate_service.interpolate(
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/interpol.py", line 300, in interpolate
    calculate_time_horizon(tsdf.df, ts_col, freq, partition_cols)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/utils.py", line 92, in calculate_time_horizon
    ) = normalized_time_df.select(
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2024, in first
    return self.head()
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2010, in head
    rs = self.head(1)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2012, in head
    return self.take(n)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 890, in take
    return self.limit(num).collect()
  File "<command--1>", line 6, in dlt_collect_fn
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 837, in collect
    sock_info = self._jdf.collectToPython()
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/sql/utils.py", line 202, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
memory

If I turn it into a live table it does work, but then it fetches all data again. Does anyone have an idea how to make this work on a streaming table?

3 REPLIES 3

Anonymous
Not applicable

@Eelke van Foeken​ :

The error message indicates that there is an issue with the calculate_time_horizon function in the

tempo.utils module. Specifically, the function is being called with a DataFrame that has no records, causing a NullPointerExceptionwhen trying to select the minimum and maximum timestamps. This may be caused by the DataFrame being empty due to a filtering operation, or due to no data being received yet in the stream.

To debug this issue, you can add print statements to the interpolate_sensor_measurements function to see what data is being passed to the interpolate_tsdf function, and whether the resulting DataFrame from input_tsdf.df has any records. You can also add print statements to the calculate_time_horizon function in the tempo.utils module to see what input DataFrame is causing the issue.

If you determine that the issue is caused by no data being received in the stream, you may need to modify your pipeline to wait for data to be available before running the interpolation function. You can also consider setting a default value to return if no data is available, or using a sliding window to interpolate over a fixed time period instead of a continuous stream.

Anonymous
Not applicable

Hi @Eelke van Foeken​ 

Hope everything is going great.

Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark an answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you. 

Cheers!

Eelke
New Contributor II

The issue was not resolved because we were trying to use a streaming table within TSDF which does not work.

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!