import logging
from typing import Any, Dict, Optional, Tuple, List
from functools import partial
import joblib
import numpy as np
import pandas as pd
import statsmodels.api as sm # type: ignore
import statsmodels.formula.api as smf # type: ignore
from scipy import stats
from .base import BaseAnalyzer
from .constants import TREATMENT
from .exceptions import AnalysisError
from .performance import (
cache_result, optimize_memory_usage, ParallelConfig, vectorize_operations,
parallel_apply
)
from .monitoring import MonitoringMixin, monitor_operation
from .schemas import CRTResult, DiDResult, LiftResult, BalanceCheckResult, DiagnosticResult
from .validation import (
standardize_assignments,
validate_columns_exist,
validate_dataframe,
validate_date_string,
validate_numeric_column,
)
from .diagnostics import (
advanced_balance_check,
calculate_cohens_d,
calculate_hedges_g,
multiple_testing_correction,
perform_placebo_test,
sensitivity_analysis_rosenbaum
)
logger = logging.getLogger(__name__)
# (removed stray "[docs]" Sphinx source-link artifact; as a bare expression it would raise NameError at import)
class LiftAnalyzer(BaseAnalyzer, MonitoringMixin):
"""Analyzer for lift analysis using normalized lift index."""
[docs]
def __init__(self, config: Optional['AnalyzerConfig'] = None, performance_config: Optional[ParallelConfig] = None):
    """Initialize LiftAnalyzer with configuration.

    Parameters
    ----------
    config : AnalyzerConfig, optional
        Configuration for analyzer behavior; forwarded to BaseAnalyzer.
    performance_config : ParallelConfig, optional
        Configuration for performance optimizations (a default
        ParallelConfig is created when omitted).
    """
    # The 'AnalyzerConfig' annotation above is a string forward reference,
    # so no runtime import is required here; the previous function-local
    # 'from .base import AnalyzerConfig' was unused and has been removed.
    super().__init__(config)
    self.performance_config = performance_config or ParallelConfig()
@monitor_operation("prepare_data")
def prepare_data(self, df: pd.DataFrame, **kwargs: Any) -> pd.DataFrame:
    """Prepare data for lift analysis with performance optimizations.

    Collapses the panel to one row per geo carrying the pre-period and
    test-period KPI averages plus a normalized ``lift_index`` equal to
    ``test_avg / pre_avg - 1``.

    Required kwargs: ``geo_col``, ``assignment_col``, ``date_col``,
    ``kpi_col``, ``pre_period_end``, ``test_period_start``,
    ``test_period_end``.

    Returns
    -------
    pd.DataFrame
        One row per geo with columns geo_col, ``pre_avg``, ``test_avg``,
        assignment_col, and ``lift_index``.
    """
    # Call parent preparation first
    prepared_df = super().prepare_data(df, **kwargs)
    geo_col = kwargs["geo_col"]
    assignment_col = kwargs["assignment_col"]
    date_col = kwargs["date_col"]
    kpi_col = kwargs["kpi_col"]
    pre_period_end = kwargs["pre_period_end"]
    test_period_start = kwargs["test_period_start"]
    test_period_end = kwargs["test_period_end"]
    # Standardize assignments first to ensure string type
    prepared_df[assignment_col] = standardize_assignments(prepared_df[assignment_col])
    # Convert date column BEFORE memory optimization to prevent it being converted to categorical
    prepared_df[date_col] = pd.to_datetime(prepared_df[date_col])
    # Optimize memory usage for the dataframe
    prepared_df = optimize_memory_usage(prepared_df)
    # Ensure assignment column remains string after memory optimization
    # (optimize_memory_usage may convert low-cardinality string columns to categorical)
    if prepared_df[assignment_col].dtype.name == 'category':
        prepared_df[assignment_col] = prepared_df[assignment_col].astype(str)
    # Convert period dates
    pre_period_end = pd.to_datetime(pre_period_end)
    test_period_start = pd.to_datetime(test_period_start)
    test_period_end = pd.to_datetime(test_period_end)
    # Vectorized period filtering: pre-period is everything up to and
    # including pre_period_end; test period is the closed date interval.
    pre_mask = prepared_df[date_col] <= pre_period_end
    test_mask = (prepared_df[date_col] >= test_period_start) & (prepared_df[date_col] <= test_period_end)
    # Calculate period averages using vectorized operations
    pre_avg = prepared_df[pre_mask].groupby(geo_col)[kpi_col].mean().reset_index(name="pre_avg")
    test_avg = prepared_df[test_mask].groupby(geo_col)[kpi_col].mean().reset_index(name="test_avg")
    # Inner merge: only geos with data in BOTH periods survive.
    analysis_df = pd.merge(pre_avg, test_avg, on=geo_col, how="inner")
    assignments = prepared_df[[geo_col, assignment_col]].drop_duplicates()
    analysis_df = pd.merge(analysis_df, assignments, on=geo_col, how="left")
    # Vectorized lift index: test_avg / pre_avg - 1. Where pre_avg == 0 the
    # division is skipped, 'out' stays 0, and the resulting lift_index is -1.
    # NOTE(review): np.zeros_like inherits test_avg's dtype — this assumes the
    # period averages are float after optimize_memory_usage; confirm.
    analysis_df["lift_index"] = np.divide(
        analysis_df["test_avg"],
        analysis_df["pre_avg"],
        out=np.zeros_like(analysis_df["test_avg"]),
        where=analysis_df["pre_avg"] != 0
    ) - 1
    # Don't re-optimize analysis_df to preserve string assignment column
    return analysis_df
