Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to keep data in time-based localized clusters after joining?

Erik_L
Contributor II

I have a number of data frames from different data sources. They are all time series ordered by a column timestamp, which is an int32 Unix timestamp. I can join them on this and another column, join_idx, which is essentially an integer index ordered by timestamp. When I join these data frames together, the result is out of order, which makes RANGE BETWEEN queries extremely slow. I need to bucket the times into non-rolling windows as part of point-in-range queries. Is there any way I can use Databricks' timestamp-based data locality to keep the joined data colocated?

from pyspark.sql.functions import lit

df_all = (
    df_1.join(df_2, on=["timestamp", "join_idx"], how="inner")
    .join(df_3, on=["timestamp", "join_idx"], how="inner")
    .join(df_4, on=["timestamp", "join_idx"], how="inner")
    .withColumn("id", lit(id))
)
df_all.writeTo("joined_table").createOrReplace()

1 ACCEPTED SOLUTION

Accepted Solutions

Anonymous
Not applicable

@Erik Louie:

To speed up RANGE BETWEEN queries on your joined data frame, you can try bucketing and sorting the data on the timestamp column when you write it out. With an in-bucket sort, rows are stored in timestamp order within each file, so Spark can skip over sections of files that fall outside the range being queried.

Here's an example of how to bucket your data:

# Bucket and sort the data on the timestamp column into 100 buckets at write
# time. Note: pyspark.sql.functions.bucket is a partition transform for
# writeTo() and cannot be used inside withColumn, so bucketBy/sortBy do the
# work here instead.
num_buckets = 100
(
    df_all.write.mode("overwrite")
    .bucketBy(num_buckets, "timestamp")
    .sortBy("timestamp")
    .saveAsTable("bucketed_table")
)

# Query the bucketed data using a range query
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.orderBy("timestamp")
df_query = (
    spark.table("bucketed_table")
    .filter(f"timestamp >= {start_time} AND timestamp <= {end_time}")
    .withColumn("row_num", row_number().over(w))
    .filter(f"row_num BETWEEN {start_row} AND {end_row}")
)

In this example, we write the data bucketed and sorted on the timestamp column into 100 buckets using bucketBy and sortBy, and then perform a range query on the bucketed table using filter and row_number.

Note that you will need to adjust the number of buckets to fit the size of your data and the range of timestamps you are querying. You may also need to experiment with different bucketing strategies to find the best performance for your use case.


3 REPLIES

Anonymous
Not applicable

@Erik Louie:

If the data frames have different time zones, you can use Databricks' timezone conversion functions to bring them to a common time zone. Use to_utc_timestamp to normalize each timestamp column to UTC, then use from_utc_timestamp and date_format to render it as a timestamp string in a particular time zone.

For example, if your data frames have timestamps in different time zones, you can use the following code to convert them to a common time zone:

from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp, date_format, lit

# Normalize each frame's local timestamp to UTC
df_1_utc = df_1.withColumn('timestamp_utc', to_utc_timestamp(df_1.timestamp, df_1.timezone))
df_2_utc = df_2.withColumn('timestamp_utc', to_utc_timestamp(df_2.timestamp, df_2.timezone))

# Render as a timestamp string in a particular time zone
df_1_localized = df_1_utc.withColumn('timestamp_local', date_format(from_utc_timestamp(df_1_utc.timestamp_utc, 'America/Los_Angeles'), 'yyyy-MM-dd HH:mm:ss'))
df_2_localized = df_2_utc.withColumn('timestamp_local', date_format(from_utc_timestamp(df_2_utc.timestamp_utc, 'America/Los_Angeles'), 'yyyy-MM-dd HH:mm:ss'))

# Join the data frames on the localized timestamp column
df_all = (
    df_1_localized.join(df_2_localized, on=['timestamp_local', 'join_idx'], how="inner")
    .join(df_3, on=['timestamp_local', 'join_idx'], how="inner")
    .join(df_4, on=['timestamp_local', 'join_idx'], how="inner")
    .withColumn("id", lit(id))
)
df_all.writeTo("joined_table").createOrReplace()

This code normalizes the timestamp column to UTC using to_utc_timestamp, then converts it to a localized timestamp string using from_utc_timestamp and date_format. Finally, it joins the data frames on the localized timestamp column.

My apologies -- it seems like the title misled you. I appreciate the long and thorough answer to another problem, but my question is about how to keep data in local file clusters for efficient `RANGE BETWEEN` queries.

