Source code for geostep.analyzer

import logging
from typing import Any, Dict, Optional, Tuple, List
from functools import partial

import joblib
import numpy as np
import pandas as pd
import statsmodels.api as sm  # type: ignore
import statsmodels.formula.api as smf  # type: ignore
from scipy import stats

from .base import BaseAnalyzer
from .constants import TREATMENT
from .exceptions import AnalysisError
from .performance import (
    cache_result, optimize_memory_usage, ParallelConfig, vectorize_operations,
    parallel_apply
)
from .monitoring import MonitoringMixin, monitor_operation
from .schemas import CRTResult, DiDResult, LiftResult, BalanceCheckResult, DiagnosticResult
from .validation import (
    standardize_assignments,
    validate_columns_exist,
    validate_dataframe,
    validate_date_string,
    validate_numeric_column,
)
from .diagnostics import (
    advanced_balance_check,
    calculate_cohens_d,
    calculate_hedges_g,
    multiple_testing_correction,
    perform_placebo_test,
    sensitivity_analysis_rosenbaum
)

logger = logging.getLogger(__name__)


class LiftAnalyzer(BaseAnalyzer, MonitoringMixin):
    """Analyzer for lift analysis using normalized lift index."""

    def __init__(self, config: Optional['AnalyzerConfig'] = None,
                 performance_config: Optional[ParallelConfig] = None):
        """Initialize LiftAnalyzer with configuration.

        Parameters
        ----------
        config : AnalyzerConfig, optional
            Configuration for analyzer behavior.
        performance_config : ParallelConfig, optional
            Configuration for performance optimizations.
        """
        # Import here to avoid circular imports
        from .base import AnalyzerConfig

        # Initialize with config (BaseAnalyzer expects this)
        super().__init__(config)
        self.performance_config = performance_config or ParallelConfig()

    def validate_inputs(self, df: pd.DataFrame, **kwargs: Any) -> None:
        """Validate inputs for lift analysis."""
        # Call parent validation first
        super().validate_inputs(df, **kwargs)

        geo_col = kwargs.get("geo_col")
        assignment_col = kwargs.get("assignment_col")
        date_col = kwargs.get("date_col")
        kpi_col = kwargs.get("kpi_col")
        pre_period_end = kwargs.get("pre_period_end")
        test_period_start = kwargs.get("test_period_start")
        test_period_end = kwargs.get("test_period_end")

        assert isinstance(geo_col, str)
        assert isinstance(assignment_col, str)
        assert isinstance(date_col, str)
        assert isinstance(kpi_col, str)
        assert pre_period_end is not None
        assert test_period_start is not None
        assert test_period_end is not None

        df = validate_dataframe(df, "df")
        validate_columns_exist(df, [geo_col, assignment_col, date_col, kpi_col])
        validate_numeric_column(df, kpi_col)
        validate_date_string(str(pre_period_end), "pre_period_end")
        validate_date_string(str(test_period_start), "test_period_start")
        validate_date_string(str(test_period_end), "test_period_end")

    @monitor_operation("prepare_data")
    def prepare_data(self, df: pd.DataFrame, **kwargs: Any) -> pd.DataFrame:
        """Prepare data for lift analysis with performance optimizations.

        Returns a per-geo frame with ``pre_avg``, ``test_avg``, the
        assignment column, and the computed ``lift_index``.
        """
        # Call parent preparation first
        prepared_df = super().prepare_data(df, **kwargs)

        geo_col = kwargs["geo_col"]
        assignment_col = kwargs["assignment_col"]
        date_col = kwargs["date_col"]
        kpi_col = kwargs["kpi_col"]
        pre_period_end = kwargs["pre_period_end"]
        test_period_start = kwargs["test_period_start"]
        test_period_end = kwargs["test_period_end"]

        # Standardize assignments first to ensure string type
        prepared_df[assignment_col] = standardize_assignments(prepared_df[assignment_col])

        # Convert date column BEFORE memory optimization to prevent it being
        # converted to categorical
        prepared_df[date_col] = pd.to_datetime(prepared_df[date_col])

        # Optimize memory usage for the dataframe
        prepared_df = optimize_memory_usage(prepared_df)

        # Ensure assignment column remains string after memory optimization
        # (optimize_memory_usage may convert low-cardinality string columns
        # to categorical)
        if prepared_df[assignment_col].dtype.name == 'category':
            prepared_df[assignment_col] = prepared_df[assignment_col].astype(str)

        # Convert period dates
        pre_period_end = pd.to_datetime(pre_period_end)
        test_period_start = pd.to_datetime(test_period_start)
        test_period_end = pd.to_datetime(test_period_end)

        # Vectorized period filtering
        pre_mask = prepared_df[date_col] <= pre_period_end
        test_mask = (
            (prepared_df[date_col] >= test_period_start)
            & (prepared_df[date_col] <= test_period_end)
        )

        # Calculate period averages using vectorized operations
        pre_avg = prepared_df[pre_mask].groupby(geo_col)[kpi_col].mean().reset_index(name="pre_avg")
        test_avg = prepared_df[test_mask].groupby(geo_col)[kpi_col].mean().reset_index(name="test_avg")

        # Create analysis dataframe with efficient merging
        analysis_df = pd.merge(pre_avg, test_avg, on=geo_col, how="inner")
        assignments = prepared_df[[geo_col, assignment_col]].drop_duplicates()
        analysis_df = pd.merge(analysis_df, assignments, on=geo_col, how="left")

        # Vectorized lift index: test/pre - 1. Geos with a zero pre-period
        # mean get lift_index == -1 (the out array stays 0, then 1 is
        # subtracted). Fixed: force a float out-array so np.divide cannot
        # fail on an integer-downcast column.
        analysis_df["lift_index"] = np.divide(
            analysis_df["test_avg"],
            analysis_df["pre_avg"],
            out=np.zeros_like(analysis_df["test_avg"], dtype=float),
            where=analysis_df["pre_avg"] != 0
        ) - 1

        # Don't re-optimize analysis_df to preserve string assignment column
        return analysis_df

    def _bca_bootstrap(
        self,
        df: pd.DataFrame,
        geo_col: str,
        assignment_col: str,
        n_bootstrap: int = 1000,
        alpha: float = 0.05,
        random_seed: Optional[int] = None
    ) -> Tuple[float, float]:
        """
        Calculate bias-corrected and accelerated (BCa) bootstrap confidence intervals.

        Geo clusters are resampled with replacement; the acceleration
        constant comes from leave-one-geo-out jackknife estimates.

        Parameters
        ----------
        df : pd.DataFrame
            Analysis dataframe with lift_index calculated.
        geo_col : str
            Geographic identifier column.
        assignment_col : str
            Assignment column name.
        n_bootstrap : int, optional
            Number of bootstrap samples (default: 1000).
        alpha : float, optional
            Significance level (default: 0.05).
        random_seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        Tuple[float, float]
            Lower and upper confidence interval bounds.

        Raises
        ------
        AnalysisError
            If no valid bootstrap resamples could be generated.
        """
        # Fixed: "is not None" so an explicit seed of 0 is honored
        if random_seed is not None:
            np.random.seed(random_seed)

        unique_geos = df[geo_col].unique()
        n_geos = len(unique_geos)

        # Original estimate: treatment minus control mean lift index
        treatment_mask = df[assignment_col] == TREATMENT
        original_estimate = (
            df[treatment_mask]["lift_index"].mean()
            - df[~treatment_mask]["lift_index"].mean()
        )

        # Bootstrap estimates: resample clusters with replacement
        bootstrap_estimates = []
        for _ in range(n_bootstrap):
            bootstrap_geos = np.random.choice(unique_geos, size=n_geos, replace=True)
            bootstrap_df = df[df[geo_col].isin(bootstrap_geos)]
            if len(bootstrap_df) == 0:
                continue
            # Only keep resamples containing both groups
            boot_treatment_mask = bootstrap_df[assignment_col] == TREATMENT
            if boot_treatment_mask.sum() > 0 and (~boot_treatment_mask).sum() > 0:
                boot_estimate = (
                    bootstrap_df[boot_treatment_mask]["lift_index"].mean()
                    - bootstrap_df[~boot_treatment_mask]["lift_index"].mean()
                )
                bootstrap_estimates.append(boot_estimate)

        bootstrap_estimates = np.array(bootstrap_estimates)
        # Fixed: previously divided by zero further down when every resample
        # was rejected; fail with a clear error instead
        if len(bootstrap_estimates) == 0:
            raise AnalysisError("BCa bootstrap produced no valid resamples")

        # Jackknife estimates (leave one geo out) for acceleration constant
        jackknife_estimates = []
        for geo in unique_geos:
            jack_df = df[df[geo_col] != geo]
            if len(jack_df) == 0:
                continue
            jack_treatment_mask = jack_df[assignment_col] == TREATMENT
            if jack_treatment_mask.sum() > 0 and (~jack_treatment_mask).sum() > 0:
                jack_estimate = (
                    jack_df[jack_treatment_mask]["lift_index"].mean()
                    - jack_df[~jack_treatment_mask]["lift_index"].mean()
                )
                jackknife_estimates.append(jack_estimate)
        jackknife_estimates = np.array(jackknife_estimates)

        # Bias correction. Fixed: clip the proportion away from 0/1 so the
        # normal quantile stays finite (0 or 1 previously produced +/-inf,
        # which propagated to NaN percentiles).
        prop_less = np.sum(bootstrap_estimates < original_estimate) / len(bootstrap_estimates)
        prop_less = min(max(prop_less, 1e-9), 1 - 1e-9)
        bias_correction = stats.norm.ppf(prop_less)

        # Acceleration constant; fall back to 0 when the jackknife is
        # degenerate (empty or all-equal estimates would divide by zero)
        if len(jackknife_estimates) == 0:
            acceleration = 0.0
        else:
            jackknife_mean = np.mean(jackknife_estimates)
            denom = 6 * (np.sum((jackknife_mean - jackknife_estimates) ** 2)) ** 1.5
            if denom == 0:
                acceleration = 0.0
            else:
                acceleration = np.sum((jackknife_mean - jackknife_estimates) ** 3) / denom

        # BCa-adjusted percentiles
        z_alpha = stats.norm.ppf(alpha / 2)
        z_1_alpha = stats.norm.ppf(1 - alpha / 2)
        alpha_1 = stats.norm.cdf(
            bias_correction
            + (bias_correction + z_alpha) / (1 - acceleration * (bias_correction + z_alpha))
        )
        alpha_2 = stats.norm.cdf(
            bias_correction
            + (bias_correction + z_1_alpha) / (1 - acceleration * (bias_correction + z_1_alpha))
        )

        # Ensure percentiles are within valid range
        alpha_1 = max(0, min(1, alpha_1))
        alpha_2 = max(0, min(1, alpha_2))

        lower_bound = np.percentile(bootstrap_estimates, 100 * alpha_1)
        upper_bound = np.percentile(bootstrap_estimates, 100 * alpha_2)

        return lower_bound, upper_bound

    def _permutation_test(
        self,
        df: pd.DataFrame,
        assignment_col: str,
        n_permutations: int = 1000,
        random_seed: Optional[int] = None
    ) -> float:
        """
        Perform permutation test for lift analysis.

        Parameters
        ----------
        df : pd.DataFrame
            Analysis dataframe with lift_index calculated.
        assignment_col : str
            Assignment column name.
        n_permutations : int, optional
            Number of permutation samples (default: 1000).
        random_seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        float
            Permutation test p-value (1.0 if no valid permutations).
        """
        # Fixed: "is not None" so an explicit seed of 0 is honored
        if random_seed is not None:
            np.random.seed(random_seed)

        # Original test statistic
        treatment_mask = df[assignment_col] == TREATMENT
        original_diff = (
            df[treatment_mask]["lift_index"].mean()
            - df[~treatment_mask]["lift_index"].mean()
        )

        # Permutation distribution: shuffle assignments in place each round
        permutation_diffs = []
        assignments = df[assignment_col].values.copy()
        for _ in range(n_permutations):
            np.random.shuffle(assignments)
            perm_treatment_mask = assignments == TREATMENT
            if perm_treatment_mask.sum() > 0 and (~perm_treatment_mask).sum() > 0:
                perm_diff = (
                    df.loc[perm_treatment_mask, "lift_index"].mean()
                    - df.loc[~perm_treatment_mask, "lift_index"].mean()
                )
                permutation_diffs.append(perm_diff)

        # Fixed: empty distribution previously yielded NaN; return the
        # conservative 1.0, matching DiDAnalyzer._permutation_test_did
        if len(permutation_diffs) == 0:
            logger.warning("No successful permutation samples")
            return 1.0

        permutation_diffs = np.array(permutation_diffs)

        # Two-tailed p-value
        p_value = np.mean(np.abs(permutation_diffs) >= np.abs(original_diff))
        return p_value

    @monitor_operation("analyze_impl")
    @cache_result(ttl=3600)  # Cache results for 1 hour
    def _analyze_impl(self, df: pd.DataFrame, **kwargs: Any) -> LiftResult:
        """Run the primary analysis on experimental data using a normalized lift index.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing the full experimental data.
        geo_col : str
            The name of the geographic identifier column.
        assignment_col : str
            The name of the column with group assignments.
        date_col : str
            The name of the date column.
        kpi_col : str
            The name of the KPI column.
        pre_period_end : str
            The end date of the pre-period (YYYY-MM-DD).
        test_period_start : str
            The start date of the test period (YYYY-MM-DD).
        test_period_end : str
            The end date of the test period (YYYY-MM-DD).
        return_dataframe : bool, optional
            If True, returns the analysis DataFrame along with results.
        use_bca_bootstrap : bool, optional
            If True, uses BCa bootstrap for confidence intervals (default: False).
        use_permutation_test : bool, optional
            If True, adds permutation test p-value (default: False).
        n_bootstrap : int, optional
            Number of bootstrap samples for BCa method (default: 1000).
        n_permutations : int, optional
            Number of permutation samples (default: 1000).
        random_seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        LiftResult
            Results containing lift estimate, p-value, and confidence intervals.

        Raises
        ------
        AnalysisError
            If model fitting fails.
        """
        geo_col = kwargs["geo_col"]
        assignment_col = kwargs["assignment_col"]
        return_dataframe = kwargs.get("return_dataframe", False)
        use_bca_bootstrap = kwargs.get("use_bca_bootstrap", False)
        use_permutation_test = kwargs.get("use_permutation_test", False)
        n_bootstrap = kwargs.get("n_bootstrap", 1000)
        n_permutations = kwargs.get("n_permutations", 1000)
        random_seed = kwargs.get("random_seed")

        if return_dataframe:
            logger.warning(
                "'return_dataframe=True' is not supported and will be ignored."
            )

        # Fit OLS model with cluster-robust standard errors.
        # Handle categorical data by converting to string first.
        if df[assignment_col].dtype.name == 'category':
            treatment_dummy = (df[assignment_col].astype(str) == TREATMENT).astype(int)
        else:
            treatment_dummy = (df[assignment_col] == TREATMENT).astype(int)

        X = sm.add_constant(treatment_dummy)
        X.columns = ["const", TREATMENT]
        y = df["lift_index"]

        try:
            model = sm.OLS(y, X).fit(
                cov_type="cluster", cov_kwds={"groups": df[geo_col]}
            )

            # Calculate confidence intervals
            if use_bca_bootstrap:
                logger.info(f"Computing BCa bootstrap confidence intervals with {n_bootstrap} samples")
                conf_int = list(self._bca_bootstrap(
                    df, geo_col, assignment_col, n_bootstrap, random_seed=random_seed
                ))
            else:
                conf_int = model.conf_int().loc[TREATMENT].tolist()

            # Calculate p-value
            if use_permutation_test:
                logger.info(f"Computing permutation test p-value with {n_permutations} samples")
                p_value = self._permutation_test(
                    df, assignment_col, n_permutations, random_seed=random_seed
                )
            else:
                p_value = model.pvalues[TREATMENT]

            # Calculate effect sizes
            treatment_vals = df[df[assignment_col] == TREATMENT]["lift_index"]
            control_vals = df[df[assignment_col] != TREATMENT]["lift_index"]
            cohens_d = calculate_cohens_d(treatment_vals.values, control_vals.values)
            hedges_g = calculate_hedges_g(treatment_vals.values, control_vals.values)

            # Raw volume uplift = difference in test period averages
            treatment_df = df[df[assignment_col] == TREATMENT]
            control_df = df[df[assignment_col] != TREATMENT]
            avg_treatment_test = treatment_df["test_avg"].mean()
            avg_control_test = control_df["test_avg"].mean()
            raw_volume_uplift = avg_treatment_test - avg_control_test

            metadata = {
                "model_summary": str(model.summary()),
                "cohens_d": float(cohens_d),
                "hedges_g": float(hedges_g),
                "n_treatment": len(treatment_vals),
                "n_control": len(control_vals),
                "raw_volume_uplift": float(raw_volume_uplift)
            }
            if use_bca_bootstrap:
                metadata["bootstrap_method"] = "BCa"
                metadata["n_bootstrap"] = n_bootstrap
            if use_permutation_test:
                metadata["permutation_test"] = True
                metadata["n_permutations"] = n_permutations

            return LiftResult(
                success=True,
                estimate=model.params[TREATMENT],
                p_value=float(p_value),
                confidence_interval=(conf_int[0], conf_int[1]),
                method="lift_analysis",
                metadata=metadata
            )
        except Exception as e:
            raise AnalysisError(f"Model fitting failed: {str(e)}") from e

    @monitor_operation("balance_check")
    def check_balance(
        self,
        df: pd.DataFrame,
        geo_col: str,
        assignment_col: str,
        covariates: List[str],
        **kwargs: Any
    ) -> BalanceCheckResult:
        """
        Perform advanced balance check on covariates.

        Parameters
        ----------
        df : pd.DataFrame
            Full experimental dataframe.
        geo_col : str
            Geographic identifier column.
        assignment_col : str
            Assignment column name.
        covariates : List[str]
            List of covariate columns to check.
        **kwargs : Any
            Additional parameters passed to advanced_balance_check.

        Returns
        -------
        BalanceCheckResult
            Comprehensive balance assessment.
        """
        # Get unique geo assignments for balance check
        geo_assignments = df[[geo_col, assignment_col]].drop_duplicates()
        return advanced_balance_check(
            geo_assignments, assignment_col, covariates, **kwargs
        )