def _bca_bootstrap(
    self,
    df: pd.DataFrame,
    geo_col: str,
    assignment_col: str,
    n_bootstrap: int = 1000,
    alpha: float = 0.05,
    random_seed: Optional[int] = None
) -> Tuple[float, float]:
    """
    Calculate bias-corrected and accelerated (BCa) bootstrap confidence intervals.

    Parameters
    ----------
    df : pd.DataFrame
        Analysis dataframe with lift_index calculated.
    geo_col : str
        Geographic identifier column.
    assignment_col : str
        Assignment column name.
    n_bootstrap : int, optional
        Number of bootstrap samples (default: 1000).
    alpha : float, optional
        Significance level (default: 0.05).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    Tuple[float, float]
        Lower and upper confidence interval bounds. Returns (nan, nan)
        when no valid bootstrap sample could be formed.
    """
    # FIX: compare against None so a seed of 0 (falsy) still seeds the RNG.
    if random_seed is not None:
        np.random.seed(random_seed)
    unique_geos = df[geo_col].unique()
    n_geos = len(unique_geos)
    # Original difference-in-means estimate (treatment minus control).
    treatment_mask = df[assignment_col] == TREATMENT
    original_estimate = df[treatment_mask]["lift_index"].mean() - df[~treatment_mask]["lift_index"].mean()
    # Cluster bootstrap: resample geos with replacement.
    # NOTE(review): isin() keeps a geo drawn multiple times only once, so
    # duplicated clusters are not actually repeated in the resample — this
    # understates variance relative to a textbook cluster bootstrap.
    # Preserved as-is to match the existing estimator; confirm intent.
    bootstrap_estimates = []
    for _ in range(n_bootstrap):
        bootstrap_geos = np.random.choice(unique_geos, size=n_geos, replace=True)
        bootstrap_df = df[df[geo_col].isin(bootstrap_geos)]
        if len(bootstrap_df) == 0:
            continue
        boot_treatment_mask = bootstrap_df[assignment_col] == TREATMENT
        # Only keep replicates where both arms are represented.
        if boot_treatment_mask.sum() > 0 and (~boot_treatment_mask).sum() > 0:
            boot_estimate = (
                bootstrap_df[boot_treatment_mask]["lift_index"].mean() -
                bootstrap_df[~boot_treatment_mask]["lift_index"].mean()
            )
            bootstrap_estimates.append(boot_estimate)
    bootstrap_estimates = np.array(bootstrap_estimates)
    if bootstrap_estimates.size == 0:
        # FIX: previously this fell through to a division by zero / empty
        # percentile; fail soft with NaN bounds instead.
        logger.warning("BCa bootstrap produced no valid samples; returning NaN bounds")
        return (float("nan"), float("nan"))
    # Leave-one-geo-out jackknife estimates for the acceleration constant.
    jackknife_estimates = []
    for geo in unique_geos:
        jack_df = df[df[geo_col] != geo]
        if len(jack_df) == 0:
            continue
        jack_treatment_mask = jack_df[assignment_col] == TREATMENT
        if jack_treatment_mask.sum() > 0 and (~jack_treatment_mask).sum() > 0:
            jack_estimate = (
                jack_df[jack_treatment_mask]["lift_index"].mean() -
                jack_df[~jack_treatment_mask]["lift_index"].mean()
            )
            jackknife_estimates.append(jack_estimate)
    jackknife_estimates = np.array(jackknife_estimates)
    # Bias correction. FIX: the proportion is clipped away from exactly 0 and
    # 1 so norm.ppf cannot return +/-inf when the original estimate lies
    # entirely outside the bootstrap distribution.
    n_boot = len(bootstrap_estimates)
    p_less = np.sum(bootstrap_estimates < original_estimate) / n_boot
    p_less = min(max(p_less, 1.0 / (n_boot + 1)), n_boot / (n_boot + 1.0))
    bias_correction = stats.norm.ppf(p_less)
    # Acceleration constant. FIX: falls back to 0 (plain bias-corrected
    # interval) when the jackknife distribution is empty or degenerate,
    # instead of producing NaN via 0/0.
    if jackknife_estimates.size == 0:
        acceleration = 0.0
    else:
        jackknife_mean = np.mean(jackknife_estimates)
        denom = 6 * (np.sum((jackknife_mean - jackknife_estimates) ** 2)) ** 1.5
        if denom == 0:
            acceleration = 0.0
        else:
            acceleration = np.sum((jackknife_mean - jackknife_estimates) ** 3) / denom
    # BCa-adjusted percentile levels.
    z_alpha = stats.norm.ppf(alpha / 2)
    z_1_alpha = stats.norm.ppf(1 - alpha / 2)
    alpha_1 = stats.norm.cdf(bias_correction + (bias_correction + z_alpha) / (1 - acceleration * (bias_correction + z_alpha)))
    alpha_2 = stats.norm.cdf(bias_correction + (bias_correction + z_1_alpha) / (1 - acceleration * (bias_correction + z_1_alpha)))
    # Ensure percentiles are within valid range
    alpha_1 = max(0, min(1, alpha_1))
    alpha_2 = max(0, min(1, alpha_2))
    lower_bound = np.percentile(bootstrap_estimates, 100 * alpha_1)
    upper_bound = np.percentile(bootstrap_estimates, 100 * alpha_2)
    return lower_bound, upper_bound
