Hi!
We're working with change event data from relational and NoSQL databases, then processing and ingesting it into Databricks. The events are streamed from the source to our messaging platform, and our connector then pushes them to Databricks.
Right now we're doing that using the Databricks JDBC driver and SQL, which is what we've done with most other database destinations. We're using COPY INTO with Parquet files loaded to DBFS. If the table doesn't already exist, it's created. If it does exist and the schemas have drifted, columns are added as required. Partition and clustering fields are added too.
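For context, the statement our connector issues over JDBC today is roughly this shape (the table name and DBFS path are placeholders, not our actual values):

COPY INTO analytics.cdc.customers
FROM 'dbfs:/tmp/connector/customers/'
FILEFORMAT = PARQUET;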
We're working on an improvement: supporting MERGE INTO, so that Databricks reflects the latest state of each source database record.
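Conceptually, per target table, we'd like the connector to issue something like this over JDBC (names are placeholders, and this assumes the staged source and the target share the same columns):

MERGE INTO analytics.cdc.customers AS t
USING staged_changes AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;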
We've got a few challenges.
One is the preprocessing needed to avoid multiple matches (deduping), mentioned in the Databricks docs:
-
A MERGE operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the Change data capture example—it preprocesses the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change into the target Delta table
-
We looked at the Change data capture example linked from there, but the examples are Notebook-based; there's no clear solution for a JDBC driver integration.
What's the recommended way of handling this?
A theory we had is a VIEW that retains the latest change for each key, then merging from that view into the final table, but that seems inefficient. We've also tried writing the equivalent DELETE statements (for deduping) that we use for other database destinations, but they rely on either joins or sub-queries like the one below, which Databricks doesn't seem to support: joins aren't allowed in deletes, and queries like this fail with column mismatch errors (there's a sketch of the view approach after the query):
DELETE FROM {table} WHERE struct({id}, {timestamp}) NOT IN (SELECT struct({id}, {timestamp}) FROM {table} GROUP BY {id})
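For completeness, here's a rough sketch of the view idea, assuming an id key column and a ts change timestamp (both placeholders), submitted as plain SQL over JDBC:

CREATE OR REPLACE TEMPORARY VIEW latest_changes AS
SELECT * FROM staged_changes
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) = 1;

MERGE INTO analytics.cdc.customers AS t
USING latest_changes AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

(QUALIFY needs a reasonably recent runtime; a nested SELECT with a ROW_NUMBER filter would be the fallback.) As far as we understand, a temporary view like this isn't materialized, Spark just inlines its definition into the MERGE plan, so perhaps it's not as inefficient as we first assumed, but we'd appreciate confirmation.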
It feels like our external systems may have to rely on and interact with Notebooks to achieve this. Is that really the case, or is there a way to do it purely through the JDBC driver?