<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: FeatureEngineeringClient loses timestamp_keys after write_table in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-loses-timestamp-keys-after-write-table/m-p/52046#M2736</link>
    <description>&lt;P&gt;It turns out I needed to update to runtime 13.3.&lt;/P&gt;</description>
    <pubDate>Wed, 15 Nov 2023 13:34:59 GMT</pubDate>
    <dc:creator>VincentP</dc:creator>
    <dc:date>2023-11-15T13:34:59Z</dc:date>
    <item>
      <title>FeatureEngineeringClient loses timestamp_keys after write_table</title>
      <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-loses-timestamp-keys-after-write-table/m-p/51995#M2735</link>
      <description>&lt;P&gt;I am trying to use the FeatureEngineeringClient to set up a feature store table with a time series component. Right after creating the table with a time series column, the timestamp key exists, but it is removed once data is written to the table. As a result, no point-in-time joins can be performed after adding data.&lt;/P&gt;&lt;P&gt;Minimal example:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import types

fe = FeatureEngineeringClient()

query_prd_schema = types.StructType([
    types.StructField("timestamp", types.TimestampType(), True),
    types.StructField("search_query", types.StringType(), True),
    types.StructField("prd_id", types.StringType(), True),
    types.StructField("start_agg_window", types.DateType(), True),
    types.StructField("end_agg_window", types.DateType(), True),
    types.StructField("a2c_rate", types.FloatType(), True),
])

fe.create_table(
    name=table_name,
    primary_keys=["search_query", "prd_id", 'timestamp'],
    timeseries_columns=['timestamp'],
    schema=query_prd_schema,
    description="Features aggregated on query, prd level"
)&lt;/LI-CODE&gt;&lt;P&gt;At this point, querying the table's timestamp keys still returns a list containing the timestamp column, but after doing the following:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;fe.write_table(
    name=table_name,
    df=a2c_rates,
    mode="merge"
)&lt;/LI-CODE&gt;&lt;P&gt;The same query returns an empty list. As a result, calling create_training_set with a FeatureLookup that has a timestamp_lookup_key results in errors.&lt;/P&gt;&lt;P&gt;The a2c_rates DataFrame has the following column types:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;[('timestamp', 'timestamp'),
 ('search_query', 'string'),
 ('prd_id', 'string'),
 ('a2c_rate', 'float'),
 ('start_agg_window', 'date'),
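 ('end_agg_window', 'date')]&lt;/LI-CODE&gt;&lt;P&gt;These match the column names and types in the schema passed to create_table, so that should be fine, right?&lt;/P&gt;&lt;P&gt;Code to produce the a2c_rates DataFrame:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import window

# Start with GA data for one day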
preprocessed = spark.createDataFrame(
    pd.DataFrame(
        {
            'hitDate': [
                '2023-01-01', '2023-01-01', '2023-01-01', '2023-01-01', '2023-01-01',
                '2023-01-01', '2023-01-01', '2023-01-01', '2023-01-01', '2023-01-01',
                '2023-01-01', '2023-01-01', '2023-01-01', '2023-01-01', '2023-01-01',
                ],
            'emda_id': [
                'emda1', 'emda1', 'emda1', 'emda1', 'emda1',
                'emda2', 'emda2', 'emda2', 'emda2', 'emda2',
                'emda3', 'emda3', 'emda3', 'emda3', 'emda3',
                ],
            'interaction_type': [
                'search', 'impressed', 'impressed', 'impressed', 'added_to_basket',
                'search', 'impressed', 'impressed', 'impressed', 'added_to_basket',
                'search', 'impressed', 'impressed', 'impressed', 'added_to_basket',
                ],
            'search_query': [
                'pizza', 'pizza', 'pizza', 'pizza', 'pizza',
                'pizza', 'pizza', 'pizza', 'pizza', 'pizza',
                'pizza', 'pizza', 'pizza', 'pizza', 'pizza',
                ],
            'prd_id': [
                None, '001', '002', '003', '001',
                None, '001', '002', '003', '002',
                None, '001', '002', '003', '001',
                ],
        }
    )
)

agg_impressions = (preprocessed
     .filter(F.col('interaction_type') == 'impressed')
     .groupBy('prd_id', 'search_query', window("hitDate", "1 day"))
     .count()
     .withColumnRenamed('count', 'impr_count')
     .withColumn('start_agg_window', F.to_date(F.col('window.start')))
     .withColumn('end_agg_window', F.date_sub(F.to_date(F.col('window.end')), 1))
)

agg_adds = (preprocessed
            .filter(F.col('interaction_type') == 'added_to_basket')
            .groupBy('search_query', 'prd_id', window("hitDate", "1 day"))
            .count()
            .withColumnRenamed('count', 'add_count')
            .withColumn('start_agg_window', F.to_date(F.col('window.start')))
            .withColumn('end_agg_window', F.date_sub(F.to_date(F.col('window.end')), 1))
)

a2c_rates = (agg_impressions
             .join(agg_adds, how='left', on=['search_query', 'prd_id', 'start_agg_window', 'end_agg_window'])
             .withColumn('a2c_rate', F.col('add_count') / F.col('impr_count'))
             .withColumn('a2c_rate', F.col('a2c_rate').cast('float'))
             .fillna(0, subset=['a2c_rate'])
             .withColumn('timestamp', (F.unix_timestamp(F.col('end_agg_window')) + (60*60*24)+60*60*7).cast('timestamp'))
             .select('timestamp', 'search_query', 'prd_id', 'a2c_rate', 'start_agg_window', 'end_agg_window')
)
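
# Worked check of the timestamp offset used above. This is a sketch added for
# illustration, not part of the original pipeline: it uses only the standard
# library, the names offset_seconds, example_end and example_ts are hypothetical,
# and it assumes end_agg_window is 2023-01-01 for the sample data (UTC session).
from datetime import datetime, timedelta

offset_seconds = (60 * 60 * 24) + 60 * 60 * 7  # 1 day + 7 hours = 111600 seconds
example_end = datetime(2023, 1, 1)  # midnight at the start of end_agg_window
example_ts = example_end + timedelta(seconds=offset_seconds)
print(example_ts)  # 2023-01-02 07:00:00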

# to_timestamp is a no-op here: the column is already a timestamp after the cast above
a2c_rates = a2c_rates.withColumn('timestamp', F.to_timestamp(F.col('timestamp')))&lt;/LI-CODE&gt;</description>
      <pubDate>Wed, 15 Nov 2023 10:39:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/featureengineeringclient-loses-timestamp-keys-after-write-table/m-p/51995#M2735</guid>
      <dc:creator>VincentP</dc:creator>
      <dc:date>2023-11-15T10:39:33Z</dc:date>
    </item>
    <item>
      <title>Re: FeatureEngineeringClient loses timestamp_keys after write_table</title>
      <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-loses-timestamp-keys-after-write-table/m-p/52046#M2736</link>
      <description>&lt;P&gt;It turns out I needed to update to runtime 13.3.&lt;/P&gt;</description>
      <pubDate>Wed, 15 Nov 2023 13:34:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/featureengineeringclient-loses-timestamp-keys-after-write-table/m-p/52046#M2736</guid>
      <dc:creator>VincentP</dc:creator>
      <dc:date>2023-11-15T13:34:59Z</dc:date>
    </item>
  </channel>
</rss>