def _permutation_test(
    self,
    df: pd.DataFrame,
    assignment_col: str,
    n_permutations: int = 1000,
    random_seed: Optional[int] = None
) -> float:
    """
    Perform permutation test for lift analysis.

    Parameters
    ----------
    df : pd.DataFrame
        Analysis dataframe with lift_index calculated.
    assignment_col : str
        Assignment column name.
    n_permutations : int, optional
        Number of permutation samples (default: 1000).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    float
        Two-tailed permutation p-value. Returns 1.0 when no permutation
        produced both arms.
    """
    # FIX: compare against None so a seed of 0 (falsy) still seeds the RNG.
    if random_seed is not None:
        np.random.seed(random_seed)
    # Original test statistic: treatment minus control mean lift index.
    treatment_mask = df[assignment_col] == TREATMENT
    original_diff = df[treatment_mask]["lift_index"].mean() - df[~treatment_mask]["lift_index"].mean()
    # Permutation distribution. Shuffle a private copy of the labels; df
    # itself is never mutated.
    permutation_diffs = []
    assignments = df[assignment_col].values.copy()
    for _ in range(n_permutations):
        np.random.shuffle(assignments)
        perm_treatment_mask = assignments == TREATMENT
        # Only keep permutations where both arms are represented.
        if perm_treatment_mask.sum() > 0 and (~perm_treatment_mask).sum() > 0:
            perm_diff = (
                df.loc[perm_treatment_mask, "lift_index"].mean() -
                df.loc[~perm_treatment_mask, "lift_index"].mean()
            )
            permutation_diffs.append(perm_diff)
    if not permutation_diffs:
        # FIX: np.mean over an empty array previously returned NaN (with a
        # RuntimeWarning); a conservative p-value of 1.0 is returned instead.
        logger.warning("No valid permutation samples; returning p-value of 1.0")
        return 1.0
    permutation_diffs = np.array(permutation_diffs)
    # Two-tailed p-value
    p_value = np.mean(np.abs(permutation_diffs) >= np.abs(original_diff))
    return p_value
@monitor_operation("analyze_impl")
@cache_result(ttl=3600)  # Cache results for 1 hour
def _analyze_impl(self, df: pd.DataFrame, **kwargs: Any) -> LiftResult:
    """Run the primary analysis on experimental data using a normalized lift index.

    Expects `df` to already contain pre_avg/test_avg/lift_index columns as
    produced by :meth:`prepare_data`.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the prepared analysis data.
    geo_col : str
        The name of the geographic identifier column.
    assignment_col : str
        The name of the column with group assignments.
    return_dataframe : bool, optional
        Accepted but not supported; a warning is logged and it is ignored.
    use_bca_bootstrap : bool, optional
        If True, uses BCa bootstrap for confidence intervals (default: False).
    use_permutation_test : bool, optional
        If True, adds permutation test p-value (default: False).
    n_bootstrap : int, optional
        Number of bootstrap samples for BCa method (default: 1000).
    n_permutations : int, optional
        Number of permutation samples (default: 1000).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    LiftResult
        Results containing lift estimate, p-value, and confidence intervals,
        plus effect sizes and raw volume uplift in metadata.

    Raises
    ------
    AnalysisError
        If model fitting fails.
    """
    geo_col = kwargs["geo_col"]
    assignment_col = kwargs["assignment_col"]
    return_dataframe = kwargs.get("return_dataframe", False)
    use_bca_bootstrap = kwargs.get("use_bca_bootstrap", False)
    use_permutation_test = kwargs.get("use_permutation_test", False)
    n_bootstrap = kwargs.get("n_bootstrap", 1000)
    n_permutations = kwargs.get("n_permutations", 1000)
    random_seed = kwargs.get("random_seed")
    if return_dataframe:
        logger.warning(
            "'return_dataframe=True' is not supported and will be ignored."
        )
    # Fit OLS model with cluster-robust standard errors.
    # Handle categorical data by converting to string before the comparison.
    if df[assignment_col].dtype.name == 'category':
        treatment_dummy = (df[assignment_col].astype(str) == TREATMENT).astype(int)
    else:
        treatment_dummy = (df[assignment_col] == TREATMENT).astype(int)
    # Design matrix: intercept plus a 0/1 treatment dummy named TREATMENT.
    X = sm.add_constant(treatment_dummy)
    X.columns = ["const", TREATMENT]
    y = df["lift_index"]
    try:
        # Standard errors are clustered on geo.
        model = sm.OLS(y, X).fit(
            cov_type="cluster", cov_kwds={"groups": df[geo_col]}
        )
        # Confidence intervals: BCa bootstrap when requested, otherwise the
        # model's cluster-robust interval for the treatment coefficient.
        if use_bca_bootstrap:
            logger.info(f"Computing BCa bootstrap confidence intervals with {n_bootstrap} samples")
            conf_int = list(self._bca_bootstrap(
                df, geo_col, assignment_col, n_bootstrap, random_seed=random_seed
            ))
        else:
            conf_int = model.conf_int().loc[TREATMENT].tolist()
        # p-value: permutation test when requested, otherwise the model's.
        if use_permutation_test:
            logger.info(f"Computing permutation test p-value with {n_permutations} samples")
            p_value = self._permutation_test(
                df, assignment_col, n_permutations, random_seed=random_seed
            )
        else:
            p_value = model.pvalues[TREATMENT]
        # Effect sizes on the lift index.
        treatment_vals = df[df[assignment_col] == TREATMENT]["lift_index"]
        control_vals = df[df[assignment_col] != TREATMENT]["lift_index"]
        cohens_d = calculate_cohens_d(treatment_vals.values, control_vals.values)
        hedges_g = calculate_hedges_g(treatment_vals.values, control_vals.values)
        # Raw volume uplift: difference between the arms' average
        # test-period KPI levels (absolute, not normalized).
        treatment_df = df[df[assignment_col] == TREATMENT]
        control_df = df[df[assignment_col] != TREATMENT]
        avg_treatment_test = treatment_df["test_avg"].mean()
        avg_control_test = control_df["test_avg"].mean()
        raw_volume_uplift = avg_treatment_test - avg_control_test
        metadata = {
            "model_summary": str(model.summary()),
            "cohens_d": float(cohens_d),
            "hedges_g": float(hedges_g),
            "n_treatment": len(treatment_vals),
            "n_control": len(control_vals),
            "raw_volume_uplift": float(raw_volume_uplift)
        }
        # Record which optional methods produced the interval / p-value.
        if use_bca_bootstrap:
            metadata["bootstrap_method"] = "BCa"
            metadata["n_bootstrap"] = n_bootstrap
        if use_permutation_test:
            metadata["permutation_test"] = True
            metadata["n_permutations"] = n_permutations
        return LiftResult(
            success=True,
            estimate=model.params[TREATMENT],
            p_value=float(p_value),
            confidence_interval=(conf_int[0], conf_int[1]),
            method="lift_analysis",
            metadata=metadata
        )
    except Exception as e:
        raise AnalysisError(f"Model fitting failed: {str(e)}") from e
