Databricks Free Edition Help
Engage in discussions about the Databricks Free Edition within the Databricks Community. Share insights, tips, and best practices for getting started, troubleshooting issues, and maximizing the value of your trial experience to explore Databricks' capabilities effectively.

Complex Json file Flatten Dynamically

SantiNath_Dey
New Contributor II

Hi,

We have multiple complex JSON files and need to flatten them, especially handling array data types. Whenever an array is present, we need to create a new child table and establish a relationship between the master and child tables. This will follow a metadata-driven architecture. Additionally, we have different JSON formats, so the flattening logic must also work based on patterns. Could you please help with this? It’s an advanced requirement, and your support would be greatly appreciated.
 

Pattern A — Customer → Orders (array) → OrderLines (nested array)
{
  "sourceSystem": "CDS",
  "ingestionTs": "2026-03-09T09:20:10Z",
  "customer": {
    "customerId": "CUST-1001",
    "name": "Amit Das",
    "email": "amit.das@example.com",
    "address": {
      "city": "Kolkata",
      "state": "WB",
      "pin": "700001"
    },
    "phones": [
      { "type": "mobile", "number": "+91-9000000001" },
      { "type": "office", "number": "+91-3300000002" }
    ],
    "orders": [
      {
        "orderId": "ORD-501",
        "orderDate": "2026-03-01",
        "status": "SHIPPED",
        "orderLines": [
          { "lineId": 1, "sku": "SKU-01", "qty": 2, "price": 120.5 },
          { "lineId": 2, "sku": "SKU-02", "qty": 1, "price": 999.0 }
        ],
        "payments": [
          { "paymentId": "PAY-901", "method": "UPI", "amount": 1240.0 }
        ]
      },
      {
        "orderId": "ORD-502",
        "orderDate": "2026-03-05",
        "status": "CREATED",
        "orderLines": [
          { "lineId": 1, "sku": "SKU-03", "qty": 3, "price": 10.0 }
        ],
        "payments": []
      }
    ]
  }
}
Tables typically created:

customer_master (PK: customerId)
customer_phones (FK: customerId)
customer_orders (PK: orderId, FK: customerId)
order_lines (FK: orderId)
order_payments (FK: orderId)

1 ACCEPTED SOLUTION

Accepted Solutions

Hi @SantiNath_Dey,

I may not be able to deliver production-grade working code, but I'll walk you through my approach. It's a long post, but there's no point in giving you a concise response that doesn't convey the message.

Let's start with the architecture. 

Flattener_Architecture.jpg

The framework has three layers, all driven by config.

  • Detection Layer: When raw JSON arrives (from a UC Volume or a Bronze layer), the engine reads pattern_config and evaluates each detection_rule (a SQL expression like sourceSystem = 'CDS' AND customer IS NOT NULL) against the DataFrame. First match wins. This identifies which pattern to apply and whether to hoist a nested struct as the root (e.g., customer).
  • Orchestration Layer: Once the pattern is identified, the engine pulls all entities from entity_config for that pattern_id, ordered by entity_order. This guarantees parents are processed before children. Each entity row defines the target table name, PK (natural or auto-generated), FK, and its parent in the hierarchy.
  • Flattening Layer: For each entity, the engine reads entity_explode_config to get the explode plan. It walks the explode_path (e.g., orders.orderLines), explodes each intermediate array segment, flattens any configured structs inline (e.g., address -> city, state, pin), selects only the configured columns, and injects the FK from the parent entity. The resulting DataFrame is deduplicated and registered as an output table.

For the output, all entity DataFrames are written as Delta tables to the target Unity Catalog schema. A _metadata_catalog table is also generated, with one row per column across all output tables, capturing table name, column name, data type, PK/FK flags, and FK references. This serves as a built-in lineage/governance layer.

The key point is that the three config tables at the top of the diagram are the only things that change when a new JSON format arrives. The engine on the left (the three layers I've described above) never changes.

You asked about how the config tables should be set up. The three config tables will look something like the following. This is just an example; modify it as required.

Pattern_Config: One row per JSON shape. The detection_rule is a SQL boolean expression evaluated against the raw DataFrame to auto-match the right pattern.

Column          | Type   | Purpose
pattern_id      | STRING | Unique ID (e.g. P001)
pattern_name    | STRING | Human-readable name
source_system   | STRING | Origin system identifier
detection_rule  | STRING | SQL expression, e.g. sourceSystem = 'CDS' AND customer IS NOT NULL
root_path       | STRING | Struct to hoist as root (e.g. customer), or NULL
description     | STRING | Free text

Entity Config: One row per target table. Defines the relational model: parent-child hierarchy, keys, and processing order.

Column           | Type    | Purpose
pattern_id       | STRING  | Links to pattern_config
entity_id        | STRING  | Unique ID (e.g. E001)
entity_name      | STRING  | Target Delta table name
parent_entity_id | STRING  | Parent entity (NULL for root)
pk_column        | STRING  | Primary key column name
pk_auto_generate | BOOLEAN | True = engine generates surrogate PK
fk_column        | STRING  | Foreign key column name
fk_references    | STRING  | Parent table.column reference
entity_order     | INT     | Processing sequence (parents first)

Entity Explode Config: One row per entity. Tells the engine how to navigate the JSON tree and which columns to extract.

Column          | Type   | Purpose
entity_id       | STRING | Links to entity_config
explode_path    | STRING | Dot-path to the array (e.g. orders.orderLines), NULL for root
explode_type    | STRING | root, array_of_struct, or nested_array
select_columns  | STRING | Comma-separated column list for this entity
flatten_structs | STRING | Struct flattening rule: struct_col -> field1, field2
alias_map       | STRING | Column renames (optional)
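To make this concrete, here is what seed rows for the three config tables might look like for Pattern A, expressed as plain Python dicts (in practice you'd write these out as Delta tables). All IDs and surrogate-key column names here (P001, E001, phone_sk, line_sk) are illustrative assumptions:

```python
# Illustrative seed rows for the three config tables, matching Pattern A.
pattern_config = [
    {"pattern_id": "P001", "pattern_name": "customer_orders_v1",
     "source_system": "CDS",
     "detection_rule": "sourceSystem = 'CDS' AND customer IS NOT NULL",
     "root_path": "customer", "description": "Customer -> Orders -> OrderLines"},
]

entity_config = [
    {"pattern_id": "P001", "entity_id": "E001", "entity_name": "customer_master",
     "parent_entity_id": None, "pk_column": "customerId", "pk_auto_generate": False,
     "fk_column": None, "fk_references": None, "entity_order": 1},
    {"pattern_id": "P001", "entity_id": "E002", "entity_name": "customer_phones",
     "parent_entity_id": "E001", "pk_column": "phone_sk", "pk_auto_generate": True,
     "fk_column": "customerId", "fk_references": "customer_master.customerId", "entity_order": 2},
    {"pattern_id": "P001", "entity_id": "E003", "entity_name": "customer_orders",
     "parent_entity_id": "E001", "pk_column": "orderId", "pk_auto_generate": False,
     "fk_column": "customerId", "fk_references": "customer_master.customerId", "entity_order": 3},
    {"pattern_id": "P001", "entity_id": "E004", "entity_name": "order_lines",
     "parent_entity_id": "E003", "pk_column": "line_sk", "pk_auto_generate": True,
     "fk_column": "orderId", "fk_references": "customer_orders.orderId", "entity_order": 4},
    {"pattern_id": "P001", "entity_id": "E005", "entity_name": "order_payments",
     "parent_entity_id": "E003", "pk_column": "paymentId", "pk_auto_generate": False,
     "fk_column": "orderId", "fk_references": "customer_orders.orderId", "entity_order": 5},
]

entity_explode_config = [
    {"entity_id": "E001", "explode_path": None, "explode_type": "root",
     "select_columns": "customerId, name, email, sourceSystem, ingestionTs",
     "flatten_structs": "address -> city, state, pin", "alias_map": None},
    {"entity_id": "E002", "explode_path": "phones", "explode_type": "array_of_struct",
     "select_columns": "type, number", "flatten_structs": None, "alias_map": None},
    {"entity_id": "E003", "explode_path": "orders", "explode_type": "array_of_struct",
     "select_columns": "orderId, orderDate, status", "flatten_structs": None, "alias_map": None},
    {"entity_id": "E004", "explode_path": "orders.orderLines", "explode_type": "nested_array",
     "select_columns": "lineId, sku, qty, price", "flatten_structs": None, "alias_map": None},
    {"entity_id": "E005", "explode_path": "orders.payments", "explode_type": "nested_array",
     "select_columns": "paymentId, method, amount", "flatten_structs": None, "alias_map": None},
]
```

Note how the five entity rows reproduce the PK/FK relationships of the five target tables, with entity_order guaranteeing parents are processed first.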

Let's now talk about the ERD for the example you gave. Using your example as the basis for the demonstration, I'm creating 5 normalised tables from a single nested JSON document, as you wanted.

  • customer_master is the root table, keyed on customerId. It holds the scalar customer fields plus the flattened address struct (address__city, address__state, address__pin) and the top-level metadata columns (sourceSystem, ingestionTs).
  • customer_phones and customer_orders are direct children of customer_master, both linked via customerId as FK. customer_phones gets an auto-generated surrogate PK since the source array has no natural key; customer_orders uses orderId as a natural PK.
  • order_lines and order_payments sit one level deeper, both linked to customer_orders via orderId. These come from nested arrays within the orders array. When you run the script, the engine walks the dotted paths orders.orderLines and orders.payments automatically, exploding each intermediate array along the way.

All relationships are 1-to-many (one customer has many phones/orders, one order has many lines/payments). The PK/FK pairs are fully defined in entity_config, so the same ER structure can be reproduced from config alone: no hardcoded joins in the engine.

Flattener_ERD.jpg

From a coding perspective, here is how I'd approach it. This is just pseudocode, as requested. Use it as a reference to build what you need.

Step 1 is to load the config.

patterns    = read_table("pattern_config")      -- all patterns
entities    = read_table("entity_config")        -- ordered by entity_order
explode_cfg = read_table("entity_explode_config") -- keyed by entity_id

 

Step 2 is to read the JSON and detect the pattern.

df_raw = spark.read.json(input_path)

FOR each pattern IN patterns:
    IF df_raw.filter(expr(pattern.detection_rule)).count() > 0:
        matched_pattern = pattern
        BREAK

IF no match:
    RAISE "No pattern matched"
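The detection loop above can be sketched in pure Python. Since there's no Spark here, each detection_rule is stood in for by a plain predicate over the parsed JSON dict; the point is the first-match-wins control flow, not the SQL evaluation. The helper name detect_pattern is mine:

```python
def detect_pattern(doc, patterns):
    """Return the first pattern whose predicate matches the document.

    Patterns are checked in order: first match wins, mirroring the
    detection_rule evaluation in the pseudocode above.
    """
    for pattern in patterns:
        if pattern["predicate"](doc):
            return pattern
    raise ValueError("No pattern matched")

# Two illustrative patterns; real rules would live in pattern_config.
patterns = [
    {"pattern_id": "P001",
     "predicate": lambda d: d.get("sourceSystem") == "CDS" and "customer" in d},
    {"pattern_id": "P002",
     "predicate": lambda d: "vendor" in d},
]

doc = {"sourceSystem": "CDS", "customer": {"customerId": "CUST-1001"}}
matched = detect_pattern(doc, patterns)
# matched["pattern_id"] == "P001"
```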

Step 3 is to apply the root path.

IF matched_pattern.root_path IS NOT NULL:
    -- Preserve top-level scalar metadata (sourceSystem, ingestionTs)
    meta_cols = [scalar columns that are NOT the root_path struct]
    df_hoisted = df_raw.select(meta_cols, root_path.*)

ELSE:
    df_hoisted = df_raw
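A minimal sketch of the root-path hoist on a plain dict, under the same assumptions as Pattern A (in Spark this would be a select of the metadata columns plus root_path.*). The helper name hoist_root is mine:

```python
def hoist_root(doc, root_path):
    """Promote the root_path struct's fields to the top level while
    keeping top-level scalar metadata (e.g. sourceSystem, ingestionTs)."""
    if root_path is None:
        return dict(doc)
    root = doc[root_path]
    # Keep only scalar columns that are not the root struct itself.
    meta = {k: v for k, v in doc.items()
            if k != root_path and not isinstance(v, (dict, list))}
    return {**meta, **root}

doc = {"sourceSystem": "CDS", "ingestionTs": "2026-03-09T09:20:10Z",
       "customer": {"customerId": "CUST-1001", "name": "Amit Das"}}
hoisted = hoist_root(doc, "customer")
# hoisted now has customerId, name, sourceSystem, ingestionTs at the top level
```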

Step 4 is to process each entity.

 

FOR each entity IN entities WHERE entity.pattern_id = matched_pattern.id:

    exp = explode_cfg[entity.entity_id]

    -- 4a. Resolve explode context
    IF exp.explode_type == "root":
        working_df = df_hoisted

    ELSE:
        working_df = df_hoisted
        FOR each segment IN exp.explode_path.split("."):
            IF segment is ArrayType:
                working_df = working_df.explode_outer(segment)
                -- expand struct fields if element is struct
            ELIF segment is StructType:
                working_df = working_df.select(segment.*)

    -- 4b. Flatten structs (if configured)
    IF exp.flatten_structs IS NOT NULL:
        parse "struct_col -> field1, field2, field3"
        FOR each field:
            working_df = working_df.withColumn(
                "struct_col__field", col("struct_col.field")
            )

    -- 4c. Select configured columns + FK
    final_cols = []
    IF entity.fk_column:
        final_cols.append(entity.fk_column)
    final_cols.extend(parse(exp.select_columns))
    entity_df = working_df.select(final_cols)

    -- 4d. Auto-generate PK if needed
    IF entity.pk_auto_generate:
        entity_df = entity_df.withColumn(
            entity.pk_column,
            concat(fk_col, "_", monotonically_increasing_id())
        )

    -- 4e. Deduplicate and register
    output_tables[entity.entity_name] = entity_df.dropDuplicates()
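The path walk in 4a and the FK injection in 4c can be sketched in pure Python, with dicts standing in for DataFrames (in Spark each array segment becomes an explode_outer call). The helper name explode_path_rows is mine, not part of any library:

```python
def explode_path_rows(doc, explode_path, fk_column=None):
    """Explode every array segment of a dotted path, injecting fk_column
    from the parent row whenever the parent carries that key."""
    rows = [doc]
    for segment in explode_path.split("."):
        next_rows = []
        for row in rows:
            value = row.get(segment, [])
            children = value if isinstance(value, list) else [value]
            for child in children:
                child = dict(child)  # copy so we can add the FK safely
                if fk_column and fk_column in row:
                    child[fk_column] = row[fk_column]
                next_rows.append(child)
        rows = next_rows
    return rows

customer = {"customerId": "CUST-1001",
            "orders": [{"orderId": "ORD-501",
                        "orderLines": [{"lineId": 1, "sku": "SKU-01"},
                                       {"lineId": 2, "sku": "SKU-02"}]},
                       {"orderId": "ORD-502",
                        "orderLines": [{"lineId": 1, "sku": "SKU-03"}]}]}

lines = explode_path_rows(customer, "orders.orderLines", fk_column="orderId")
# three line rows, each carrying the injected orderId of its parent order
```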

Step 5 is to write the output.

FOR each table_name, df IN output_tables:
    df.write.format("delta")
      .mode("overwrite")
      .saveAsTable(f"catalog.output_schema.{table_name}")

-- Also write a metadata catalog table
metadata_df = build_metadata(entities, output_tables)
metadata_df.write ... saveAsTable("catalog.output_schema._metadata_catalog")
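Here's a hypothetical sketch of what build_metadata could do: one row per column across all output tables, with PK/FK flags looked up from entity_config. Field names mirror the config tables above but are assumptions, and plain lists of dicts stand in for DataFrames:

```python
def build_metadata(entities, output_tables):
    """Build _metadata_catalog rows: one per column per output table."""
    rows = []
    by_name = {e["entity_name"]: e for e in entities}
    for table_name, records in output_tables.items():
        ent = by_name[table_name]
        columns = records[0].keys() if records else []
        for col in columns:
            rows.append({
                "table_name": table_name,
                "column_name": col,
                "data_type": type(records[0][col]).__name__,
                "is_pk": col == ent["pk_column"],
                "is_fk": col == ent.get("fk_column"),
                "fk_references": ent.get("fk_references")
                                 if col == ent.get("fk_column") else None,
            })
    return rows

entities = [{"entity_name": "order_lines", "pk_column": "lineId",
             "fk_column": "orderId", "fk_references": "customer_orders.orderId"}]
output_tables = {"order_lines": [{"lineId": 1, "sku": "SKU-01",
                                  "orderId": "ORD-501"}]}
meta = build_metadata(entities, output_tables)
# three rows; the orderId row is flagged as FK with its reference filled in
```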

I hope this covers what you were looking for. Good luck!

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***


4 REPLIES 4

Ashwin_DSA
Databricks Employee

Hi @SantiNath_Dey ,

Given you’re dealing with multiple complex JSON structures, I agree with a metadata‑driven approach rather than hard‑coding the flattening logic.

At a high level:

  • Define a small set of config tables:

    • One for patterns: which JSON shape/source you're handling, with a detection rule.
    • One for entities per pattern: which relational tables you want (for example, customer, orders, order_lines, etc.).
    • Perhaps another one for the entity explode paths.
  • Build a generic flattening/orchestration layer that:

    1. Reads the raw JSON.
    2. Uses the pattern config to pick the right pattern.
    3. For each entity in that pattern, follows the configured explode paths (arrays --> child tables) and selects the configured columns, including parent keys as FKs.

The initial config can feel a bit heavy, but it scales really well. When a new JSON pattern shows up, you only add rows to the config tables (new pattern, entities, and explode paths). You don't touch the flattening/orchestration code at all.

Does that give you a rough idea to start with?

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

Thanks for your response! Could you please share a high-level pseudocode example that explains the config table structure and how to define patterns in a Delta table? Additionally, I'd like to understand how to flatten complex JSON data using a metadata-driven approach.

Currently, I've implemented a flattening logic using a recursive function that processes the entire JSON file, but it's not working as expected for some complex scenarios. I'd also like to restrict the child tables, as not all scenarios require all tables, and this should be a config-driven approach.

If you have any reference pseudocode or approach, it would be really helpful. Thanks in advance!

SantiNath_Dey
New Contributor II

Thanks for the quick response! I'm going to dive in and start building the implementation now.