I have the following code:
!pip install dbl-tempo

from pyspark.sql.functions import *
from tempo import TSDF
# interpolate the target_cols columns linearly for a tempo TSDF
def interpolate_tsdf(tsdf_data, target_cols=["value"]):
    return tsdf_data.interpolate(
        freq="1 minute",
        func="mean",
        target_cols=target_cols,
        method="linear",
        show_interpolated=False
    ).df
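
# Worked example (hypothetical numbers, assuming tempo first resamples to the
# 1-minute grid and then fills the gaps linearly): for one sensor with raw
# points 09:00 -> 1.0 and 09:04 -> 5.0, the missing grid points at
# 09:01, 09:02 and 09:03 get the values 2.0, 3.0 and 4.0.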
def interpolate_sensor_measurements(data):
    # get the data that is not binary
    df_float = data \
        .filter(~col("is_binary")) \
        .withColumn("value", col("value").cast("float"))

    # create a tempo TSDF from the non-binary measurements
    input_tsdf = TSDF(
        df_float,
        partition_cols=["b_sensor_id"],
        ts_col="measurement_time"
    )

    # first create a 1-minute range and interpolate on these values
    # (required to avoid tempo taking the mean instead of interpolating),
    # then filter down to one value every 15 minutes
    interpolated_tsdf_float = interpolate_tsdf(input_tsdf) \
        .filter(minute(col("measurement_time")) % 15 == 0)

    # get the binary data; map categorical values (active, inactive) to 1 and 0
    df_bin = data \
        .filter(col("is_binary")) \
        .filter(col("value").isNotNull()) \
        .withColumn("value", when(col("value") == "active", 1)
                             .when(col("value") == "inactive", 0)
                             .otherwise(col("value"))
                             .cast("float"))

    # create a tempo TSDF from the binary measurements
    input_tsdf_bin = TSDF(
        df_bin,
        partition_cols=["b_sensor_id"],
        ts_col="measurement_time"
    )

    # same approach: interpolate on a 1-minute range, then keep every 15 minutes;
    # round the interpolated value to 0 or 1 so the closest category is taken
    interpolated_tsdf_bin = interpolate_tsdf(input_tsdf_bin) \
        .filter(minute(col("measurement_time")) % 15 == 0) \
        .withColumn("value", round(col("value")))

    # take the union of both dataframes
    interpolated_df = interpolated_tsdf_float.union(interpolated_tsdf_bin)
    return interpolated_df
import dlt

@dlt.table(
    name="interpolated_sensordata"
)
def get_interpolated_sensor_data():
    return (
        interpolate_sensor_measurements(dlt.read_stream("valid_sensordata")))
If I start a Delta Live Tables pipeline that includes this notebook, I get the following error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/dlt/helpers.py", line 28, in call
res = self.func()
File "<command--1>", line 8, in get_interpolated_sensor_data
File "<command--1>", line 33, in interpolate_sensor_measurements
File "<command--1>", line 9, in interpolate_tsdf
File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/tsdf.py", line 1224, in interpolate
interpolated_df: DataFrame = interpolate_service.interpolate(
File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/interpol.py", line 300, in interpolate
calculate_time_horizon(tsdf.df, ts_col, freq, partition_cols)
File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/utils.py", line 92, in calculate_time_horizon
) = normalized_time_df.select(
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
res = func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2024, in first
return self.head()
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2010, in head
rs = self.head(1)
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2012, in head
return self.take(n)
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 890, in take
return self.limit(num).collect()
File "<command--1>", line 6, in dlt_collect_fn
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 837, in collect
sock_info = self._jdf.collectToPython()
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/databricks/spark/python/pyspark/sql/utils.py", line 202, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
memory
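From the stack trace it looks like tempo's interpolate calls calculate_time_horizon, which runs .first() (and therefore .collect()) on the DataFrame. That is an eager action, and eager actions are not allowed on streaming DataFrames, which seems to be exactly what the exception is complaining about. A minimal sketch of the same failure, assuming the notebook's spark session and Spark's built-in rate test source:

stream_df = spark.readStream.format("rate").load()  # any streaming source
stream_df.first()  # raises AnalysisException: Queries with streaming sources
                   # must be executed with writeStream.start()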
If I turn it into a regular (non-streaming) live table it does work, but then it reprocesses all the data on every update. Does anyone have an idea how to make this work with a streaming table?
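For what it's worth, the only direction I can think of is running the interpolation per micro-batch with foreachBatch outside of DLT, along the lines of this sketch (table and checkpoint paths are made up), but then I lose the DLT pipeline, and the interpolation would only ever see the rows within one micro-batch:

def interpolate_batch(batch_df, batch_id):
    # batch_df is a plain (non-streaming) DataFrame here, so tempo's
    # eager actions are allowed
    interpolate_sensor_measurements(batch_df) \
        .write.mode("append").saveAsTable("interpolated_sensordata")

spark.readStream.table("valid_sensordata") \
    .writeStream \
    .foreachBatch(interpolate_batch) \
    .option("checkpointLocation", "/tmp/checkpoints/interpolated_sensordata") \
    .start()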