@monitor_operation("balance_check")
def check_balance(
    self,
    df: pd.DataFrame,
    geo_col: str,
    assignment_col: str,
    covariates: List[str],
    **kwargs: Any
) -> BalanceCheckResult:
    """
    Perform advanced balance check on covariates.

    Parameters
    ----------
    df : pd.DataFrame
        Full experimental dataframe.
    geo_col : str
        Geographic identifier column.
    assignment_col : str
        Assignment column name.
    covariates : List[str]
        List of covariate columns to check.
    **kwargs : Any
        Additional parameters passed to advanced_balance_check.

    Returns
    -------
    BalanceCheckResult
        Comprehensive balance assessment.
    """
    # Balance is assessed at the geo level: collapse to one row per geo
    # before delegating to the shared diagnostic.
    per_geo_assignments = df.loc[:, [geo_col, assignment_col]].drop_duplicates()
    return advanced_balance_check(per_geo_assignments, assignment_col, covariates, **kwargs)
# (removed stray "[docs]" Sphinx source-link artifact; as a bare expression it would raise NameError at import)
class DiDAnalyzer(BaseAnalyzer, MonitoringMixin):
"""Analyzer for Difference-in-Differences (DiD) analysis."""
# (removed stray "[docs]" Sphinx source-link artifact; as a bare expression it would raise NameError at import)
def __init__(self, config: Optional['AnalyzerConfig'] = None, performance_config: Optional[ParallelConfig] = None):
    """Initialize DiDAnalyzer with configuration.

    Parameters
    ----------
    config : AnalyzerConfig, optional
        Configuration for analyzer behavior; forwarded to BaseAnalyzer.
    performance_config : ParallelConfig, optional
        Configuration for performance optimizations (a default
        ParallelConfig is created when omitted).
    """
    # The 'AnalyzerConfig' annotation above is a string forward reference,
    # so no runtime import is required here; the previous function-local
    # 'from .base import AnalyzerConfig' was unused and has been removed.
    super().__init__(config)
    self.performance_config = performance_config or ParallelConfig()
@monitor_operation("prepare_data")
def prepare_data(self, df: pd.DataFrame, **kwargs: Any) -> pd.DataFrame:
    """Prepare data for DiD analysis with performance optimizations.

    Standardizes assignments, parses dates, optimizes memory usage, and adds
    an int8 ``post`` indicator marking rows on or after the test period
    start.
    """
    frame = super().prepare_data(df, **kwargs)
    assignment_col = kwargs["assignment_col"]
    date_col = kwargs["date_col"]
    period_start = pd.to_datetime(kwargs["test_period_start"])
    # Normalize assignment labels to strings before any dtype optimization.
    frame[assignment_col] = standardize_assignments(frame[assignment_col])
    # Parse dates BEFORE optimizing memory so the column cannot be
    # downgraded to categorical.
    frame[date_col] = pd.to_datetime(frame[date_col])
    frame = optimize_memory_usage(frame)
    # optimize_memory_usage may have turned the low-cardinality assignment
    # column categorical; force it back to string.
    if frame[assignment_col].dtype.name == 'category':
        frame[assignment_col] = frame[assignment_col].astype(str)
    # int8 keeps the post-period flag memory-efficient.
    frame["post"] = (frame[date_col] >= period_start).astype("int8")
    # No final re-optimization: it could convert the datetime column back
    # to categorical.
    return frame
def _permutation_test_did(
    self,
    df: pd.DataFrame,
    assignment_col: str,
    geo_col: str,
    kpi_col: str,
    n_permutations: int = 1000,
    random_seed: Optional[int] = None
) -> float:
    """
    Perform permutation test for DiD analysis.

    Assignments are permuted at the geo (cluster) level, not row by row,
    and the interaction coefficient is re-estimated per permutation.

    Parameters
    ----------
    df : pd.DataFrame
        Analysis dataframe with post period indicator.
    assignment_col : str
        Assignment column name.
    geo_col : str
        Geographic identifier column.
    kpi_col : str
        KPI column name.
    n_permutations : int, optional
        Number of permutation samples (default: 1000).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    float
        Two-tailed permutation p-value; 1.0 on any failure to estimate.
    """
    # FIX: compare against None so a seed of 0 (falsy) still seeds the RNG.
    if random_seed is not None:
        np.random.seed(random_seed)
    # Original DiD formula and estimate
    formula = f"{kpi_col} ~ {assignment_col} * post"
    try:
        original_model = smf.ols(formula=formula, data=df).fit()
        interaction_term = f"{assignment_col}[T.{TREATMENT}]:post"
        if interaction_term not in original_model.params:
            # Fall back to any term containing ':post' (patsy naming varies).
            possible_terms = [
                term for term in original_model.params.index if ":post" in term
            ]
            if possible_terms:
                interaction_term = possible_terms[0]
            else:
                logger.warning("Could not find interaction term for permutation test")
                return 1.0
        original_estimate = original_model.params[interaction_term]
    except Exception:
        logger.warning("Failed to fit original model for permutation test")
        return 1.0
    # Geo-level labels are what gets permuted (cluster permutation).
    geo_assignments = df[[geo_col, assignment_col]].drop_duplicates()
    unique_geos = geo_assignments[geo_col].values
    assignments = geo_assignments[assignment_col].values.copy()
    # PERF FIX: copy the frame once and overwrite only the assignment column
    # per permutation; the original copied the full frame on every iteration.
    perm_df = df.copy()
    permutation_estimates = []
    for _ in range(n_permutations):
        # Randomly permute assignments at geo level
        np.random.shuffle(assignments)
        perm_assignments = dict(zip(unique_geos, assignments))
        perm_df[assignment_col] = perm_df[geo_col].map(perm_assignments)
        try:
            perm_model = smf.ols(formula=formula, data=perm_df).fit()
            if interaction_term in perm_model.params:
                permutation_estimates.append(perm_model.params[interaction_term])
        except Exception:
            # Skip failed permutation samples
            continue
    if len(permutation_estimates) == 0:
        logger.warning("No successful permutation samples")
        return 1.0
    permutation_estimates = np.array(permutation_estimates)
    # Two-tailed p-value
    p_value = np.mean(np.abs(permutation_estimates) >= np.abs(original_estimate))
    return p_value