class DiDAnalyzer(BaseAnalyzer, MonitoringMixin):
    """Analyzer for Difference-in-Differences (DiD) analysis."""

    def __init__(self, config: Optional['AnalyzerConfig'] = None,
                 performance_config: Optional[ParallelConfig] = None):
        """Initialize DiDAnalyzer with configuration.

        Parameters
        ----------
        config : AnalyzerConfig, optional
            Configuration for analyzer behavior.
        performance_config : ParallelConfig, optional
            Configuration for performance optimizations.
        """
        # Import here to avoid circular imports
        from .base import AnalyzerConfig

        # Initialize with config (BaseAnalyzer expects this)
        super().__init__(config)
        self.performance_config = performance_config or ParallelConfig()

    def validate_inputs(self, df: pd.DataFrame, **kwargs: Any) -> None:
        """Validate inputs for DiD analysis."""
        # Call parent validation first
        super().validate_inputs(df, **kwargs)

        geo_col = kwargs.get("geo_col")
        assignment_col = kwargs.get("assignment_col")
        date_col = kwargs.get("date_col")
        kpi_col = kwargs.get("kpi_col")
        test_period_start = kwargs.get("test_period_start")

        assert isinstance(geo_col, str)
        assert isinstance(assignment_col, str)
        assert isinstance(date_col, str)
        assert isinstance(kpi_col, str)
        assert test_period_start is not None

        df = validate_dataframe(df, "df")
        validate_columns_exist(df, [geo_col, assignment_col, date_col, kpi_col])
        validate_numeric_column(df, kpi_col)
        # Fixed: cast to str for consistency with LiftAnalyzer (the assert
        # above only guarantees non-None, not str)
        validate_date_string(str(test_period_start), "test_period_start")

    @monitor_operation("prepare_data")
    def prepare_data(self, df: pd.DataFrame, **kwargs: Any) -> pd.DataFrame:
        """Prepare data for DiD analysis with performance optimizations.

        Adds a ``post`` indicator column (1 for dates on/after the test
        period start).
        """
        # Call parent preparation first
        df = super().prepare_data(df, **kwargs)

        assignment_col = kwargs["assignment_col"]
        date_col = kwargs["date_col"]
        test_period_start = kwargs["test_period_start"]

        # Standardize assignments first to ensure string type
        df[assignment_col] = standardize_assignments(df[assignment_col])

        # Convert date column BEFORE memory optimization to prevent it being
        # converted to categorical
        df[date_col] = pd.to_datetime(df[date_col])

        # Optimize memory usage
        df = optimize_memory_usage(df)

        # Ensure assignment column remains string after memory optimization
        # (optimize_memory_usage may convert low-cardinality string columns
        # to categorical)
        if df[assignment_col].dtype.name == 'category':
            df[assignment_col] = df[assignment_col].astype(str)

        # Vectorized post-period indicator; int8 for memory efficiency
        test_period_start = pd.to_datetime(test_period_start)
        df["post"] = (df[date_col] >= test_period_start).astype("int8")

        # Don't re-optimize to avoid converting datetime back to categorical
        return df

    def _permutation_test_did(
        self,
        df: pd.DataFrame,
        assignment_col: str,
        geo_col: str,
        kpi_col: str,
        n_permutations: int = 1000,
        random_seed: Optional[int] = None
    ) -> float:
        """
        Perform permutation test for DiD analysis.

        Assignments are permuted at the geo (cluster) level to respect the
        randomization unit.

        Parameters
        ----------
        df : pd.DataFrame
            Analysis dataframe with post period indicator.
        assignment_col : str
            Assignment column name.
        geo_col : str
            Geographic identifier column.
        kpi_col : str
            KPI column name.
        n_permutations : int, optional
            Number of permutation samples (default: 1000).
        random_seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        float
            Permutation test p-value (1.0 on fitting failure or when no
            valid permutations are produced).
        """
        # Fixed: "is not None" so an explicit seed of 0 is honored
        if random_seed is not None:
            np.random.seed(random_seed)

        # Original DiD formula and estimate
        formula = f"{kpi_col} ~ {assignment_col} * post"
        try:
            original_model = smf.ols(formula=formula, data=df).fit()
            interaction_term = f"{assignment_col}[T.{TREATMENT}]:post"
            if interaction_term not in original_model.params:
                possible_terms = [
                    term for term in original_model.params.index if ":post" in term
                ]
                if possible_terms:
                    interaction_term = possible_terms[0]
                else:
                    logger.warning("Could not find interaction term for permutation test")
                    return 1.0
            original_estimate = original_model.params[interaction_term]
        except Exception:
            logger.warning("Failed to fit original model for permutation test")
            return 1.0

        # Get unique geos and their assignments for permutation
        geo_assignments = df[[geo_col, assignment_col]].drop_duplicates()
        unique_geos = geo_assignments[geo_col].values
        assignments = geo_assignments[assignment_col].values.copy()

        # Permutation distribution
        permutation_estimates = []
        for _ in range(n_permutations):
            # Randomly permute assignments at geo level
            np.random.shuffle(assignments)
            perm_assignments = dict(zip(unique_geos, assignments))

            # Apply permuted assignments to a copy of the dataframe
            perm_df = df.copy()
            perm_df[assignment_col] = perm_df[geo_col].map(perm_assignments)

            try:
                perm_model = smf.ols(formula=formula, data=perm_df).fit()
                if interaction_term in perm_model.params:
                    perm_estimate = perm_model.params[interaction_term]
                    permutation_estimates.append(perm_estimate)
            except Exception:
                # Skip failed permutation samples
                continue

        if len(permutation_estimates) == 0:
            logger.warning("No successful permutation samples")
            return 1.0

        permutation_estimates = np.array(permutation_estimates)

        # Two-tailed p-value
        p_value = np.mean(np.abs(permutation_estimates) >= np.abs(original_estimate))
        return p_value

    def _wild_bootstrap_did(
        self,
        df: pd.DataFrame,
        formula: str,
        assignment_col: str,
        geo_col: str,
        n_bootstrap: int = 1000,
        random_seed: Optional[int] = None
    ) -> Tuple[float, float]:
        """
        Calculate wild bootstrap confidence intervals for DiD analysis.

        Wild bootstrap is particularly useful for handling heteroskedasticity
        and cluster correlation in DiD models.

        Parameters
        ----------
        df : pd.DataFrame
            Analysis dataframe.
        formula : str
            Regression formula.
        assignment_col : str
            Assignment column name.
        geo_col : str
            Geographic identifier column.
        n_bootstrap : int, optional
            Number of bootstrap samples (default: 1000).
        random_seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        Tuple[float, float]
            Lower and upper confidence interval bounds ((0.0, 0.0) on
            failure).
        """
        # Fixed: "is not None" so an explicit seed of 0 is honored
        if random_seed is not None:
            np.random.seed(random_seed)

        # Fit original model and locate the interaction coefficient
        try:
            original_model = smf.ols(formula=formula, data=df).fit()
            interaction_term = f"{assignment_col}_numeric:post"
            if interaction_term not in original_model.params:
                # Try alternative interaction term format
                interaction_term = f"{assignment_col}[T.{TREATMENT}]:post"
                if interaction_term not in original_model.params:
                    possible_terms = [
                        term for term in original_model.params.index if ":post" in term
                    ]
                    if possible_terms:
                        interaction_term = possible_terms[0]
                    else:
                        logger.warning("Could not find interaction term for wild bootstrap")
                        return (0.0, 0.0)
            residuals = original_model.resid
        except Exception as e:
            logger.warning(f"Failed to fit original model for wild bootstrap: {e}")
            return (0.0, 0.0)

        # Get unique clusters for wild bootstrap
        unique_geos = df[geo_col].unique()
        n_clusters = len(unique_geos)

        # Position of the interaction coefficient in the design matrix.
        # Fixed: hoisted out of the loop (loop-invariant; membership was
        # already established above).
        coef_position = list(original_model.params.index).index(interaction_term)

        # Bootstrap estimates using wild bootstrap
        bootstrap_estimates = []
        for _ in range(n_bootstrap):
            # Generate Rademacher weights (+/-1 with equal probability) by cluster
            cluster_weights = np.random.choice([-1, 1], size=n_clusters)
            weight_mapping = dict(zip(unique_geos, cluster_weights))

            # Apply weights to residuals
            df_weights = df[geo_col].map(weight_mapping)
            wild_residuals = residuals * df_weights

            # Create wild bootstrap dependent variable
            y_star = original_model.fittedvalues + wild_residuals

            # Refit on the same design matrix with the wild dependent variable
            try:
                X = original_model.model.exog
                wild_model = sm.OLS(y_star, X).fit()
                wild_estimate = wild_model.params[coef_position]
                bootstrap_estimates.append(wild_estimate)
            except Exception:
                # Skip failed bootstrap samples
                continue

        if len(bootstrap_estimates) == 0:
            logger.warning("No successful wild bootstrap samples")
            return (0.0, 0.0)

        bootstrap_estimates = np.array(bootstrap_estimates)

        # Calculate confidence intervals (2.5% and 97.5% percentiles)
        lower_bound = np.percentile(bootstrap_estimates, 2.5)
        upper_bound = np.percentile(bootstrap_estimates, 97.5)

        return lower_bound, upper_bound

    @monitor_operation("analyze_impl")
    @cache_result(ttl=3600)  # Cache results for 1 hour
    def _analyze_impl(self, df: pd.DataFrame, **kwargs: Any) -> DiDResult:
        """Run a Difference-in-Differences (DiD) analysis as a robustness check.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing the full experimental data.
        geo_col : str
            The name of the geographic identifier column.
        assignment_col : str
            The name of the column with group assignments.
        date_col : str
            The name of the date column.
        kpi_col : str
            The name of the KPI column.
        test_period_start : str
            The start date of the test period (YYYY-MM-DD).
        use_wild_bootstrap : bool, optional
            If True, uses wild bootstrap for confidence intervals (default: False).
        use_permutation_test : bool, optional
            If True, adds permutation test p-value (default: False).
        n_bootstrap : int, optional
            Number of bootstrap samples for wild bootstrap (default: 1000).
        n_permutations : int, optional
            Number of permutation samples (default: 1000).
        random_seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        DiDResult
            Results containing DiD estimate, p-value, and confidence intervals.

        Raises
        ------
        AnalysisError
            If model fitting fails.
        """
        geo_col = kwargs["geo_col"]
        assignment_col = kwargs["assignment_col"]
        kpi_col = kwargs["kpi_col"]
        use_wild_bootstrap = kwargs.get("use_wild_bootstrap", False)
        use_permutation_test = kwargs.get("use_permutation_test", False)
        n_bootstrap = kwargs.get("n_bootstrap", 1000)
        n_permutations = kwargs.get("n_permutations", 1000)
        random_seed = kwargs.get("random_seed")

        # Fit DiD model with cluster-robust standard errors
        formula = f"{kpi_col} ~ {assignment_col} * post"
        try:
            did_model = smf.ols(formula=formula, data=df).fit(
                cov_type="cluster", cov_kwds={"groups": df[geo_col]}
            )
        except Exception as e:
            raise AnalysisError(f"DiD model fitting failed: {str(e)}") from e

        # Find the interaction term
        interaction_term = f"{assignment_col}[T.{TREATMENT}]:post"
        if interaction_term not in did_model.params:
            possible_terms = [
                term for term in did_model.params.index if ":post" in term
            ]
            if possible_terms:
                interaction_term = possible_terms[0]
            else:
                raise AnalysisError(
                    f"Could not find the interaction term '{interaction_term}' in the "
                    f"model results. Available terms are: "
                    f"{list(did_model.params.index)}"
                )

        # Calculate confidence intervals
        if use_wild_bootstrap:
            logger.info(f"Computing wild bootstrap confidence intervals with {n_bootstrap} samples")
            conf_int = list(self._wild_bootstrap_did(
                df, formula, assignment_col, geo_col, n_bootstrap, random_seed=random_seed
            ))
        else:
            conf_int = did_model.conf_int().loc[interaction_term].tolist()

        # Calculate p-value
        if use_permutation_test:
            logger.info(f"Computing permutation test p-value with {n_permutations} samples")
            p_value = self._permutation_test_did(
                df, assignment_col, geo_col, kpi_col, n_permutations, random_seed=random_seed
            )
        else:
            p_value = did_model.pvalues[interaction_term]

        # Calculate effect sizes for pre and post periods separately
        treatment_vals = df[df[assignment_col] == TREATMENT][kpi_col]
        control_vals = df[df[assignment_col] != TREATMENT][kpi_col]
        cohens_d = calculate_cohens_d(treatment_vals.values, control_vals.values)
        hedges_g = calculate_hedges_g(treatment_vals.values, control_vals.values)

        metadata = {
            "model_summary": str(did_model.summary()),
            "interaction_term": interaction_term,
            "cohens_d": float(cohens_d),
            "hedges_g": float(hedges_g),
            "n_treatment": len(treatment_vals),
            "n_control": len(control_vals)
        }
        if use_wild_bootstrap:
            metadata["bootstrap_method"] = "Wild Bootstrap"
            metadata["n_bootstrap"] = n_bootstrap
        if use_permutation_test:
            metadata["permutation_test"] = True
            metadata["n_permutations"] = n_permutations

        return DiDResult(
            success=True,
            estimate=did_model.params[interaction_term],
            p_value=float(p_value),
            confidence_interval=(conf_int[0], conf_int[1]),
            method="did_analysis",
            metadata=metadata
        )

    @monitor_operation("balance_check")
    def check_balance(
        self,
        df: pd.DataFrame,
        geo_col: str,
        assignment_col: str,
        covariates: List[str],
        **kwargs: Any
    ) -> BalanceCheckResult:
        """
        Perform advanced balance check on covariates for DiD analysis.

        Parameters
        ----------
        df : pd.DataFrame
            Full experimental dataframe.
        geo_col : str
            Geographic identifier column.
        assignment_col : str
            Assignment column name.
        covariates : List[str]
            List of covariate columns to check.
        **kwargs : Any
            Additional parameters passed to advanced_balance_check.

        Returns
        -------
        BalanceCheckResult
            Comprehensive balance assessment.
        """
        # Get unique geo assignments for balance check
        geo_assignments = df[[geo_col, assignment_col]].drop_duplicates()
        return advanced_balance_check(
            geo_assignments, assignment_col, covariates, **kwargs
        )
