- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
03-04-2026 09:39 AM
Greetings @Jake3 ,
Most of the runtime here is coming from two things:
-
You’re pulling everything into single-node pandas via toPandas().
-
You’re re-doing a lot of per-combination work inside the big for combo in all_rows loop, including a full df.copy() and repeated groupbys.
Below are changes that keep the same estimator but reduce work. I’m going to separate “safe” optimizations (no change in logic) from “aggressive” ones (can slightly change displayed output or shape).
-
Push as much as possible to Spark before toPandas()
If df is a Spark DataFrame and the input table is large, make sure you shrink it before calling this function:
# Example wrapper before calling taylor_row_percent
needed_cols = set(row_domains + [measure] + list(exclusions.keys()))
if weight_col:
needed_cols.add(weight_col)
if strata_col:
needed_cols.add(strata_col)
if isinstance(fpc, str):
needed_cols.add(fpc)
sdf_small = sdf.select(*needed_cols)
result = taylor_row_percent(
sdf_small, row_domains, measure, exclusions,
weight_col=weight_col, strata_col=strata_col, fpc=fpc, ...
)
This keeps the math identical, but reduces bytes transferred and processed in pandas.
-
Don’t copy the whole DataFrame for every combo
Inside the main loop you do:
df2 = df.copy()
df2["R"] = mask_row.astype(float)
df2["C"] = mask_cell.astype(float)
df2["u"] = (df2[weight_col] / W_row) * (df2["C"] - p_hat * df2["R"])
for h, g in df2.groupby(strata_col):
...
That df.copy() per combination is extremely expensive.
You can reuse the same DataFrame and only overwrite temporary columns; that keeps the math and the order of operations the same:
# Prepare once, after step 3 (FPC handling)
df["R"] = 0.0
df["C"] = 0.0
df["u"] = 0.0
...
for combo in all_rows:
dom_vals = combo[:-1]
measure_val = combo[-1]
mask_row = df["scope_fg"] == 1
for c, v in zip(row_domains, dom_vals):
mask_row &= (df[c] == v)
mask_cell = mask_row & (df[measure] == measure_val)
n_row = int(mask_row.sum())
n_cell = int(mask_cell.sum())
if n_row == 0:
...
else:
W_row = df.loc[mask_row, weight_col].sum()
W_cell = df.loc[mask_cell, weight_col].sum()
p_hat = W_cell / W_row if W_row > 0 else np.nan
# overwrite temporary columns in-place
df["R"] = mask_row.astype(float)
df["C"] = mask_cell.astype(float)
df["u"] = (df[weight_col] / W_row) * (df["C"] - p_hat * df["R"])
var_p = 0.0
for h, g in df.groupby(strata_col):
n_h = len(g)
if n_h <= 1:
continue
N_h = g[fpc_col].iloc[0]
f_h = n_h / N_h if np.isfinite(N_h) else 0.0
S2 = g["u"].var(ddof=1)
var_p += (1 - f_h) * n_h * S2
se_p = np.sqrt(var_p) if var_p > 0 else np.nan
Key points:
-
We don’t change how u is computed or how variance is aggregated.
-
We reuse df and group on it directly.
-
You should get the same standard errors (modulo normal floating-point noise), but much faster and with less memory churn.
-
Drop out-of-scope records early (still same results)
Right now you carry all rows and always AND with scope_fg == 1. Once scope_fg is fully computed, you can safely filter to in-scope records exactly once:
# 4. Scope flag (same as your code up to this point)
df["scope_fg"] = 1
for var, excl_vals in exclusions.items():
for val in excl_vals:
if pd.isna(val):
df.loc[df[var].isna(), "scope_fg"] = 0
else:
df.loc[df[var] == val, "scope_fg"] = 0
# NEW: drop out-of-scope rows once
df = df[df["scope_fg"] == 1].copy()
df.drop(columns=["scope_fg"], inplace=True)
Then, later:
-
domain_levels and measure_levels effectively used the same filter already (scope_fg == 1).
-
n_row, n_cell and all weights are computed only on these in-scope units anyway.
You’ll need to drop explicit references to scope_fg in the masks:
# before
mask_row = df["scope_fg"] == 1
for c, v in zip(row_domains, dom_vals):
mask_row &= (df[c] == v)
# after filtering to in-scope only:
mask_row = pd.Series(True, index=df.index)
for c, v in zip(row_domains, dom_vals):
mask_row &= (df[c] == v)
This reduces row count for all subsequent operations with no change in the estimator definition.
4. Reduce width: keep only needed columns in pandas
After you’ve handled weight_col, strata_col, fpc_col, and scope_fg, you can trim columns:
keep_cols = set(row_domains + [measure, weight_col, strata_col, fpc_col])
keep_cols |= set(exclusions.keys())
df = df[list(keep_cols)].copy()
This makes copies and groupbys cheaper without touching the math.
-
Why your SEs “change slightly” when you refactor
Whenever you:
-
reorder operations,
-
change from groupby().var() to a custom variance,
-
or change which rows participate in an intermediate computation,you can get small floating-point differences even when the algebra is identical. On survey SEs those show up as tiny differences in displayed standard error and margins of error.
If you need exact reproduction, keep:
-
the same sample (df),
-
the same grouping logic,
-
the same ddof and variance method (.var(ddof=1)),
-
the same order of operations (especially how you aggregate across strata).
The safe steps above (Spark pre-filter, dropping out-of-scope once, reusing df instead of df.copy() in the loop) preserve that.
-
More aggressive speedups (may change the output shape)
Only if you’re willing to change what rows you output, there are big speedups available:
-
Instead of product(*domain_levels.values(), measure_levels), drive the loop from the actual observed combinations using groupby(row_domains + [measure]).
-
That avoids computing rows for combinations that don’t appear in the data (which currently get na/np).
This can be dramatically faster for high-cardinality domains, but your output will no longer include all “zero-cell” combinations.
If you paste how big your typical data is (rows, distinct strata, distinct domain levels), I can propose a tighter rewrite that keeps the estimator identical but gets you closer to “linear in N” rather than “N × #cells”.
Hope this helps, Louis.