def _wild_bootstrap_did(
    self,
    df: pd.DataFrame,
    formula: str,
    assignment_col: str,
    geo_col: str,
    n_bootstrap: int = 1000,
    random_seed: Optional[int] = None
) -> Tuple[float, float]:
    """
    Calculate wild bootstrap confidence intervals for DiD analysis.

    Wild bootstrap is particularly useful for handling heteroskedasticity
    and cluster correlation in DiD models. Rademacher weights are drawn per
    cluster and applied to the original residuals.

    Parameters
    ----------
    df : pd.DataFrame
        Analysis dataframe.
    formula : str
        Regression formula.
    assignment_col : str
        Assignment column name.
    geo_col : str
        Geographic identifier column.
    n_bootstrap : int, optional
        Number of bootstrap samples (default: 1000).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    Tuple[float, float]
        Lower and upper confidence interval bounds; (0.0, 0.0) on failure.
    """
    # FIX: compare against None so a seed of 0 (falsy) still seeds the RNG.
    if random_seed is not None:
        np.random.seed(random_seed)
    # Fit the original model and locate the interaction coefficient.
    try:
        original_model = smf.ols(formula=formula, data=df).fit()
        interaction_term = f"{assignment_col}_numeric:post"
        if interaction_term not in original_model.params:
            # Try the categorical (patsy) interaction term format.
            interaction_term = f"{assignment_col}[T.{TREATMENT}]:post"
            if interaction_term not in original_model.params:
                possible_terms = [
                    term for term in original_model.params.index if ":post" in term
                ]
                if possible_terms:
                    interaction_term = possible_terms[0]
                else:
                    logger.warning("Could not find interaction term for wild bootstrap")
                    return (0.0, 0.0)
        residuals = original_model.resid
    except Exception as e:
        logger.warning(f"Failed to fit original model for wild bootstrap: {e}")
        return (0.0, 0.0)
    # Get unique clusters for wild bootstrap
    unique_geos = df[geo_col].unique()
    n_clusters = len(unique_geos)
    # PERF: loop invariants hoisted — the design matrix and the coefficient
    # position are fixed across replications. (The per-iteration membership
    # re-check in the original was always true and has been removed.)
    X = original_model.model.exog
    coef_position = list(original_model.params.index).index(interaction_term)
    bootstrap_estimates = []
    for _ in range(n_bootstrap):
        # Rademacher weights (+/-1 with equal probability), one per cluster.
        cluster_weights = np.random.choice([-1, 1], size=n_clusters)
        weight_mapping = dict(zip(unique_geos, cluster_weights))
        # Apply the cluster weights to the residuals.
        df_weights = df[geo_col].map(weight_mapping)
        wild_residuals = residuals * df_weights
        # Wild bootstrap dependent variable.
        y_star = original_model.fittedvalues + wild_residuals
        try:
            wild_model = sm.OLS(y_star, X).fit()
            bootstrap_estimates.append(wild_model.params[coef_position])
        except Exception:
            # Skip failed bootstrap samples
            continue
    if len(bootstrap_estimates) == 0:
        logger.warning("No successful wild bootstrap samples")
        return (0.0, 0.0)
    bootstrap_estimates = np.array(bootstrap_estimates)
    # Percentile interval (2.5% and 97.5%).
    # NOTE(review): this is a plain percentile interval of the bootstrap
    # coefficients, not recentered around the original estimate — confirm
    # this matches the intended wild-bootstrap procedure.
    lower_bound = np.percentile(bootstrap_estimates, 2.5)
    upper_bound = np.percentile(bootstrap_estimates, 97.5)
    return lower_bound, upper_bound
