04-12-2023 05:36 AM
I have the following code:
!pip install dbl-tempo

from pyspark.sql.functions import *
from tempo import TSDF
# interpolate the target_cols columns linearly for a tempo TSDF
def interpolate_tsdf(tsdf_data, target_cols=["value"]):
    return tsdf_data.interpolate(
        freq="1 minute",
        func="mean",
        target_cols=target_cols,
        method="linear",
        show_interpolated=False
    ).df
def interpolate_sensor_measurements(data):
    # get the data that is not binary
    df_float = data \
        .filter(~col("is_binary")) \
        .withColumn("value", col("value").cast("float"))

    # create tempo TSDF
    input_tsdf = TSDF(
        df_float,
        partition_cols=["b_sensor_id"],
        ts_col="measurement_time"
    )

    # first interpolate on a 1-minute range, then filter down to every 15 minutes;
    # the 1-minute step is required so that tempo interpolates instead of taking the mean
    interpolated_tsdf_float = interpolate_tsdf(input_tsdf) \
        .filter(minute(col("measurement_time")) % 15 == 0)

    # get the binary data; map categorical values (active, inactive) to 1 and 0
    df_bin = data \
        .filter(col("is_binary")) \
        .filter(col("value").isNotNull()) \
        .withColumn("value", when(col("value") == "active", 1)
                    .when(col("value") == "inactive", 0)
                    .otherwise(col("value")).cast("float"))

    # create tempo TSDF
    input_tsdf_bin = TSDF(
        df_bin,
        partition_cols=["b_sensor_id"],
        ts_col="measurement_time"
    )

    # same 1-minute interpolation and 15-minute filter as above;
    # round the value to 0 or 1 so that the closest category is taken
    interpolated_tsdf_bin = interpolate_tsdf(input_tsdf_bin) \
        .filter(minute(col("measurement_time")) % 15 == 0) \
        .withColumn("value", round(col("value")))

    # take the union of both dataframes
    interpolated_df = interpolated_tsdf_float.union(interpolated_tsdf_bin)
    return interpolated_df
import dlt

@dlt.table(
    name="interpolated_sensordata"
)
def get_interpolated_sensor_data():
    return interpolate_sensor_measurements(dlt.read_stream("valid_sensordata"))
If I start a Delta Live Tables pipeline that includes this notebook, I get the following error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/dlt/helpers.py", line 28, in call
res = self.func()
File "<command--1>", line 8, in get_interpolated_sensor_data
File "<command--1>", line 33, in interpolate_sensor_measurements
File "<command--1>", line 9, in interpolate_tsdf
File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/tsdf.py", line 1224, in interpolate
interpolated_df: DataFrame = interpolate_service.interpolate(
File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/interpol.py", line 300, in interpolate
calculate_time_horizon(tsdf.df, ts_col, freq, partition_cols)
File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-eb6e8f2f-7b39-4457-aab1-8733cf50ad5c/lib/python3.9/site-packages/tempo/utils.py", line 92, in calculate_time_horizon
) = normalized_time_df.select(
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
res = func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2024, in first
return self.head()
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2010, in head
rs = self.head(1)
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2012, in head
return self.take(n)
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 890, in take
return self.limit(num).collect()
File "<command--1>", line 6, in dlt_collect_fn
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 43, in wrapper
return func(*args, **kwargs)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 837, in collect
sock_info = self._jdf.collectToPython()
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/databricks/spark/python/pyspark/sql/utils.py", line 202, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
memory
If I turn it into a live table it does work, but then it fetches all data again. Does anyone have an idea how to make this work on a streaming table?
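For reference, the live-table variant that does run looks roughly like this (a sketch; the only change is reading the source with dlt.read instead of dlt.read_stream, so the interpolation sees a batch DataFrame, at the cost of reprocessing all data on every update):

import dlt

@dlt.table(
    name="interpolated_sensordata"
)
def get_interpolated_sensor_data():
    # dlt.read returns a complete (batch) view of the dataset, so tempo's
    # eager actions are allowed, but the whole input is recomputed each update
    return interpolate_sensor_measurements(dlt.read("valid_sensordata"))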
04-15-2023 06:11 PM
@Eelke van Foeken :
The error message indicates that the problem occurs in the calculate_time_horizon function of the tempo.utils module. That function selects the minimum and maximum timestamps with an eager action (first()/collect()), and the AnalysisException at the end of the trace shows this is rejected because the query has a streaming source. The same call would also fail if the DataFrame had no records, which could happen if a filtering operation removes everything or if no data has been received in the stream yet.
To debug this issue, you can add print statements to the interpolate_sensor_measurements function to see what data is being passed to the interpolate_tsdf function, and whether input_tsdf.df has any records. You can also add print statements to the calculate_time_horizon function in the tempo.utils module to see which input DataFrame is causing the issue.
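For example, a rough debugging sketch (this only helps when the input is a static/batch DataFrame, since count() and show() are themselves not allowed on streaming sources):

# inspect what would reach the interpolation step, assuming `data` is the
# input DataFrame passed to interpolate_sensor_measurements
df_float_debug = data \
    .filter(~col("is_binary")) \
    .withColumn("value", col("value").cast("float"))
print("non-binary rows:", df_float_debug.count())
df_float_debug.select("b_sensor_id", "measurement_time", "value").show(5, truncate=False)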
If you determine that the issue is caused by no data being received in the stream, you may need to modify your pipeline to wait for data to be available before running the interpolation function. You can also consider setting a default value to return if no data is available, or using a sliding window to interpolate over a fixed time period instead of a continuous stream.
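A sketch of the "default value when empty" idea (again only valid for batch DataFrames, since emptiness cannot be checked eagerly on a stream; the wrapper name is made up for illustration):

def interpolate_tsdf_or_passthrough(tsdf_data, target_cols=["value"]):
    # hypothetical guard: skip interpolation when there is nothing to
    # interpolate, so tempo never sees an empty DataFrame
    if tsdf_data.df.limit(1).count() == 0:
        return tsdf_data.df
    return interpolate_tsdf(tsdf_data, target_cols)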
04-15-2023 11:47 PM
Hi @Eelke van Foeken
Hope everything is going great.
Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark an answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you.
Cheers!
06-15-2023 01:30 AM
The issue was not resolved: we were trying to use a streaming table within a tempo TSDF, which does not work.
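For anyone running into the same thing, the limitation can be reproduced in a few lines (a sketch, reusing the table name from the pipeline above): tempo's interpolate() eventually calls first()/collect() to work out the time horizon, and Spark rejects eager actions on streaming DataFrames.

streaming_df = spark.readStream.table("valid_sensordata")
streaming_df.first()
# -> pyspark.sql.utils.AnalysisException:
#    Queries with streaming sources must be executed with writeStream.start()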