09-10-2024 05:53 AM - edited 09-10-2024 06:06 AM
(
    spark.readStream
    .format("cloudFiles")
    .schema(df_schema)
    .option("cloudFiles.format", "parquet")
    .load(f"{s3_path_base}/*/*")
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "<location_in_s3>")
    .trigger(availableNow=True)
    .start()
)
Relevant code (the foreachBatch handler):
def upsert_to_delta(micro_batch_df, batch_id):
    # Spark DF of the columns and their types from the source CDC files
    spark.createDataFrame(
        micro_batch_df.dtypes, schema=self.schema  # schema here is just <column_name, data_type>
    ).createOrReplaceGlobalTempView("SOURCE_CDC_FILES_VIEW_COLUMNS")
    # Spark DF of the columns and their types from the Delta target table
    spark.createDataFrame(
        spark.read.table(target_table).dtypes,
        schema=self.schema,  # schema here is just <column_name, data_type>
    ).createOrReplaceGlobalTempView("TARGET_DBX_TABLE_COLUMNS")
    # Left-join the source columns against the target columns to build the SELECT list.
    # For columns present in both, the target table's data type wins; COALESCE falls back
    # to the source type only when the column does not exist in the target.
    df_col = spark.sql(
        f"""SELECT
            'CAST(sc.' || s.column_name || ' AS ' || COALESCE(t.data_type, s.data_type) || ') AS ' || s.column_name AS column_name
        FROM
            global_temp.SOURCE_CDC_FILES_VIEW_COLUMNS s
            LEFT JOIN global_temp.TARGET_DBX_TABLE_COLUMNS t
                ON (s.column_name = t.column_name)"""
    )
    columns = ", ".join(list(df_col.toPandas()["column_name"]))
    # Expose the streaming micro-batch as a Spark view
    micro_batch_df.createOrReplaceGlobalTempView("SOURCE_CDC_FILES_VIEW")
    # Build the MERGE statement that upserts the micro-batch into the target table
    sql_query_for_micro_batch = f"""MERGE INTO <target_table> s
    USING (
        SELECT
            {columns}
        FROM global_temp.SOURCE_CDC_FILES_VIEW sc
        INNER JOIN (
            SELECT {self.unique_key},
                   MAX(transact_seq) AS transact_seq
            FROM global_temp.SOURCE_CDC_FILES_VIEW
            GROUP BY 1) mc
        ON (sc.{self.unique_key} = mc.{self.unique_key}
            AND sc.transact_seq = mc.transact_seq)) b
    ON b.{self.unique_key} = s.{self.unique_key}
    WHEN MATCHED AND b.Op = 'U'
        THEN UPDATE SET *
    WHEN MATCHED AND b.Op = 'D'
        THEN DELETE
    WHEN NOT MATCHED AND (b.Op = 'I' OR b.Op = 'U')
        THEN INSERT *"""
    LOGGER.info("Executing the merge")
    LOGGER.info(f"Merge SQL: {sql_query_for_micro_batch}")
    spark.sql(sql_query_for_micro_batch)
    LOGGER.info("Merge is done")
    spark.catalog.dropGlobalTempView("SOURCE_CDC_FILES_VIEW_COLUMNS")
    spark.catalog.dropGlobalTempView("TARGET_DBX_TABLE_COLUMNS")
    spark.catalog.dropGlobalTempView("SOURCE_CDC_FILES_VIEW")
12-23-2024 08:44 AM
Thank you for sharing the detailed information about your issue. Before diving into solutions, I want to confirm whether this is still an ongoing problem for you.

Regarding the difference in job performance between "NO_ISOLATION" mode and "USER_ISOLATION" mode: it would be worth dissecting the memory space with memory/heap dumps, but generally speaking, "USER_ISOLATION" mode adds isolation layers to ensure that resources are securely allocated to individual users. That isolation consumes extra memory and computational resources, which can lead to OOM errors and performance degradation. "USER_ISOLATION" mode enforces stricter security and isolation policies; it is designed to provide better security at the cost of some performance overhead.

For your use case, we would have to benchmark and collect more data points to pin down the actual memory utilization and overhead: where it comes from and what the biggest contributors/dominators are.
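As a concrete starting point for that benchmarking, one option is to poll Spark's monitoring REST API from the driver and record per-executor memory while the same job runs once on a NO_ISOLATION cluster and once on a USER_ISOLATION cluster. This is a rough sketch only, assuming the driver UI is reachable at localhost:4040; depending on the access mode the UI may be proxied or restricted, so treat it as an illustration rather than a guaranteed recipe:

import requests

def snapshot_executor_memory(ui_base="http://localhost:4040"):
    # List applications known to this driver's UI and take the first one
    apps = requests.get(f"{ui_base}/api/v1/applications").json()
    app_id = apps[0]["id"]
    # Per-executor metrics; memoryUsed/maxMemory cover storage memory and give
    # a coarse but comparable signal across the two cluster modes
    executors = requests.get(f"{ui_base}/api/v1/applications/{app_id}/executors").json()
    for e in executors:
        print(e["id"], e["memoryUsed"], e["maxMemory"], e["totalGCTime"])

For heap dumps specifically, the standard JVM flags (-XX:+HeapDumpOnOutOfMemoryError plus -XX:HeapDumpPath=...) can be set through spark.executor.extraJavaOptions so that an OOM on either cluster mode leaves a dump behind for comparison.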