@monitor_operation("analyze_impl")
@cache_result(ttl=3600)  # Cache results for 1 hour
def _analyze_impl(self, df: pd.DataFrame, **kwargs: Any) -> DiDResult:
    """Run a Difference-in-Differences (DiD) analysis as a robustness check.

    Expects `df` to already contain the ``post`` indicator as produced by
    :meth:`prepare_data`.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the prepared experimental data.
    geo_col : str
        The name of the geographic identifier column.
    assignment_col : str
        The name of the column with group assignments.
    kpi_col : str
        The name of the KPI column.
    use_wild_bootstrap : bool, optional
        If True, uses wild bootstrap for confidence intervals (default: False).
    use_permutation_test : bool, optional
        If True, adds permutation test p-value (default: False).
    n_bootstrap : int, optional
        Number of bootstrap samples for wild bootstrap (default: 1000).
    n_permutations : int, optional
        Number of permutation samples (default: 1000).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    DiDResult
        Results containing DiD estimate, p-value, and confidence intervals.

    Raises
    ------
    AnalysisError
        If model fitting fails or the interaction term cannot be located.
    """
    geo_col = kwargs["geo_col"]
    assignment_col = kwargs["assignment_col"]
    kpi_col = kwargs["kpi_col"]
    use_wild_bootstrap = kwargs.get("use_wild_bootstrap", False)
    use_permutation_test = kwargs.get("use_permutation_test", False)
    n_bootstrap = kwargs.get("n_bootstrap", 1000)
    n_permutations = kwargs.get("n_permutations", 1000)
    random_seed = kwargs.get("random_seed")
    # Fit DiD model: group x post interaction captures the DiD effect.
    # Standard errors are clustered on geo.
    formula = f"{kpi_col} ~ {assignment_col} * post"
    try:
        did_model = smf.ols(formula=formula, data=df).fit(
            cov_type="cluster", cov_kwds={"groups": df[geo_col]}
        )
    except Exception as e:
        raise AnalysisError(f"DiD model fitting failed: {str(e)}") from e
    # Find the interaction term; patsy's naming depends on the assignment
    # column's dtype/levels, so fall back to any ':post' term.
    interaction_term = f"{assignment_col}[T.{TREATMENT}]:post"
    if interaction_term not in did_model.params:
        possible_terms = [
            term for term in did_model.params.index if ":post" in term
        ]
        if possible_terms:
            interaction_term = possible_terms[0]
        else:
            raise AnalysisError(
                f"Could not find the interaction term '{interaction_term}' in the "
                f"model results. Available terms are: "
                f"{list(did_model.params.index)}"
            )
    # Confidence intervals: wild bootstrap when requested, otherwise the
    # model's cluster-robust interval for the interaction coefficient.
    if use_wild_bootstrap:
        logger.info(f"Computing wild bootstrap confidence intervals with {n_bootstrap} samples")
        conf_int = list(self._wild_bootstrap_did(
            df, formula, assignment_col, geo_col, n_bootstrap, random_seed=random_seed
        ))
    else:
        conf_int = did_model.conf_int().loc[interaction_term].tolist()
    # p-value: cluster permutation test when requested, otherwise the model's.
    if use_permutation_test:
        logger.info(f"Computing permutation test p-value with {n_permutations} samples")
        p_value = self._permutation_test_did(
            df, assignment_col, geo_col, kpi_col, n_permutations, random_seed=random_seed
        )
    else:
        p_value = did_model.pvalues[interaction_term]
    # Effect sizes computed on the raw KPI values pooled across periods.
    treatment_vals = df[df[assignment_col] == TREATMENT][kpi_col]
    control_vals = df[df[assignment_col] != TREATMENT][kpi_col]
    cohens_d = calculate_cohens_d(treatment_vals.values, control_vals.values)
    hedges_g = calculate_hedges_g(treatment_vals.values, control_vals.values)
    metadata = {
        "model_summary": str(did_model.summary()),
        "interaction_term": interaction_term,
        "cohens_d": float(cohens_d),
        "hedges_g": float(hedges_g),
        "n_treatment": len(treatment_vals),
        "n_control": len(control_vals)
    }
    # Record which optional methods produced the interval / p-value.
    if use_wild_bootstrap:
        metadata["bootstrap_method"] = "Wild Bootstrap"
        metadata["n_bootstrap"] = n_bootstrap
    if use_permutation_test:
        metadata["permutation_test"] = True
        metadata["n_permutations"] = n_permutations
    return DiDResult(
        success=True,
        estimate=did_model.params[interaction_term],
        p_value=float(p_value),
        confidence_interval=(conf_int[0], conf_int[1]),
        method="did_analysis",
        metadata=metadata
    )
@monitor_operation("balance_check")
def check_balance(
    self,
    df: pd.DataFrame,
    geo_col: str,
    assignment_col: str,
    covariates: List[str],
    **kwargs: Any
) -> BalanceCheckResult:
    """
    Perform advanced balance check on covariates for DiD analysis.

    Parameters
    ----------
    df : pd.DataFrame
        Full experimental dataframe.
    geo_col : str
        Geographic identifier column.
    assignment_col : str
        Assignment column name.
    covariates : List[str]
        List of covariate columns to check.
    **kwargs : Any
        Additional parameters passed to advanced_balance_check.

    Returns
    -------
    BalanceCheckResult
        Comprehensive balance assessment.
    """
    # Randomization happened at the geo level, so deduplicate to one
    # (geo, assignment) row before running the shared diagnostic.
    per_geo_assignments = df.loc[:, [geo_col, assignment_col]].drop_duplicates()
    return advanced_balance_check(per_geo_assignments, assignment_col, covariates, **kwargs)
# (removed stray "[docs]" Sphinx source-link artifact; as a bare expression it would raise NameError at import)
class CRTAnalyzer(BaseAnalyzer, MonitoringMixin):
"""Analyzer for Cluster Randomized Trials (CRT), including staircase designs."""
# (removed stray "[docs]" Sphinx source-link artifact; as a bare expression it would raise NameError at import)
def __init__(
    self,
    config: Optional['AnalyzerConfig'] = None,
    bootstrap_reps: int = 1000,
    bootstrap_seed: Optional[int] = None,
    performance_config: Optional[ParallelConfig] = None
):
    """Initialize CRT analyzer.

    Parameters
    ----------
    config : AnalyzerConfig, optional
        Configuration for analyzer behavior; forwarded to BaseAnalyzer.
    bootstrap_reps : int, optional
        Number of bootstrap replications (default: 1000).
    bootstrap_seed : int, optional
        Random seed for bootstrap sampling.
    performance_config : ParallelConfig, optional
        Configuration for performance optimizations (a default
        ParallelConfig is created when omitted).
    """
    # The 'AnalyzerConfig' annotation above is a string forward reference,
    # so no runtime import is needed here; the previous function-local
    # 'from .base import AnalyzerConfig' was unused and has been removed.
    super().__init__(config)
    self.bootstrap_reps = bootstrap_reps
    self.bootstrap_seed = bootstrap_seed
    self.performance_config = performance_config or ParallelConfig()
