2 weeks ago
I'm migrating code from Python on Linux to Databricks PySpark. I have many mappings like this:
{
"main": {
"honda": 1.0,
"toyota": 2.9,
"BMW": 5.77,
"Fiat": 4.5,
},
}
I exported it using json.dump, saved it to S3, and was able to import it with spark.read.json, but that puts it into a dataframe with nested objects. I can access the objects with select, but I don't know how to apply it as a mapping to another dataframe.
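What I mean by accessing with select is roughly this (a sketch; the bucket path is illustrative and main is just the top-level key from my example above):

```python
raw = spark.read.json("s3://my-bucket/car_map.json")

# Individual values are reachable by selecting into the nested struct
raw.select("main.honda").show()

# The whole struct can also be collected back into a plain Python dict on the driver
car_map = raw.select("main").first()["main"].asDict()
```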
Another approach would be to save it as JSON Lines and read it back in as a dictionary? From what I understand, PySpark prefers JSON Lines over regular JSON, like this:
{"main":{"honda":1,"toyota":2,"BMW":5,"Fiat":4}}
OK, so I manually created the dict and the mapping, with a dictionary car_map holding the values and a dataframe df.
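The lookup itself is roughly this (a sketch of what I have; the car column name comes from my data):

```python
from pyspark.sql import functions as F

# Build a literal map column from the dict and use it to look up each row's car
map_col = F.create_map([F.lit(x) for kv in car_map.items() for x in kv])
df = df.withColumn("mapped_value", map_col[F.col("car")])
```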
- Labels: Spark
Accepted Solutions
2 weeks ago
For migrating your Python dictionary mappings to PySpark, you have several good options. Let's examine the approaches and identify the best solution.
## Using `F.create_map` (Your Current Approach)
Your current approach using `F.create_map` is actually quite efficient:
```python
from pyspark.sql import functions as F
# Your dictionary
car_map = {
"honda": 1.0,
"toyota": 2.9,
"BMW": 5.77,
"Fiat": 4.5
}
# Create a literal map column and apply it as a lookup
map_col = F.create_map([F.lit(x) for kv in car_map.items() for x in kv])
df = df.withColumn('mapped_value', map_col[F.col('car')])
```
This is a clean approach that works well for smaller mappings and doesn't require any intermediate storage.
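One detail to watch: if a car value isn't in the dictionary, the map lookup returns null rather than a number. A possible way to default it to 0.0, building on the snippet above:

```python
# Fall back to 0.0 when the car key is missing from the literal map
df = df.withColumn(
    "mapped_value",
    F.coalesce(map_col[F.col("car")], F.lit(0.0)),
)
```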
## Using a Broadcast Join (Best for Large Mappings)
For larger mappings, a broadcast join is often more efficient:
```python
# Convert your dictionary to a lookup DataFrame
map_df = spark.createDataFrame([(k, v) for k, v in car_map.items()], ["car", "value"])

# Broadcast join with your original DataFrame, defaulting unmatched cars to 0
df = df.join(F.broadcast(map_df), "car", "left").na.fill(0, ["value"])
```

## Using the JSON Lines Approach
If you prefer to store your mappings as files for reuse:
1. Create JSON Lines from your Python dictionary:
```python
import json
# For a simple flat dictionary
with open("car_mapping.jsonl", "w") as f:
for key, value in car_map.items():
f.write(json.dumps({"car": key, "value": value}) + "\n")
# For nested dictionaries like your original example
nested_map = {"main": car_map}
with open("nested_car_mapping.jsonl", "w") as f:
for category, mappings in nested_map.items():
for key, value in mappings.items():
f.write(json.dumps({"category": category, "car": key, "value": value}) + "\n")
```
2. Read it in Databricks:
```python
map_df = spark.read.json("dbfs:/path/to/car_mapping.jsonl")
df = df.join(F.broadcast(map_df), "car", "left")
```
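If you wrote the nested variant with a category column, you would probably want to filter to the relevant category and drop that column before joining, something like:

```python
# Keep only the "main" category from the nested mapping file, then join as before
nested_df = spark.read.json("dbfs:/path/to/nested_car_mapping.jsonl")
map_df = nested_df.filter(F.col("category") == "main").drop("category")
df = df.join(F.broadcast(map_df), "car", "left")
```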
## Recommendation
1. For small to medium mappings: Your current `F.create_map` approach is excellent - it's clean, efficient, and doesn't require intermediate storage.
2. For large mappings (thousands of entries): Use the broadcast join approach, which scales better.
3. For mappings that need to be reused across sessions: Store as JSON Lines and load with `spark.read.json()` followed by a broadcast join.
The JSON Lines format (one JSON object per line) is indeed the preferred input for Spark over a single multi-line JSON document, because each line can be parsed independently and the file can be split across executors.
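That said, if you would rather keep your original json.dump output as a single document, Spark can still read it with the multiLine option (the path below is illustrative):

```python
# Read a single (possibly pretty-printed) JSON document instead of JSON Lines
raw = spark.read.option("multiLine", "true").json("s3://my-bucket/car_map.json")
raw.printSchema()  # one row, with a nested struct column per top-level key
```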
If you want to keep using the Python dictionary directly, you can also wrap the lookup in a UDF, but this is generally less efficient than the native column operations above:
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
# Create a UDF that looks up values in your dictionary
@udf(returnType=DoubleType())
def map_car(car_name):
return car_map.get(car_name, 0.0)
# Apply the UDF
df = df.withColumn("mapped_value", map_car(F.col("car")))
```
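If the dictionary is large, a common refinement of this UDF pattern is to ship the dict to the executors as a broadcast variable and look it up from there, along these lines:

```python
# Broadcast the dict once so each executor keeps a single read-only copy
bc_car_map = spark.sparkContext.broadcast(car_map)

@udf(returnType=DoubleType())
def map_car_bc(car_name):
    # Look up the value in the broadcast copy, defaulting to 0.0
    return bc_car_map.value.get(car_name, 0.0)

df = df.withColumn("mapped_value", map_car_bc(F.col("car")))
```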
Overall, your `F.create_map` approach is already quite good for most use cases!