Source code for geostep.power

import warnings
from typing import List, Optional

import numpy as np
import pandas as pd
from joblib import Parallel, delayed  # type: ignore
from scipy.stats import ttest_ind

from .logger import logger

# Suppress warnings from t-test with constant data, which can occur in simulations
warnings.filterwarnings("ignore", category=RuntimeWarning)


def _single_simulation(
    historical_data: pd.DataFrame,
    geo_col: str,
    date_col: str,
    kpi_col: str,
    effect_size: float,
    test_weeks: int,
    pre_period_weeks: int,
    alpha: float,
) -> Optional[bool]:
    """
    Performs a single iteration of a power simulation.
    Internal helper function.
    """
    unique_dates = np.sort(historical_data[date_col].unique())
    max_start_date_index = len(unique_dates) - (test_weeks + pre_period_weeks)

    if max_start_date_index < 0:
        return None  # Not enough data for this duration

    start_date_index = np.random.randint(0, max_start_date_index)
    start_date = unique_dates[start_date_index]
    pre_period_start = unique_dates[start_date_index - pre_period_weeks]
    test_period_end = unique_dates[start_date_index + test_weeks]

    pre_data = historical_data[
        (historical_data[date_col] >= pre_period_start)
        & (historical_data[date_col] < start_date)
    ].copy()
    test_data = historical_data[
        (historical_data[date_col] >= start_date)
        & (historical_data[date_col] < test_period_end)
    ].copy()

    unique_geos = historical_data[geo_col].unique()
    treatment_geos = np.random.choice(unique_geos, len(unique_geos) // 2, replace=False)

    pre_avg = pre_data.groupby(geo_col)[kpi_col].mean().reset_index(name="pre_avg")
    test_avg = test_data.groupby(geo_col)[kpi_col].mean().reset_index(name="test_avg")

    if pre_avg.empty or test_avg.empty:
        return None

    merged = pd.merge(pre_avg, test_avg, on=geo_col, how="inner")
    if merged.empty:
        return None

    merged["group"] = merged[geo_col].apply(
        lambda x: "Treatment" if x in treatment_geos else "Control"
    )

    # Ensure both groups are present before applying lift
    if (
        "Treatment" not in merged["group"].unique()
        or "Control" not in merged["group"].unique()
    ):
        return None

    merged.loc[merged["group"] == "Treatment", "test_avg"] *= 1 + effect_size

    merged["lift"] = (merged["test_avg"] / merged["pre_avg"]) - 1

    t_vals = merged[merged["group"] == "Treatment"]["lift"].dropna()
    c_vals = merged[merged["group"] == "Control"]["lift"].dropna()

    if len(t_vals) < 2 or len(c_vals) < 2:
        return None

    _, p_val = ttest_ind(t_vals, c_vals, equal_var=False)

    return bool(p_val < alpha)


[docs] def run_power_analysis( historical_data: pd.DataFrame, geo_col: str, date_col: str, kpi_col: str, effect_sizes: List[float] = [0.01, 0.02, 0.03, 0.05], test_weeks_list: List[int] = [4, 6, 8, 10, 12], pre_period_weeks: int = 8, n_sims: int = 500, alpha: float = 0.05, ) -> pd.DataFrame: """ Runs a full power analysis using historical data to determine the probability of detecting a given effect size. Parameters ---------- historical_data : pd.DataFrame DataFrame with historical data. Must contain geo, date, and KPI columns. geo_col : str Name of the geographic identifier column. date_col : str Name of the date column. Should be weekly or daily. kpi_col : str Name of the Key Performance Indicator column to be measured. effect_sizes : list of float, optional A list of effect sizes (e.g., 0.03 for 3% lift) to simulate. test_weeks_list : list of int, optional A list of test durations (in weeks) to simulate. pre_period_weeks : int, optional The number of weeks to use for the pre-period baseline. n_sims : int, optional The number of simulations to run for each scenario. Higher is more accurate but slower. alpha : float, optional The significance level for the test (Type I error rate). Returns ------- pd.DataFrame A DataFrame summarizing the statistical power for each combination of effect size and test duration. """ historical_data[date_col] = pd.to_datetime(historical_data[date_col]) results = [] for effect in effect_sizes: for weeks in test_weeks_list: sim_results = Parallel(n_jobs=-1)( delayed(_single_simulation)( historical_data, geo_col, date_col, kpi_col, effect, weeks, pre_period_weeks, alpha, ) for _ in range(n_sims) ) valid_results = [res for res in sim_results if res is not None] power = np.mean(valid_results) if valid_results else 0 results.append({"effect_size": effect, "test_weeks": weeks, "power": power}) logger.info( f"Finished: Effect Size={effect*100:.1f}%, Test Weeks={weeks}, " f"Power={power:.2f}" ) return pd.DataFrame(results)