@monitor_operation("prepare_data")
def prepare_data(self, df: pd.DataFrame, **kwargs: Any) -> pd.DataFrame:
    """Prepare data for CRT analysis with performance optimizations.

    Standardizes assignments, creates the ``{assignment_col}_numeric``
    0/1 indicator used by the CRT regression formula, and applies dtype
    and memory optimizations.

    Required kwargs: ``assignment_col``, ``period_col``, ``kpi_col``.
    """
    # Call parent preparation first
    df = super().prepare_data(df, **kwargs)
    assignment_col = kwargs["assignment_col"]
    period_col = kwargs["period_col"]
    kpi_col = kwargs["kpi_col"]
    # Standardize assignments first to ensure string type
    df[assignment_col] = standardize_assignments(df[assignment_col])
    # Optimize memory usage
    df = optimize_memory_usage(df)
    # optimize_memory_usage may convert low-cardinality string columns to
    # categorical; force the assignment column back to string.
    if df[assignment_col].dtype.name == 'category':
        df[assignment_col] = df[assignment_col].astype(str)
    # The assignment column is guaranteed to be string at this point, so the
    # original's second "is it categorical?" branch here was unreachable and
    # has been removed.
    df[f"{assignment_col}_numeric"] = (df[assignment_col] == TREATMENT).astype("int8")
    # Apply other dtype optimizations using vectorize_operations
    df = vectorize_operations(
        df,
        operations={
            period_col: lambda x: x.astype("category"),
            kpi_col: lambda x: x.astype("float32")  # float32 halves KPI memory
        }
    )
    # NOTE(review): unlike the other analyzers, this final pass may convert
    # the assignment column back to categorical. Downstream CRT code uses
    # the *_numeric column, but confirm no caller depends on the string
    # dtype surviving this call.
    return optimize_memory_usage(df)
def _wild_bootstrap_crt(
    self,
    df: pd.DataFrame,
    formula: str,
    assignment_col: str,
    geo_col: str,
    n_bootstrap: int = 1000,
    random_seed: Optional[int] = None
) -> Tuple[float, float]:
    """
    Calculate wild bootstrap confidence intervals for CRT analysis.

    Wild bootstrap is particularly effective for handling heteroskedasticity
    and cluster correlation in CRT models. Rademacher weights are drawn per
    cluster and applied to the original residuals.

    Parameters
    ----------
    df : pd.DataFrame
        Analysis dataframe.
    formula : str
        Regression formula.
    assignment_col : str
        Assignment column name.
    geo_col : str
        Geographic identifier column.
    n_bootstrap : int, optional
        Number of bootstrap samples (default: 1000).
    random_seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    Tuple[float, float]
        Lower and upper confidence interval bounds; (0.0, 0.0) on failure.
    """
    # FIX: compare against None so a seed of 0 (falsy) still seeds the RNG.
    if random_seed is not None:
        np.random.seed(random_seed)
    # Fit the original model and locate the treatment coefficient.
    try:
        original_model = smf.ols(formula=formula, data=df).fit()
        coefficient_name = f"{assignment_col}_numeric"
        if coefficient_name not in original_model.params:
            logger.warning(f"Could not find coefficient {coefficient_name} for wild bootstrap")
            return (0.0, 0.0)
        residuals = original_model.resid
    except Exception as e:
        logger.warning(f"Failed to fit original model for wild bootstrap: {e}")
        return (0.0, 0.0)
    # Get unique clusters for wild bootstrap
    unique_geos = df[geo_col].unique()
    n_clusters = len(unique_geos)
    # PERF: loop invariants hoisted — the design matrix and the coefficient
    # position never change across replications. (The per-iteration
    # membership re-check in the original was always true and was removed.)
    X = original_model.model.exog
    coef_position = list(original_model.params.index).index(coefficient_name)
    bootstrap_estimates = []
    for _ in range(n_bootstrap):
        # Rademacher weights (+/-1 with equal probability), one per cluster.
        cluster_weights = np.random.choice([-1, 1], size=n_clusters)
        weight_mapping = dict(zip(unique_geos, cluster_weights))
        # Apply the cluster weights to the residuals.
        df_weights = df[geo_col].map(weight_mapping)
        wild_residuals = residuals * df_weights
        # Wild bootstrap dependent variable.
        y_star = original_model.fittedvalues + wild_residuals
        try:
            wild_model = sm.OLS(y_star, X).fit()
            bootstrap_estimates.append(wild_model.params[coef_position])
        except Exception:
            # Skip failed bootstrap samples
            continue
    if len(bootstrap_estimates) == 0:
        logger.warning("No successful wild bootstrap samples")
        return (0.0, 0.0)
    bootstrap_estimates = np.array(bootstrap_estimates)
    # Percentile interval (2.5% and 97.5%).
    # NOTE(review): plain percentile interval, not recentered around the
    # original estimate — confirm this matches the intended procedure.
    lower_bound = np.percentile(bootstrap_estimates, 2.5)
    upper_bound = np.percentile(bootstrap_estimates, 97.5)
    return lower_bound, upper_bound
def _bootstrap_single_sample(self, df: pd.DataFrame, formula: str,
                             assignment_col: str, geo_col: str,
                             unique_geos: np.ndarray, seed: int) -> Optional[float]:
    """Fit one cluster-bootstrap replicate and return its treatment estimate.

    Seeds the global NumPy RNG with ``seed``, resamples geos with
    replacement, fits the OLS formula on the rows belonging to the sampled
    geos, and returns the ``{assignment_col}_numeric`` coefficient.

    Parameters
    ----------
    df : pd.DataFrame
        The original DataFrame.
    formula : str
        The regression formula.
    assignment_col : str
        The assignment column name.
    geo_col : str
        The geographic column name.
    unique_geos : np.ndarray
        Array of unique geographic identifiers.
    seed : int
        Random seed for this bootstrap sample.

    Returns
    -------
    Optional[float]
        Bootstrap estimate, or None if the fit fails.
    """
    np.random.seed(seed)
    resampled_geos = np.random.choice(unique_geos, size=len(unique_geos), replace=True)
    sample = df[df[geo_col].isin(resampled_geos)]
    try:
        fitted = smf.ols(formula, data=sample).fit()
        return fitted.params[f"{assignment_col}_numeric"]
    except Exception:
        # A degenerate resample (e.g. a single arm) can make the fit fail.
        return None
