2 weeks ago
Hi,
Pattern A — Customer → Orders (array) → OrderLines (nested array)
```json
{
  "sourceSystem": "CDS",
  "ingestionTs": "2026-03-09T09:20:10Z",
  "customer": {
    "customerId": "CUST-1001",
    "name": "Amit Das",
    "email": "amit.das@example.com",
    "address": {
      "city": "Kolkata",
      "state": "WB",
      "pin": "700001"
    },
    "phones": [
      { "type": "mobile", "number": "+91-9000000001" },
      { "type": "office", "number": "+91-3300000002" }
    ],
    "orders": [
      {
        "orderId": "ORD-501",
        "orderDate": "2026-03-01",
        "status": "SHIPPED",
        "orderLines": [
          { "lineId": 1, "sku": "SKU-01", "qty": 2, "price": 120.5 },
          { "lineId": 2, "sku": "SKU-02", "qty": 1, "price": 999.0 }
        ],
        "payments": [
          { "paymentId": "PAY-901", "method": "UPI", "amount": 1240.0 }
        ]
      },
      {
        "orderId": "ORD-502",
        "orderDate": "2026-03-05",
        "status": "CREATED",
        "orderLines": [
          { "lineId": 1, "sku": "SKU-03", "qty": 3, "price": 10.0 }
        ],
        "payments": []
      }
    ]
  }
}
```
Tables typically created:
customer_master (PK: customerId)
customer_phones (FK: customerId)
customer_orders (PK: orderId, FK: customerId)
order_lines (FK: orderId)
order_payments (FK: orderId)
2 weeks ago
Hi @SantiNath_Dey,
I may not be able to deliver production-grade working code, but I'll walk you through my approach. It's a long post, but a concise answer wouldn't convey the full picture.
Let's start with the architecture.
The framework has three layers, all driven by config.
For the output, all entity DataFrames are written as Delta tables to the target Unity Catalog schema. A `_metadata_catalog` table is also generated, with one row per column across all output tables, capturing table name, column name, data type, PK/FK flags, and FK references. This serves as a built-in lineage/governance layer.
The key point is that the three config tables are the only things that change when a new JSON format arrives. The engine (the three layers described above) never changes.
You asked how the config tables should be set up. The three config tables will look something like the following. This is just an example; modify it as required.
Pattern_Config: One row per JSON shape. The detection_rule is a SQL boolean expression evaluated against the raw DataFrame to auto-match the right pattern.
| Column | Type | Purpose |
| --- | --- | --- |
| pattern_id | STRING | Unique ID (e.g. P001) |
| pattern_name | STRING | Human-readable name |
| source_system | STRING | Origin system identifier |
| detection_rule | STRING | SQL expression, e.g. `sourceSystem = 'CDS' AND customer IS NOT NULL` |
| root_path | STRING | Struct to hoist as root (e.g. `customer`), or NULL |
| description | STRING | Free text |
Entity Config: One row per target table. Defines the relational model: parent-child hierarchy, keys, and processing order.
| Column | Type | Purpose |
| --- | --- | --- |
| pattern_id | STRING | Links to pattern_config |
| entity_id | STRING | Unique ID (e.g. E001) |
| entity_name | STRING | Target Delta table name |
| parent_entity_id | STRING | Parent entity (NULL for root) |
| pk_column | STRING | Primary key column name |
| pk_auto_generate | BOOLEAN | True = engine generates surrogate PK |
| fk_column | STRING | Foreign key column name |
| fk_references | STRING | Parent table.column reference |
| entity_order | INT | Processing sequence (parents first) |
Entity Explode Config: One row per entity. Tells the engine how to navigate the JSON tree and which columns to extract.
| Column | Type | Purpose |
| --- | --- | --- |
| entity_id | STRING | Links to entity_config |
| explode_path | STRING | Dot-path to the array (e.g. `orders.orderLines`), NULL for root |
| explode_type | STRING | `root`, `array_of_struct`, or `nested_array` |
| select_columns | STRING | Comma-separated column list for this entity |
| flatten_structs | STRING | Struct flattening rule: `struct_col -> field1, field2` |
| alias_map | STRING | Column renames (optional) |
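To make the three config tables concrete, here is one way they might be populated for Pattern A. This is a sketch in plain Python dicts (in practice these would be rows in the Delta config tables), and every ID and surrogate-key name (P001, E001–E005, phone_sk, line_sk) is an assumed, illustrative value, not a required convention.

```python
# Illustrative config rows for Pattern A (Customer -> Orders -> OrderLines).
# IDs such as P001 / E001..E005 and key names phone_sk / line_sk are made up.

pattern_config = [
    {"pattern_id": "P001", "pattern_name": "customer_orders_nested",
     "source_system": "CDS",
     "detection_rule": "sourceSystem = 'CDS' AND customer IS NOT NULL",
     "root_path": "customer", "description": "Customer with nested orders"},
]

entity_config = [
    {"pattern_id": "P001", "entity_id": "E001", "entity_name": "customer_master",
     "parent_entity_id": None, "pk_column": "customerId", "pk_auto_generate": False,
     "fk_column": None, "fk_references": None, "entity_order": 1},
    {"pattern_id": "P001", "entity_id": "E002", "entity_name": "customer_phones",
     "parent_entity_id": "E001", "pk_column": "phone_sk", "pk_auto_generate": True,
     "fk_column": "customerId", "fk_references": "customer_master.customerId",
     "entity_order": 2},
    {"pattern_id": "P001", "entity_id": "E003", "entity_name": "customer_orders",
     "parent_entity_id": "E001", "pk_column": "orderId", "pk_auto_generate": False,
     "fk_column": "customerId", "fk_references": "customer_master.customerId",
     "entity_order": 3},
    {"pattern_id": "P001", "entity_id": "E004", "entity_name": "order_lines",
     "parent_entity_id": "E003", "pk_column": "line_sk", "pk_auto_generate": True,
     "fk_column": "orderId", "fk_references": "customer_orders.orderId",
     "entity_order": 4},
    {"pattern_id": "P001", "entity_id": "E005", "entity_name": "order_payments",
     "parent_entity_id": "E003", "pk_column": "paymentId", "pk_auto_generate": False,
     "fk_column": "orderId", "fk_references": "customer_orders.orderId",
     "entity_order": 5},
]

entity_explode_config = [
    {"entity_id": "E001", "explode_path": None, "explode_type": "root",
     "select_columns": "customerId, name, email",
     "flatten_structs": "address -> city, state, pin", "alias_map": None},
    {"entity_id": "E002", "explode_path": "phones", "explode_type": "array_of_struct",
     "select_columns": "type, number", "flatten_structs": None, "alias_map": None},
    {"entity_id": "E003", "explode_path": "orders", "explode_type": "array_of_struct",
     "select_columns": "orderId, orderDate, status", "flatten_structs": None,
     "alias_map": None},
    {"entity_id": "E004", "explode_path": "orders.orderLines",
     "explode_type": "nested_array", "select_columns": "lineId, sku, qty, price",
     "flatten_structs": None, "alias_map": None},
    {"entity_id": "E005", "explode_path": "orders.payments",
     "explode_type": "nested_array", "select_columns": "paymentId, method, amount",
     "flatten_structs": None, "alias_map": None},
]
```

Note how the five entities reproduce the five target tables from the original question, with the two nested arrays reached via dot-paths.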
Let's now talk about the ERD for the example you gave. Using it as the basis for the demonstration, I'm creating 5 normalised tables from a single nested JSON document, as you wanted.
All relationships are 1-to-many (one customer has many phones/orders; one order has many lines/payments). The PK/FK pairs are fully defined in entity_config, so the same ER structure can be reproduced from config alone: no hardcoded joins in the engine.
From a coding perspective, here is how I'll approach it. This is just pseudo code as requested. Use this as reference to build what you need.
Step 1 is to load the config.

```
patterns    = read_table("pattern_config")         -- all patterns
entities    = read_table("entity_config")          -- ordered by entity_order
explode_cfg = read_table("entity_explode_config")  -- keyed by entity_id
```
Step 2 is to read the JSON and detect the pattern.

```
df_raw = spark.read.json(input_path)

FOR each pattern IN patterns:
    IF df_raw.filter(expr(pattern.detection_rule)).count() > 0:
        matched_pattern = pattern
        BREAK

IF no match:
    RAISE "No pattern matched"
```
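The first-match-wins detection above can be sketched in plain Python. In PySpark the rule would be evaluated with `df_raw.filter(F.expr(pattern.detection_rule))`; here each rule is paired with a hypothetical plain-Python predicate so the control flow is runnable end to end.

```python
# Stand-in for Step 2's pattern detection. The "predicate" callables are an
# assumption for this sketch; in Spark the SQL detection_rule string itself
# would be evaluated against the raw DataFrame.

def detect_pattern(record, patterns):
    """Return the first pattern whose predicate matches the record."""
    for p in patterns:
        if p["predicate"](record):
            return p
    raise ValueError("No pattern matched")

patterns = [
    {"pattern_id": "P001",
     "detection_rule": "sourceSystem = 'CDS' AND customer IS NOT NULL",
     # Mirrors the SQL rule above as a Python predicate
     "predicate": lambda r: r.get("sourceSystem") == "CDS"
                            and r.get("customer") is not None},
]

record = {"sourceSystem": "CDS", "customer": {"customerId": "CUST-1001"}}
matched = detect_pattern(record, patterns)  # matches P001
```

Ordering the patterns list from most to least specific keeps the first-match semantics predictable.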
Step 3 is to apply the root path.

```
IF matched_pattern.root_path IS NOT NULL:
    -- Preserve top-level scalar metadata (sourceSystem, ingestionTs)
    meta_cols  = [scalar columns that are NOT the root_path struct]
    df_hoisted = df_raw.select(meta_cols, root_path.*)
ELSE:
    df_hoisted = df_raw
```
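Here is the same root-path hoist sketched on a plain dict, assuming the Pattern A shape. In Spark this would be `df_raw.select(*meta_cols, "customer.*")`; the point is that top-level scalars survive while the configured struct's fields are promoted.

```python
# Sketch of Step 3: promote the root_path struct's fields to the top level,
# keeping top-level scalar metadata (sourceSystem, ingestionTs).

def hoist_root(record, root_path):
    """Flatten root_path's fields into the top level, preserving scalars."""
    if root_path is None:
        return dict(record)
    # Keep scalar metadata columns; drop other nested structures
    hoisted = {k: v for k, v in record.items()
               if k != root_path and not isinstance(v, (dict, list))}
    hoisted.update(record[root_path])
    return hoisted

record = {"sourceSystem": "CDS", "ingestionTs": "2026-03-09T09:20:10Z",
          "customer": {"customerId": "CUST-1001", "name": "Amit Das"}}
flat = hoist_root(record, "customer")
# flat now has sourceSystem, ingestionTs, customerId and name at the top level
```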
Step 4 is where you process each entity.

```
FOR each entity IN entities WHERE entity.pattern_id = matched_pattern.pattern_id:
    exp = explode_cfg[entity.entity_id]

    -- 4a. Resolve explode context
    IF exp.explode_type == "root":
        working_df = df_hoisted
    ELSE:
        working_df = df_hoisted
        FOR each segment IN exp.explode_path.split("."):
            IF segment is ArrayType:
                working_df = working_df.explode_outer(segment)
                -- expand struct fields if element is struct
            ELIF segment is StructType:
                working_df = working_df.select(segment.*)

    -- 4b. Flatten structs (if configured)
    IF exp.flatten_structs IS NOT NULL:
        parse "struct_col -> field1, field2, field3"
        FOR each field:
            working_df = working_df.withColumn(
                "struct_col__field", col("struct_col.field"))

    -- 4c. Select configured columns + FK
    final_cols = []
    IF entity.fk_column:
        final_cols.append(entity.fk_column)
    final_cols.extend(parse(exp.select_columns))
    entity_df = working_df.select(final_cols)

    -- 4d. Auto-generate PK if needed
    IF entity.pk_auto_generate:
        entity_df = entity_df.withColumn(
            entity.pk_column,
            concat(fk_col, "_", monotonically_increasing_id()))

    -- 4e. Deduplicate and register
    output_tables[entity.entity_name] = entity_df.dropDuplicates()
```
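The heart of step 4a, walking a dot-path and exploding each array segment, can be sketched on plain Python structures. This stands in for chained `explode_outer` calls in PySpark; parent scalar columns (the FK candidates such as orderId) are carried down to each exploded row, which is exactly what makes step 4c's FK selection work. One simplification to note: a genuinely empty array drops the row here, whereas `explode_outer` would keep it with nulls.

```python
# Sketch of step 4a: explode a dot-path like "orders.orderLines" into one row
# per leaf element, carrying parent scalar columns (FKs) down each level.

def explode_path(rows, path):
    """Explode each dot-separated segment; lists fan out, dicts are descended."""
    for segment in path.split("."):
        next_rows = []
        for row in rows:
            value = row.get(segment)
            # Carry only scalar columns from the parent (these include the FKs)
            parent_scalars = {k: v for k, v in row.items()
                              if not isinstance(v, (dict, list))}
            if isinstance(value, list):
                for element in value:
                    child = dict(parent_scalars)
                    child.update(element)
                    next_rows.append(child)
            elif isinstance(value, dict):
                child = dict(parent_scalars)
                child.update(value)
                next_rows.append(child)
        rows = next_rows
    return rows

customer = {
    "customerId": "CUST-1001",
    "orders": [
        {"orderId": "ORD-501",
         "orderLines": [{"lineId": 1, "sku": "SKU-01"},
                        {"lineId": 2, "sku": "SKU-02"}]},
        {"orderId": "ORD-502",
         "orderLines": [{"lineId": 1, "sku": "SKU-03"}]},
    ],
}
lines = explode_path([customer], "orders.orderLines")
# 3 rows, each carrying customerId and orderId alongside the line columns
```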
Step 5 is where you write the output.

```
FOR each table_name, df IN output_tables:
    df.write.format("delta")
      .mode("overwrite")
      .saveAsTable(f"catalog.output_schema.{table_name}")

-- Also write a metadata catalog table
metadata_df = build_metadata(entities, output_tables)
metadata_df.write ... saveAsTable("catalog.output_schema._metadata_catalog")
```
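The `build_metadata` helper above is not spelled out, so here is one possible shape for it, sketched in plain Python. The `output_columns` mapping is an assumption standing in for the schemas of the entity DataFrames (in Spark you would read `df.schema` per table); PK/FK flags are derived from entity_config.

```python
# Sketch of build_metadata: one row per (table, column) with PK/FK flags.
# output_columns is a hypothetical stand-in for each DataFrame's schema.

def build_metadata(entities, output_columns):
    rows = []
    for e in entities:
        for col_name, col_type in output_columns[e["entity_name"]]:
            is_fk = col_name == e["fk_column"]
            rows.append({
                "table_name": e["entity_name"],
                "column_name": col_name,
                "data_type": col_type,
                "is_pk": col_name == e["pk_column"],
                "is_fk": is_fk,
                "fk_references": e["fk_references"] if is_fk else None,
            })
    return rows

entities = [
    {"entity_name": "order_lines", "pk_column": "line_sk",
     "fk_column": "orderId", "fk_references": "customer_orders.orderId"},
]
output_columns = {
    "order_lines": [("line_sk", "STRING"), ("orderId", "STRING"),
                    ("sku", "STRING"), ("qty", "INT"), ("price", "DOUBLE")],
}
catalog = build_metadata(entities, output_columns)
```

The resulting rows are what gets written to `_metadata_catalog`, giving the lineage/governance view described earlier.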
I hope this covers what you were looking for. Good luck!
If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.
2 weeks ago
Hi @SantiNath_Dey ,
Given you’re dealing with multiple complex JSON structures, I agree with a metadata‑driven approach rather than hard‑coding the flattening logic.
At a high level:
- Define a small set of config tables: one for pattern detection, one for the entity/table definitions (keys and hierarchy), and one for explode paths and column selection.
- Build a generic flattening/orchestration layer that detects the pattern, explodes the configured arrays, and writes each entity out as a Delta table.

The initial config can feel a bit heavier, but it scales really well. When a new JSON pattern shows up, you only add rows to the config tables (new pattern, entities, and explode paths). You don't touch the flattening/orchestration code at all.
Does that give you a rough idea to start with?
If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.
2 weeks ago
Thanks for your response! Could you please share a high-level pseudo code example that explains the config table structure and how to define patterns in a Delta table? Additionally, I'd like to understand how to flatten complex JSON data using a metadata-driven approach.
Currently, I've implemented a flattening logic using a recursive function that processes the entire JSON file, but it's not working as expected for some complex scenarios. I'd also like to restrict the child tables, as not all scenarios require all tables, and this should be a config-driven approach.
If you have any reference pseudo code or approach, it would be really helpful. Thanks in advance!
2 weeks ago
Thanks for the quick response! I'm going to dive in and start building the implementation now.