[docs] class CRTAnalyzer(BaseAnalyzer, MonitoringMixin): """Analyzer for Cluster Randomized Trials (CRT), including staircase designs."""
[docs] def __init__( self, config: Optional['AnalyzerConfig'] = None, bootstrap_reps: int = 1000, bootstrap_seed: Optional[int] = None, performance_config: Optional[ParallelConfig] = None ): """Initialize CRT analyzer. Parameters ---------- config : AnalyzerConfig, optional Configuration for analyzer behavior. bootstrap_reps : int, optional Number of bootstrap replications (default: 1000). bootstrap_seed : int, optional Random seed for bootstrap sampling. performance_config : ParallelConfig, optional Configuration for performance optimizations. """ # Import here to avoid circular imports from .base import AnalyzerConfig # Initialize with config (BaseAnalyzer expects this) super().__init__(config) self.bootstrap_reps = bootstrap_reps self.bootstrap_seed = bootstrap_seed self.performance_config = performance_config or ParallelConfig()
[docs] def validate_inputs(self, df: pd.DataFrame, **kwargs: Any) -> None: """Validate inputs for CRT analysis.""" # Call parent validation first super().validate_inputs(df, **kwargs) geo_col = kwargs.get("geo_col") period_col = kwargs.get("period_col") assignment_col = kwargs.get("assignment_col") kpi_col = kwargs.get("kpi_col") assert isinstance(geo_col, str) assert isinstance(period_col, str) assert isinstance(assignment_col, str) assert isinstance(kpi_col, str) df = validate_dataframe(df, "df") validate_columns_exist(df, [geo_col, period_col, assignment_col, kpi_col]) validate_numeric_column(df, kpi_col)
@monitor_operation("prepare_data") def prepare_data(self, df: pd.DataFrame, **kwargs: Any) -> pd.DataFrame: """Prepare data for CRT analysis with performance optimizations.""" # Call parent preparation first df = super().prepare_data(df, **kwargs) assignment_col = kwargs["assignment_col"] period_col = kwargs["period_col"] kpi_col = kwargs["kpi_col"] # Standardize assignments first to ensure string type df[assignment_col] = standardize_assignments(df[assignment_col]) # Optimize memory usage df = optimize_memory_usage(df) # Ensure assignment column remains string after memory optimization # (optimize_memory_usage may convert low-cardinality string columns to categorical) if df[assignment_col].dtype.name == 'category': df[assignment_col] = df[assignment_col].astype(str) # Create numeric assignment column (handle categorical assignments properly) if df[assignment_col].dtype.name == 'category': # For categorical data, convert to string first then compare df[f"{assignment_col}_numeric"] = (df[assignment_col].astype(str) == TREATMENT).astype("int8") else: df[f"{assignment_col}_numeric"] = (df[assignment_col] == TREATMENT).astype("int8") # Apply other optimizations using vectorize_operations df = vectorize_operations( df, operations={ period_col: lambda x: x.astype("category"), kpi_col: lambda x: x.astype("float32") # Use float32 for memory efficiency } ) return optimize_memory_usage(df) def _wild_bootstrap_crt( self, df: pd.DataFrame, formula: str, assignment_col: str, geo_col: str, n_bootstrap: int = 1000, random_seed: Optional[int] = None ) -> Tuple[float, float]: """ Calculate wild bootstrap confidence intervals for CRT analysis. Wild bootstrap is particularly effective for handling heteroskedasticity and cluster correlation in CRT models. Parameters ---------- df : pd.DataFrame Analysis dataframe. formula : str Regression formula. assignment_col : str Assignment column name. geo_col : str Geographic identifier column. 
n_bootstrap : int, optional Number of bootstrap samples (default: 1000). random_seed : int, optional Random seed for reproducibility. Returns ------- Tuple[float, float] Lower and upper confidence interval bounds. """ if random_seed: np.random.seed(random_seed) # Fit original model try: original_model = smf.ols(formula=formula, data=df).fit() coefficient_name = f"{assignment_col}_numeric" if coefficient_name not in original_model.params: logger.warning(f"Could not find coefficient {coefficient_name} for wild bootstrap") return (0.0, 0.0) original_estimate = original_model.params[coefficient_name] residuals = original_model.resid except Exception as e: logger.warning(f"Failed to fit original model for wild bootstrap: {e}") return (0.0, 0.0) # Get unique clusters for wild bootstrap unique_geos = df[geo_col].unique() n_clusters = len(unique_geos) # Bootstrap estimates using wild bootstrap bootstrap_estimates = [] for _ in range(n_bootstrap): # Generate Rademacher weights (±1 with equal probability) by cluster cluster_weights = np.random.choice([-1, 1], size=n_clusters) weight_mapping = dict(zip(unique_geos, cluster_weights)) # Apply weights to residuals df_weights = df[geo_col].map(weight_mapping) wild_residuals = residuals * df_weights # Create wild bootstrap dependent variable y_star = original_model.fittedvalues + wild_residuals # Fit model with wild bootstrap dependent variable try: X = original_model.model.exog wild_model = sm.OLS(y_star, X).fit() # Extract treatment coefficient if coefficient_name in original_model.params.index: coef_position = list(original_model.params.index).index(coefficient_name) wild_estimate = wild_model.params[coef_position] bootstrap_estimates.append(wild_estimate) except Exception: # Skip failed bootstrap samples continue if len(bootstrap_estimates) == 0: logger.warning("No successful wild bootstrap samples") return (0.0, 0.0) bootstrap_estimates = np.array(bootstrap_estimates) # Calculate confidence intervals (2.5% and 97.5% 
percentiles) lower_bound = np.percentile(bootstrap_estimates, 2.5) upper_bound = np.percentile(bootstrap_estimates, 97.5) return lower_bound, upper_bound def _bootstrap_single_sample(self, df: pd.DataFrame, formula: str, assignment_col: str, geo_col: str, unique_geos: np.ndarray, seed: int) -> Optional[float]: """Fit a single bootstrap sample. Parameters ---------- df : pd.DataFrame The original DataFrame. formula : str The regression formula. assignment_col : str The assignment column name. geo_col : str The geographic column name. unique_geos : np.ndarray Array of unique geographic identifiers. seed : int Random seed for this bootstrap sample. Returns ------- Optional[float] Bootstrap estimate or None if fitting fails. """ np.random.seed(seed) bootstrap_geos = np.random.choice( unique_geos, size=len(unique_geos), replace=True ) bootstrap_df = df[df[geo_col].isin(bootstrap_geos)] try: bootstrap_model = smf.ols(formula, data=bootstrap_df).fit() return bootstrap_model.params[f"{assignment_col}_numeric"] except Exception: return None # Skip if model fails on a bootstrap sample @monitor_operation("analyze_impl") def _analyze_impl(self, df: pd.DataFrame, **kwargs: Any) -> CRTResult: """Run a Mixed-Effects Model analysis suitable for Cluster Randomized Trials. Parameters ---------- df : pd.DataFrame The DataFrame containing the full experimental data. geo_col : str The name of the geographic cluster identifier column. period_col : str The name of the time period column. assignment_col : str The name of the column indicating treatment assignment. kpi_col : str The name of the Key Performance Indicator column. bootstrap_ci : bool, optional If True, calculates confidence intervals using a cluster bootstrap. n_bootstraps : int, optional The number of bootstrap samples to use if bootstrap_ci is True. random_seed : int, optional The random seed for the bootstrap process. Returns ------- CRTResult Results containing CRT estimate, p-value, and confidence intervals. 
Raises ------ AnalysisError If model fitting fails. """ geo_col = kwargs["geo_col"] period_col = kwargs["period_col"] assignment_col = kwargs["assignment_col"] kpi_col = kwargs["kpi_col"] bootstrap_ci = kwargs.get("bootstrap_ci", False) n_bootstraps = kwargs.get("n_bootstraps", 1000) random_seed: Optional[int] = kwargs.get("random_seed", self.bootstrap_seed or 42) # Build the regression formula formula = f"{kpi_col} ~ C({period_col}) + {assignment_col}_numeric" logger.debug(f"Using formula for CRT analysis: {formula}") try: model = smf.ols(formula, data=df).fit( cov_type="cluster", cov_kwds={"groups": df[geo_col]} ) logger.debug(f"CRT model summary:\n{model.summary()}") # Calculate confidence intervals if bootstrap_ci: unique_geos = df[geo_col].unique() # Use sequential processing when random seed is provided to ensure reproducibility # Parallel processing can introduce non-deterministic behavior even with proper seeding if (self.performance_config.enabled and n_bootstraps > 100 and random_seed is None): # Generate seeds for reproducibility np.random.seed(random_seed) seeds = np.random.randint(0, 2**31, n_bootstraps) # Create partial function for parallel execution from functools import partial bootstrap_func = partial( self._bootstrap_single_sample, df, formula, assignment_col, geo_col, unique_geos ) # Execute bootstrap samples in parallel using joblib directly estimates = joblib.Parallel( n_jobs=self.performance_config.n_jobs, backend=self.performance_config.backend )( joblib.delayed(bootstrap_func)(seed) for seed in seeds ) # Filter out None values from failed fits estimates = [est for est in estimates if est is not None] else: # Use sequential processing for reproducibility or small samples # Generate seeds for reproducibility np.random.seed(random_seed) seeds = np.random.randint(0, 2**31, n_bootstraps) estimates = [] for seed in seeds: estimate = self._bootstrap_single_sample( df, formula, assignment_col, geo_col, unique_geos, seed ) if estimate is not None: 
estimates.append(estimate) if len(estimates) < n_bootstraps * 0.8: logger.warning( f"Only {len(estimates)}/{n_bootstraps} bootstrap samples succeeded. " f"Results may be unreliable." ) conf_int = np.percentile(estimates, [2.5, 97.5]).tolist() else: conf_int = model.conf_int().loc[f"{assignment_col}_numeric"].tolist() return CRTResult( success=True, estimate=model.params[f"{assignment_col}_numeric"], p_value=model.pvalues[f"{assignment_col}_numeric"], confidence_interval=(conf_int[0], conf_int[1]), method="crt_analysis", metadata={"model_summary": str(model.summary())} ) except Exception as e: raise AnalysisError(f"CRT model fitting failed: {str(e)}") from e def _create_error_result(self, error: Exception, error_type: str) -> CRTResult: """Create a CRTResult for error cases. Parameters ---------- error : Exception The exception that occurred. error_type : str Type of error for metadata. Returns ------- CRTResult Error result with success=False. """ return CRTResult( success=False, estimate=0.0, p_value=1.0, confidence_interval=(0.0, 0.0), method="crt_analysis", metadata={ "error_message": str(error), "error_type": error_type } )