Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

What's the best way to get from Python dict > JSON > PySpark and apply as a mapping to a dataframe?

397973
New Contributor III

I'm migrating code from Python on Linux to PySpark on Databricks. I have many mappings like this:

{
    "main": {
        "honda": 1.0,
        "toyota": 2.9,
        "BMW": 5.77,
        "Fiat": 4.5,
    },
}

I exported the dict using json.dump, saved it to S3, and was able to import it with spark.read.json, but that puts it into a dataframe with nested objects, like this. I can access the objects with select, but I don't know how to apply it as a mapping to another dataframe.

 

[screenshot: DataFrame from spark.read.json with the mapping as a nested "main" struct column]
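Roughly what I have so far (the bucket path is just a placeholder):

```python
# json.dump output (a single JSON object) uploaded to S3, read back in Databricks
nested_df = spark.read.json("s3://my-bucket/mappings/car_mapping.json")

# I can pull out individual values like this...
nested_df.select("main.honda").show()

# ...but I don't see how to use this as a lookup against the 'car' column of another dataframe
```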

Another approach would be to save the mapping as JSON Lines and read it in as a dictionary? From what I understand, PySpark prefers JSON Lines to regular JSON, like this:

{"main":{"honda":1,"toyota":2,"BMW":5,"Fiat":4}}

OK, so for now I manually created the dict and the mapping. Given a dictionary car_map with the values and a dataframe df:

 

map_col = F.create_map([F.lit(x) for i in car_map.items() for x in i])
df.withColumn('x', map_col[F.col('car')]).display()
 
So then, how do I create the JSON Lines in Python? It would be a list of dicts, but I need to label them, and wouldn't that make it regular JSON rather than JSON Lines?
 
What approach do you think is better, or is there another? 
1 ACCEPTED SOLUTION

Accepted Solutions

BigRoux
Databricks Employee

For migrating your Python dictionary mappings to PySpark, you have several good options. Let's examine the approaches and identify the best solution.

## Using `F.create_map` (Your Current Approach)

Your current approach using `F.create_map` is actually quite efficient:

```python
from pyspark.sql import functions as F

# Your dictionary
car_map = {
    "honda": 1.0,
    "toyota": 2.9,
    "BMW": 5.77,
    "Fiat": 4.5,
}

# Create a map column and apply it
map_col = F.create_map([F.lit(x) for i in car_map.items() for x in i])
df = df.withColumn('mapped_value', map_col[F.col('car')])
```

This is a clean approach that works well for smaller mappings and doesn't require any intermediate storage.
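For example, applied to a small made-up DataFrame (`demo_df` below is purely illustrative), unmatched keys simply come back as NULL:

```python
# Small made-up DataFrame just to illustrate the behaviour
demo_df = spark.createDataFrame([("honda",), ("Fiat",), ("tesla",)], ["car"])

map_col = F.create_map([F.lit(x) for i in car_map.items() for x in i])
demo_df.withColumn("mapped_value", map_col[F.col("car")]).show()
# honda -> 1.0, Fiat -> 4.5, tesla -> NULL (no entry in car_map)
```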

## Using a Broadcast Join (Best for Large Mappings)

For larger mappings, a broadcast join is often more efficient:

```python
# Convert your dictionary to a DataFrame
map_df = spark.createDataFrame([(k, v) for k, v in car_map.items()], ["car", "value"])

# Broadcast join with your original DataFrame
df = df.join(F.broadcast(map_df), "car", "left").na.fill(0, ["value"])
```

## Using the JSON Lines Approach

If you prefer to store your mappings as files for reuse:

1. Create JSON Lines from your Python dictionary:

```python
import json

# For a simple flat dictionary
with open("car_mapping.jsonl", "w") as f:
    for key, value in car_map.items():
        f.write(json.dumps({"car": key, "value": value}) + "\n")

# For nested dictionaries like your original example
nested_map = {"main": car_map}
with open("nested_car_mapping.jsonl", "w") as f:
    for category, mappings in nested_map.items():
        for key, value in mappings.items():
            f.write(json.dumps({"category": category, "car": key, "value": value}) + "\n")
```

2. Read it in Databricks:

```python
map_df = spark.read.json("dbfs:/path/to/car_mapping.jsonl")
df = df.join(F.broadcast(map_df), "car", "left")
```
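If you go with the nested variant (`nested_car_mapping.jsonl`), the idea is the same; you just join on the category as well. A rough sketch, assuming your DataFrame has (or you add) a matching `category` column and using a placeholder path:

```python
# Read the nested mapping file written above (path is a placeholder)
nested_map_df = spark.read.json("dbfs:/path/to/nested_car_mapping.jsonl")

# Join on both category and car; rows without a match keep a NULL value
df = df.join(F.broadcast(nested_map_df), ["category", "car"], "left")
```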

## Recommendation

1. For small to medium mappings: Your current `F.create_map` approach is excellent - it's clean, efficient, and doesn't require intermediate storage.

2. For large mappings (thousands of entries): Use the broadcast join approach, which scales better.

3. For mappings that need to be reused across sessions: Store as JSON Lines and load with `spark.read.json()` followed by a broadcast join.

The JSON Lines format (one JSON object per line) is indeed what Spark handles best: because each line is a self-contained record, the file can be split and parsed in parallel, whereas a single multi-line JSON document has to be read as one unit (via the `multiLine` option).
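That said, if you want to keep the original nested JSON written by `json.dump` rather than converting it to JSON Lines, one option is to read it with the `multiLine` option and collapse the struct back into a plain Python dict on the driver, then reuse the `F.create_map` pattern above. A rough sketch (the path is a placeholder):

```python
# Read a regular (possibly multi-line) JSON file instead of JSON Lines
nested_df = spark.read.option("multiLine", True).json("dbfs:/path/to/car_mapping.json")

# Collect the single row and turn the 'main' struct into a plain Python dict
car_map = nested_df.select("main.*").first().asDict()

# Then build the map column exactly as in the first approach
map_col = F.create_map([F.lit(x) for i in car_map.items() for x in i])
```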

If you need to maintain the nested structure, you can also use a UDF, but this is generally less efficient than the other approaches:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Create a UDF that looks up values in your dictionary
@udf(returnType=DoubleType())
def map_car(car_name):
    return car_map.get(car_name, 0.0)

# Apply the UDF
df = df.withColumn("mapped_value", map_car(F.col("car")))
```
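And if the mapping really does need to stay nested (say, several categories besides "main"), here is a rough sketch of the same UDF idea with a two-level lookup; `nested_map` and the hard-coded "main" category are just illustrative:

```python
# Hypothetical nested dictionary with one or more categories
nested_map = {"main": car_map}

@udf(returnType=DoubleType())
def map_car_nested(category, car_name):
    # Two-level lookup; falls back to 0.0 if either key is missing
    return nested_map.get(category, {}).get(car_name, 0.0)

# Here the category is hard-coded to "main"; it could also come from a column
df = df.withColumn("mapped_value", map_car_nested(F.lit("main"), F.col("car")))
```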

Overall, your `F.create_map` approach is already quite good for most use cases!

