3 weeks ago
I have the following code in Databricks under serverless, and I want to know how to improve it so it runs faster without the results changing (the standard errors change slightly when I try to improve it):
3 weeks ago
Greetings @Jake3 ,
Most of the runtime here is coming from two things:
You’re pulling everything into single-node pandas via toPandas().
You’re re-doing a lot of per-combination work inside the big for combo in all_rows loop, including a full df.copy() and repeated groupbys.
Below are changes that keep the same estimator but reduce work. I’m going to separate “safe” optimizations (no change in logic) from “aggressive” ones (can slightly change displayed output or shape).
Push as much as possible to Spark before toPandas()
If df is a Spark DataFrame and the input table is large, make sure you shrink it before calling this function:
# Example wrapper before calling taylor_row_percent
needed_cols = set(row_domains + [measure] + list(exclusions.keys()))
if weight_col:
    needed_cols.add(weight_col)
if strata_col:
    needed_cols.add(strata_col)
if isinstance(fpc, str):
    needed_cols.add(fpc)

sdf_small = sdf.select(*needed_cols)
result = taylor_row_percent(
    sdf_small, row_domains, measure, exclusions,
    weight_col=weight_col, strata_col=strata_col, fpc=fpc, ...
)
This keeps the math identical, but reduces bytes transferred and processed in pandas.
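If you want to push the exclusion filtering itself into Spark as well, here is a sketch. It assumes `exclusions` maps column name to a list of excluded values, with NaN in the list meaning "exclude nulls", mirroring your pandas `scope_fg` loop; the function and helper names are mine, not from your code:

```python
import math

def split_exclusions(excl_vals):
    """Separate concrete excluded values from a NaN entry meaning 'exclude nulls'."""
    non_null = [v for v in excl_vals
                if not (isinstance(v, float) and math.isnan(v))]
    return non_null, len(non_null) != len(excl_vals)

def apply_exclusions(sdf, exclusions):
    """Push the scope_fg exclusion logic into Spark before toPandas()."""
    from pyspark.sql import functions as F  # deferred so the helper above is testable alone
    for var, excl_vals in exclusions.items():
        non_null, drop_nulls = split_exclusions(excl_vals)
        if non_null:
            # keep rows whose value is not excluded; nulls pass through here
            sdf = sdf.filter(~F.col(var).isin(non_null) | F.col(var).isNull())
        if drop_nulls:
            sdf = sdf.filter(F.col(var).isNotNull())
    return sdf
```

The explicit `isNull()` branch matters: `isin` on a null value yields null, which `filter` would otherwise drop, silently excluding nulls you meant to keep.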
Don’t copy the whole DataFrame for every combo
Inside the main loop you do:
df2 = df.copy()
df2["R"] = mask_row.astype(float)
df2["C"] = mask_cell.astype(float)
df2["u"] = (df2[weight_col] / W_row) * (df2["C"] - p_hat * df2["R"])
for h, g in df2.groupby(strata_col):
    ...
That df.copy() per combination is extremely expensive.
You can reuse the same DataFrame and only overwrite temporary columns; that keeps the math and the order of operations the same:
# Prepare once, after step 3 (FPC handling)
df["R"] = 0.0
df["C"] = 0.0
df["u"] = 0.0
...
for combo in all_rows:
    dom_vals = combo[:-1]
    measure_val = combo[-1]

    mask_row = df["scope_fg"] == 1
    for c, v in zip(row_domains, dom_vals):
        mask_row &= (df[c] == v)
    mask_cell = mask_row & (df[measure] == measure_val)

    n_row = int(mask_row.sum())
    n_cell = int(mask_cell.sum())

    if n_row == 0:
        ...
    else:
        W_row = df.loc[mask_row, weight_col].sum()
        W_cell = df.loc[mask_cell, weight_col].sum()
        p_hat = W_cell / W_row if W_row > 0 else np.nan

        # overwrite temporary columns in-place
        df["R"] = mask_row.astype(float)
        df["C"] = mask_cell.astype(float)
        df["u"] = (df[weight_col] / W_row) * (df["C"] - p_hat * df["R"])

        var_p = 0.0
        for h, g in df.groupby(strata_col):
            n_h = len(g)
            if n_h <= 1:
                continue
            N_h = g[fpc_col].iloc[0]
            f_h = n_h / N_h if np.isfinite(N_h) else 0.0
            S2 = g["u"].var(ddof=1)
            var_p += (1 - f_h) * n_h * S2
        se_p = np.sqrt(var_p) if var_p > 0 else np.nan
Key points:
We don’t change how u is computed or how variance is aggregated.
We reuse df and group on it directly.
You should get the same standard errors (modulo normal floating-point noise), but much faster and with less memory churn.
Drop out-of-scope records early (still same results)
Right now you carry all rows and always AND with scope_fg == 1. Once scope_fg is fully computed, you can safely filter to in-scope records exactly once:
# 4. Scope flag (same as your code up to this point)
df["scope_fg"] = 1
for var, excl_vals in exclusions.items():
    for val in excl_vals:
        if pd.isna(val):
            df.loc[df[var].isna(), "scope_fg"] = 0
        else:
            df.loc[df[var] == val, "scope_fg"] = 0

# NEW: drop out-of-scope rows once
df = df[df["scope_fg"] == 1].copy()
df.drop(columns=["scope_fg"], inplace=True)
Then, later:
domain_levels and measure_levels effectively used the same filter already (scope_fg == 1).
n_row, n_cell and all weights are computed only on these in-scope units anyway.
You’ll need to drop explicit references to scope_fg in the masks:
# before
mask_row = df["scope_fg"] == 1
for c, v in zip(row_domains, dom_vals):
    mask_row &= (df[c] == v)

# after filtering to in-scope only:
mask_row = pd.Series(True, index=df.index)
for c, v in zip(row_domains, dom_vals):
    mask_row &= (df[c] == v)
This reduces row count for all subsequent operations with no change in the estimator definition.
Reduce width: keep only needed columns in pandas
After you’ve handled weight_col, strata_col, fpc_col, and scope_fg, you can trim columns:
keep_cols = set(row_domains + [measure, weight_col, strata_col, fpc_col])
keep_cols |= set(exclusions.keys())
df = df[list(keep_cols)].copy()
This makes copies and groupbys cheaper without touching the math.
Why your SEs “change slightly” when you refactor
Whenever you:
reorder operations,
change from groupby().var() to a custom variance,
or change which rows participate in an intermediate computation,
you can get small floating-point differences even when the algebra is identical. On survey SEs these show up as tiny differences in the displayed standard errors and margins of error.
If you need exact reproduction, keep:
the same sample (df),
the same grouping logic,
the same ddof and variance method (.var(ddof=1)),
the same order of operations (especially how you aggregate across strata).
The safe steps above (Spark pre-filter, dropping out-of-scope once, reusing df instead of df.copy() in the loop) preserve that.
More aggressive speedups (may change the output shape)
Only if you’re willing to change what rows you output, there are big speedups available:
Instead of product(*domain_levels.values(), measure_levels), drive the loop from the actual observed combinations using groupby(row_domains + [measure]).
That avoids computing rows for combinations that don't appear in the data (which currently come out as NaN placeholder rows).
This can be dramatically faster for high-cardinality domains, but your output will no longer include all “zero-cell” combinations.
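For concreteness, here is a minimal sketch of that groupby-driven variant. The column names (`d`, `m`, `w`) and the helper name are illustrative, not from your code; it iterates only the observed (domain, measure) combinations and reuses precomputed domain totals:

```python
import pandas as pd

def observed_cells(df, row_domains, measure, weight_col):
    """p_hat for observed (domain, measure) cells only; skips empty combos."""
    inscope = df[df["scope_fg"] == 1]
    # weighted domain denominators, computed once
    W_row = inscope.groupby(row_domains)[weight_col].sum()
    out = []
    for key, g in inscope.groupby(row_domains + [measure]):
        dom_key = key[:-1] if len(row_domains) > 1 else key[0]
        out.append({"cell": key, "p_hat": g[weight_col].sum() / W_row.loc[dom_key]})
    return pd.DataFrame(out)
```

Point estimates for observed cells are identical to the product-driven loop; only the never-observed zero-cell rows disappear from the output.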
If you paste how big your typical data is (rows, distinct strata, distinct domain levels), I can propose a tighter rewrite that keeps the estimator identical but gets you closer to “linear in N” rather than “N × #cells”.
Hope this helps, Louis.
3 weeks ago
I would definitely agree with Louis on this. He provided some great suggestions, even from the code/logic perspective.
Couple of things I would suggest while developing logic.
1. As Louis mentioned, when we use Databricks, always think in terms of Spark (a distributed compute framework) instead of pandas. Transform your data as needed, and filter down to only the data necessary to run your Taylor model.
2. Based on the data size, use the Spark UI to check the logs and the utilisation of your memory, CPU, and storage; this helps you decide whether to change your compute type. Since your whole logic is written in pandas, it's likely that only your driver node is doing the work, so use a single-node cluster instead of an all-purpose cluster (if you want to keep everything in pandas). If you observe CPU or memory utilisation at 100%, try increasing the capacity of the node; I would suggest memory-optimized nodes.
3. As Louis suggested, divide your data into chunks and run your model on each chunk instead of feeding all the data in at once.
Hope this helps.
Regards,
Saicharan
a week ago
Thank you!
2 weeks ago
Hi @Jake3,
Your Taylor-linearisation row-percent estimator is well structured. The main performance bottleneck is the Python-level loop over every domain/measure combination, with a full DataFrame copy (df.copy()) happening inside each iteration. Here are concrete changes that should give you a significant speedup without altering the statistical results.
OPTIMIZATION 1: REMOVE THE df.copy() INSIDE THE LOOP
The line df2 = df.copy() creates an entire copy of your DataFrame for every single domain/measure combination. That is the single most expensive operation in your code. Instead, compute the "u" column directly on sliced views and avoid the copy entirely:
# Replace the df2 block with direct computation:
w_arr = df.loc[mask_row, weight_col].values
c_arr = mask_cell[mask_row].astype(float).values
r_arr = np.ones(n_row)
u_arr = (w_arr / W_row) * (c_arr - p_hat * r_arr)
Then in the strata variance loop, group by strata within the mask_row subset rather than copying the entire DataFrame.
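A sketch of that subset-based variance follows, with one caveat worth flagging (all names are assumed from the snippets above): grouping by strata within the mask_row subset changes each stratum's n_h relative to grouping the full frame, where out-of-domain records carry u = 0 but still count toward n_h and S2. So the SEs can differ slightly from the original loop, which may be exactly the kind of drift you observed.

```python
import numpy as np
import pandas as pd

def taylor_var_subset(df, mask_row, mask_cell, weight_col, strata_col,
                      fpc_col, W_row, p_hat):
    """Variance of p_hat computed on the mask_row subset only; no full-frame copy.
    Mirrors the stratified formula var_p = sum_h (1 - f_h) * n_h * S2_h."""
    sub = df.loc[mask_row, [weight_col, strata_col, fpc_col]].copy()
    c = mask_cell[mask_row].astype(float).to_numpy()
    # On the subset, R == 1 everywhere, so u simplifies to (w / W_row) * (C - p_hat)
    sub["u"] = (sub[weight_col].to_numpy() / W_row) * (c - p_hat)
    var_p = 0.0
    for h, g in sub.groupby(strata_col):
        n_h = len(g)
        if n_h <= 1:
            continue
        N_h = g[fpc_col].iloc[0]
        f_h = n_h / N_h if np.isfinite(N_h) else 0.0
        var_p += (1 - f_h) * n_h * g["u"].var(ddof=1)
    return var_p
```

If bit-for-bit agreement with the original output matters more than speed here, keep grouping over the full frame as in the current code and take the other optimizations instead.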
OPTIMIZATION 2: USE groupby INSTEAD OF LOOPING OVER COMBINATIONS
Instead of building all_rows via itertools.product and looping, use pandas groupby to iterate only over combinations that actually exist in your data:
group_cols = row_domains + [measure]
grouped = df.loc[df["scope_fg"] == 1].groupby(group_cols, observed=True)
This automatically skips empty combinations and avoids repeated Boolean mask construction for each combo. For the domain totals (W_row), pre-compute them with a separate groupby on just the row_domains:
domain_totals = (
    df.loc[df["scope_fg"] == 1]
    .groupby(row_domains, observed=True)[weight_col]
    .sum()
)
Then look up the domain total for each group with a simple index lookup.
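As a tiny illustration of that lookup, with made-up column names (`region`, `answer`, `wt`):

```python
import pandas as pd

df = pd.DataFrame({
    "region": ["N", "N", "S"],
    "answer": [1, 0, 1],
    "wt": [2.0, 3.0, 5.0],
})

# one pass for the denominators...
domain_totals = df.groupby("region")["wt"].sum()

# ...then a cheap index lookup per observed cell, no re-filtering
p_hats = {}
for key, g in df.groupby(["region", "answer"]):
    p_hats[key] = g["wt"].sum() / domain_totals.loc[key[0]]
```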
OPTIMIZATION 3: VECTORIZE THE STRATA VARIANCE CALCULATION
The inner loop over strata can be vectorized using groupby + transform:
# Pre-compute per-stratum stats in one pass
strata_groups = sub_df.groupby(strata_col)
mean_u = strata_groups["u"].transform("mean")

# Variance contribution per record
sub_df["u_dev_sq"] = (sub_df["u"] - mean_u) ** 2

strata_stats = sub_df.groupby(strata_col).agg(
    n_h=("u", "count"),
    S2=("u_dev_sq", lambda x: x.sum() / (len(x) - 1) if len(x) > 1 else 0),
    N_h=(fpc_col, "first"),
)
strata_stats["f_h"] = np.where(
    np.isfinite(strata_stats["N_h"]),
    strata_stats["n_h"] / strata_stats["N_h"],
    0.0,
)
var_p = ((1 - strata_stats["f_h"]) * strata_stats["n_h"] * strata_stats["S2"]).sum()
OPTIMIZATION 4: PRE-COMPUTE MASKS WITH CATEGORICAL COLUMNS
Convert your domain and measure columns to pandas Categorical dtype before processing. This makes groupby and equality comparisons faster, especially with string columns:
for c in row_domains + [measure]:
    df[c] = df[c].astype("category")
OPTIMIZATION 5: AVOID .toPandas() ON LARGE DATASETS
If your Spark DataFrame is very large, the .toPandas() call collects everything to the driver. Consider whether you can filter or aggregate in Spark first. For example, if you only need in-scope records:
# Filter in Spark before collecting
from pyspark.sql.functions import col

spark_df = spark_df.filter(~col("some_col").isin(exclusion_values))
df = spark_df.toPandas()
You can also enable Apache Arrow for faster Spark-to-pandas conversion:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
This is typically enabled by default on serverless, but worth confirming on other compute types.
PUTTING IT ALL TOGETHER
Here is a refactored version of the main estimation loop that incorporates optimizations 1-3:
# Pre-compute domain-level weighted totals
inscope = df[df["scope_fg"] == 1].copy()
for c in row_domains + [measure]:
    inscope[c] = inscope[c].astype("category")

domain_W = inscope.groupby(row_domains, observed=True)[weight_col].sum()
domain_N = inscope.groupby(row_domains, observed=True)[weight_col].count()

group_cols = row_domains + [measure]
cell_agg = inscope.groupby(group_cols, observed=True).agg(
    W_cell=(weight_col, "sum"),
    n_cell=(weight_col, "count"),
).reset_index()

# Merge domain totals
cell_agg = cell_agg.merge(
    domain_W.rename("W_row").reset_index(),
    on=row_domains, how="left",
)
cell_agg = cell_agg.merge(
    domain_N.rename("n_row").reset_index(),
    on=row_domains, how="left",
)
cell_agg["p_hat"] = cell_agg["W_cell"] / cell_agg["W_row"]

# Then loop only for Taylor variance (which needs record-level data)
# but now you skip empty combos and avoid df.copy()
This approach computes the point estimates (p_hat) in one vectorized pass. You only need the record-level loop for the Taylor variance computation, which still needs individual "u" values per stratum.
A NOTE ON NUMERICAL PRECISION
You mentioned that standard errors change slightly when you try to optimize. This is almost always caused by floating-point summation order differences. If you switch from a Python loop to vectorized operations, the order in which values are summed may change slightly. To verify correctness:
1. Compare results on a small test dataset where you can verify by hand.
2. Check that differences are within floating-point tolerance (typically 1e-10 or smaller).
3. If you need exact reproducibility with SAS output, keep the loop-based variance computation but apply the other optimizations (removing df.copy(), pre-computing domain totals).
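A small helper along these lines (names hypothetical) makes the tolerance check in step 2 mechanical, including rows where both runs produce NaN:

```python
import numpy as np
import pandas as pd

def se_match(old: pd.Series, new: pd.Series, tol: float = 1e-10) -> bool:
    """True when every SE pair agrees within tol and NaNs line up."""
    both_nan = old.isna() & new.isna()
    close = np.isclose(old, new, rtol=0.0, atol=tol)
    return bool((both_nan | close).all())
```

Run both versions on the same input, align the output rows on their domain/measure keys, and compare the SE columns before trusting the faster path.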
REFERENCE DOCUMENTATION
- Apache Arrow optimization for pandas conversion: https://docs.databricks.com/en/pandas/pandas-function-apis.html
- Performance tuning on Databricks: https://docs.databricks.com/en/optimizations/index.html
- pandas best practices for performance: https://pandas.pydata.org/docs/user_guide/enhancingperf.html
* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.