@monitor_operation("analyze_impl")
def _analyze_impl(self, df: pd.DataFrame, **kwargs: Any) -> CRTResult:
    """Run a regression analysis suitable for Cluster Randomized Trials.

    Fits OLS with period fixed effects and the ``{assignment_col}_numeric``
    treatment dummy, with standard errors clustered on geo.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the prepared experimental data.
    geo_col : str
        The name of the geographic cluster identifier column.
    period_col : str
        The name of the time period column.
    assignment_col : str
        The name of the column indicating treatment assignment.
    kpi_col : str
        The name of the Key Performance Indicator column.
    bootstrap_ci : bool, optional
        If True, calculates confidence intervals using a cluster bootstrap.
    n_bootstraps : int, optional
        The number of bootstrap samples to use if bootstrap_ci is True.
    random_seed : int, optional
        The random seed for the bootstrap process; defaults to
        ``bootstrap_seed`` from the constructor, or 42.

    Returns
    -------
    CRTResult
        Results containing CRT estimate, p-value, and confidence intervals.

    Raises
    ------
    AnalysisError
        If model fitting fails.
    """
    geo_col = kwargs["geo_col"]
    period_col = kwargs["period_col"]
    assignment_col = kwargs["assignment_col"]
    kpi_col = kwargs["kpi_col"]
    bootstrap_ci = kwargs.get("bootstrap_ci", False)
    n_bootstraps = kwargs.get("n_bootstraps", 1000)
    # BUG FIX: 'self.bootstrap_seed or 42' silently replaced an explicit
    # seed of 0 with 42 because 0 is falsy; compare against None instead.
    default_seed = 42 if self.bootstrap_seed is None else self.bootstrap_seed
    random_seed: Optional[int] = kwargs.get("random_seed", default_seed)
    # Build the regression formula: period fixed effects + treatment dummy.
    formula = f"{kpi_col} ~ C({period_col}) + {assignment_col}_numeric"
    logger.debug(f"Using formula for CRT analysis: {formula}")
    try:
        model = smf.ols(formula, data=df).fit(
            cov_type="cluster", cov_kwds={"groups": df[geo_col]}
        )
        logger.debug(f"CRT model summary:\n{model.summary()}")
        # Calculate confidence intervals
        if bootstrap_ci:
            unique_geos = df[geo_col].unique()
            # Parallel execution is used only when no seed is in play, since
            # parallel scheduling can be non-deterministic even with seeding.
            # NOTE(review): because random_seed always receives a default
            # above, this branch only runs when the caller passes
            # random_seed=None explicitly — confirm intended behavior.
            if (self.performance_config.enabled and n_bootstraps > 100 and random_seed is None):
                # np.random.seed(None) reseeds from OS entropy; per-sample
                # seeds are then drawn for the workers.
                np.random.seed(random_seed)
                seeds = np.random.randint(0, 2**31, n_bootstraps)
                # 'partial' is imported at module level; the original's
                # redundant function-local import was removed.
                bootstrap_func = partial(
                    self._bootstrap_single_sample,
                    df, formula, assignment_col, geo_col, unique_geos
                )
                # Execute bootstrap samples in parallel using joblib directly
                estimates = joblib.Parallel(
                    n_jobs=self.performance_config.n_jobs,
                    backend=self.performance_config.backend
                )(
                    joblib.delayed(bootstrap_func)(seed) for seed in seeds
                )
                # Filter out None values from failed fits
                estimates = [est for est in estimates if est is not None]
            else:
                # Sequential path: deterministic given random_seed.
                np.random.seed(random_seed)
                seeds = np.random.randint(0, 2**31, n_bootstraps)
                estimates = []
                for seed in seeds:
                    estimate = self._bootstrap_single_sample(
                        df, formula, assignment_col, geo_col, unique_geos, seed
                    )
                    if estimate is not None:
                        estimates.append(estimate)
            # Warn when a large fraction of the replicates failed to fit.
            if len(estimates) < n_bootstraps * 0.8:
                logger.warning(
                    f"Only {len(estimates)}/{n_bootstraps} bootstrap samples succeeded. "
                    f"Results may be unreliable."
                )
            conf_int = np.percentile(estimates, [2.5, 97.5]).tolist()
        else:
            conf_int = model.conf_int().loc[f"{assignment_col}_numeric"].tolist()
        return CRTResult(
            success=True,
            estimate=model.params[f"{assignment_col}_numeric"],
            p_value=model.pvalues[f"{assignment_col}_numeric"],
            confidence_interval=(conf_int[0], conf_int[1]),
            method="crt_analysis",
            metadata={"model_summary": str(model.summary())}
        )
    except Exception as e:
        raise AnalysisError(f"CRT model fitting failed: {str(e)}") from e
def _create_error_result(self, error: Exception, error_type: str) -> CRTResult:
    """Build a failed CRTResult carrying the error details in metadata.

    Parameters
    ----------
    error : Exception
        The exception that occurred.
    error_type : str
        Type of error for metadata.

    Returns
    -------
    CRTResult
        Error result with success=False and neutral estimate / p-value.
    """
    error_metadata = {
        "error_message": str(error),
        "error_type": error_type,
    }
    return CRTResult(
        success=False,
        estimate=0.0,
        p_value=1.0,
        confidence_interval=(0.0, 0.0),
        method="crt_analysis",
        metadata=error_metadata,
    )