from typing import List
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans # type: ignore
from sklearn.preprocessing import StandardScaler # type: ignore
from .base import BaseDesigner
from .constants import CONTROL, TREATMENT
from .exceptions import DesignError
from .validation import (
validate_columns_exist,
validate_dataframe,
validate_positive_number,
)
[docs]
class StaircaseDesigner(BaseDesigner):
    """Build the assignment schedule for a staircase cluster randomized trial.

    The design is fully determined by four integers; no input data is used.
    Sequence ``s`` (1-based) is observed for
    ``control_periods + intervention_periods`` consecutive periods starting at
    period ``s``. Its clusters are under control for the first
    ``control_periods`` of those periods and under treatment afterwards, so
    each later sequence starts, and switches, one period later than the
    previous one.
    """

    def __init__(
        self,
        num_sequences: int,
        clusters_per_sequence: int,
        control_periods: int,
        intervention_periods: int,
    ):
        """Store the parameters that define the staircase.

        Parameters
        ----------
        num_sequences : int
            Number of sequences (steps of the staircase).
        clusters_per_sequence : int
            Number of clusters assigned to each sequence.
        control_periods : int
            Number of observed periods each sequence spends under control.
        intervention_periods : int
            Number of observed periods each sequence spends under treatment.

        Notes
        -----
        The values are stored as given; this class performs no validation of
        its own.
        """
        super().__init__()
        self.num_sequences = num_sequences
        self.clusters_per_sequence = clusters_per_sequence
        self.control_periods = control_periods
        self.intervention_periods = intervention_periods

    def prepare_data(self, *args, **kwargs) -> None:
        """Prepare data for staircase design.

        The staircase design is parameter-based, so there is nothing to
        prepare; all arguments are accepted and ignored.
        """

    def _design_impl(self, *args, **kwargs) -> pd.DataFrame:
        """Create the staircase design matrix.

        Returns
        -------
        pd.DataFrame
            One row per (cluster, observed period) with the columns
            'sequence', 'cluster_id', 'period' and 'assignment'.
            'assignment' holds the CONTROL or TREATMENT constant. Cluster ids
            are numbered consecutively from 1 across sequences. The columns
            are present even when the parameters produce no rows.
        """
        columns = ["sequence", "cluster_id", "period", "assignment"]
        periods_per_sequence = self.control_periods + self.intervention_periods

        rows = []
        for sequence in range(1, self.num_sequences + 1):
            # Both values depend only on the sequence, so compute them once
            # per sequence instead of once per generated row.
            switch_period = sequence + self.control_periods
            first_cluster = (sequence - 1) * self.clusters_per_sequence + 1
            last_cluster = first_cluster + self.clusters_per_sequence - 1

            for cluster_id in range(first_cluster, last_cluster + 1):
                for period in range(sequence, sequence + periods_per_sequence):
                    rows.append(
                        {
                            "sequence": sequence,
                            "cluster_id": cluster_id,
                            "period": period,
                            "assignment": TREATMENT if period >= switch_period else CONTROL,
                        }
                    )

        # Passing the columns explicitly keeps the schema stable for an empty
        # design; pd.DataFrame([]) alone would return a frame with no columns.
        return pd.DataFrame(rows, columns=columns)

    def design(self) -> pd.DataFrame:
        """Creates a design matrix for a staircase cluster randomized trial.

        Delegates to ``BaseDesigner.design`` with no arguments.

        Returns
        -------
        pd.DataFrame
            A design matrix with columns for sequence, cluster, period, and
            assignment.

        Raises
        ------
        ValidationError
            If input validation fails. NOTE(review): any such validation
            happens in ``BaseDesigner``, which is defined outside this
            section of the file -- confirm there.
        """
        return super().design()
[docs]
class SimpleRandomizationDesigner(BaseDesigner):
    """Assign geographic units to Control/Treatment by independent random draws.

    Every unique value of the geo column receives one integer drawn uniformly
    from ``[0, num_groups)``. A draw of 1 is labelled TREATMENT and every other
    draw is labelled CONTROL, so the default ``num_groups=2`` gives each unit a
    50/50 chance and larger values lower the treatment probability to
    ``1 / num_groups``.
    """

    def __init__(self, num_groups: int = 2, seed: int = 42):
        """Store the randomization settings.

        Parameters
        ----------
        num_groups : int, default 2
            Exclusive upper bound of the integer drawn for each unit.
        seed : int, default 42
            Seed of the random number generator, for reproducible assignments.
        """
        super().__init__()
        self.num_groups = num_groups
        self.seed = seed
        # Populated by prepare_data().
        self._df = None
        self._geo_col = None
        self._unique_geos = None

    def prepare_data(self, df: pd.DataFrame = None, geo_col: str = None, **kwargs) -> None:
        """Prepare data for simple randomization.

        Arguments left as ``None`` keep the value stored by an earlier call.
        Once both a DataFrame and a column name are known, the unique values
        of that column are cached for the assignment step.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame containing the geographic units to randomize.
        geo_col : str
            The name of the column containing the unique geographic identifiers.
        """
        if df is not None:
            self._df = df
        if geo_col is not None:
            self._geo_col = geo_col
        if self._df is not None and self._geo_col is not None:
            self._unique_geos = self._df[self._geo_col].unique()

    def _design_impl(self, df: pd.DataFrame = None, geo_col: str = None, **kwargs) -> pd.DataFrame:
        """Perform the simple randomization assignment.

        The ``df`` and ``geo_col`` arguments are not read here; the method
        works on the state stored by ``prepare_data``.

        Returns
        -------
        pd.DataFrame
            A copy of the prepared DataFrame with a new 'assignment' column.

        Raises
        ------
        DesignError
            If ``prepare_data`` has not yet been given both a DataFrame and a
            geo column.
        """
        if self._unique_geos is None:
            # Fail with a descriptive error instead of a TypeError from len(None).
            raise DesignError(
                "Data preparation failed. Ensure prepare_data() was called with valid "
                "df and geo_col parameters."
            )

        # A private legacy RandomState yields the same stream as
        # np.random.seed(seed) followed by np.random.randint, but leaves the
        # process-wide NumPy generator untouched.
        rng = np.random.RandomState(self.seed)
        draws = rng.randint(0, self.num_groups, size=len(self._unique_geos))

        assignment_map = {
            geo: TREATMENT if draw == 1 else CONTROL
            for geo, draw in zip(self._unique_geos, draws)
        }

        df_assigned = self._df.copy()
        df_assigned["assignment"] = df_assigned[self._geo_col].map(assignment_map)
        return df_assigned

    def design(self, df: pd.DataFrame, geo_col: str) -> pd.DataFrame:
        """Assigns geographic units to experimental groups using simple randomization.

        Delegates to ``BaseDesigner.design``, forwarding both arguments as
        keywords.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame containing the geographic units to randomize.
        geo_col : str
            The name of the column containing the unique geographic identifiers.

        Returns
        -------
        pd.DataFrame
            A DataFrame with a new 'assignment' column.

        Raises
        ------
        ValidationError
            If input validation fails. NOTE(review): any such validation
            happens in ``BaseDesigner``, which is defined outside this
            section of the file -- confirm there.
        """
        return super().design(df=df, geo_col=geo_col)
[docs]
class StratifiedRandomizationDesigner(BaseDesigner):
    """Randomize geographic units to Control/Treatment within KMeans strata.

    Units are first grouped into ``n_strata`` clusters by running KMeans on
    the standardized stratification variables. Inside each stratum,
    ``len(stratum) // num_groups`` units are drawn without replacement and
    labelled TREATMENT; the remaining units are labelled CONTROL.
    """

    def __init__(self, num_groups: int = 2, n_strata: int = 4, seed: int = 42):
        """Store the randomization settings.

        Parameters
        ----------
        num_groups : int, default 2
            Divisor that sets the treated share of each stratum
            (``len(stratum) // num_groups`` units are treated).
        n_strata : int, default 4
            Number of KMeans clusters used as strata.
        seed : int, default 42
            Seed used both for KMeans and for the within-stratum draws.
        """
        super().__init__()
        self.num_groups = num_groups
        self.n_strata = n_strata
        self.seed = seed
        # Populated by prepare_data().
        self._df = None
        self._geo_col = None
        self._strat_vars = None
        self._geo_df = None
        self._stratum_map = None

    def prepare_data(
        self, df: pd.DataFrame = None, geo_col: str = None, strat_vars: List[str] = None, **kwargs
    ) -> None:
        """Prepare data for stratified randomization.

        Arguments left as ``None`` keep the value stored by an earlier call.
        Once all three inputs are known, one row per geo is extracted and
        every geo is given a stratum label; until then the derived state is
        reset to ``None``.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame containing geographic units and stratification variables.
        geo_col : str
            The name of the column with unique geographic identifiers.
        strat_vars : list of str
            A list of column names to use for stratification.
        """
        if df is not None:
            self._df = df
        if geo_col is not None:
            self._geo_col = geo_col
        if strat_vars is not None:
            self._strat_vars = strat_vars

        inputs_ready = (
            self._df is not None
            and self._geo_col is not None
            and self._strat_vars is not None
        )
        if not inputs_ready:
            # Not enough information yet: clear any derived state.
            self._geo_df = None
            self._stratum_map = None
            return

        # One row per geo: the first occurrence supplies the stratification values.
        self._geo_df = (
            self._df[[self._geo_col] + self._strat_vars]
            .drop_duplicates(subset=[self._geo_col])
            .reset_index(drop=True)
        )

        # Standardize the variables so each contributes on a comparable scale,
        # then use the KMeans cluster labels as strata.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(self._geo_df[self._strat_vars])
        kmeans = KMeans(n_clusters=self.n_strata, random_state=self.seed, n_init=10)
        self._geo_df["stratum"] = kmeans.fit_predict(X_scaled)

        # geo -> stratum lookup, applied to the full DataFrame at design time.
        self._stratum_map = dict(zip(self._geo_df[self._geo_col], self._geo_df["stratum"]))

    def _design_impl(
        self, df: pd.DataFrame = None, geo_col: str = None, strat_vars: List[str] = None, **kwargs
    ) -> pd.DataFrame:
        """Perform the stratified randomization assignment.

        The arguments are not read here; the method works on the state stored
        by ``prepare_data``.

        Returns
        -------
        pd.DataFrame
            A copy of the prepared DataFrame with 'assignment' and 'stratum'
            columns added.

        Raises
        ------
        DesignError
            If ``prepare_data`` has not produced the per-geo table and the
            stratum mapping.
        """
        if self._geo_df is None:
            raise DesignError(
                "Data preparation failed. Ensure prepare_data() was called with valid "
                "df, geo_col, and strat_vars parameters."
            )
        if self._stratum_map is None:
            raise DesignError("Stratum mapping was not created during data preparation.")

        # A private legacy RandomState reproduces the draws of
        # np.random.seed(seed) + np.random.choice without reseeding the
        # process-wide NumPy generator.
        rng = np.random.RandomState(self.seed)

        assignments = {}
        # Iterate over the strata actually present, in ascending label order.
        strata = self._geo_df.groupby("stratum", sort=True)[self._geo_col]
        for _, geo_series in strata:
            stratum_geos = geo_series.tolist()
            n_treatment = len(stratum_geos) // self.num_groups

            # Sample positions rather than the identifiers themselves: this
            # selects the same units for a given seed, but avoids NumPy
            # coercing mixed-type identifiers to a common dtype and allows a
            # constant-time set lookup below.
            chosen = rng.choice(len(stratum_geos), n_treatment, replace=False)
            treated = {stratum_geos[i] for i in chosen}

            for geo in stratum_geos:
                assignments[geo] = TREATMENT if geo in treated else CONTROL

        # Apply assignments and strata to a copy of the original dataframe.
        df_assigned = self._df.copy()
        df_assigned["assignment"] = df_assigned[self._geo_col].map(assignments)
        df_assigned["stratum"] = df_assigned[self._geo_col].map(self._stratum_map)
        return df_assigned

    def design(
        self, df: pd.DataFrame, geo_col: str, strat_vars: List[str]
    ) -> pd.DataFrame:
        """Assigns geographic units to groups using stratified randomization.

        Delegates to ``BaseDesigner.design``, forwarding all arguments as
        keywords.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame containing geographic units and stratification variables.
        geo_col : str
            The name of the column with unique geographic identifiers.
        strat_vars : list of str
            A list of column names to use for stratification.

        Returns
        -------
        pd.DataFrame
            The input DataFrame with 'stratum' and 'assignment' columns added.

        Raises
        ------
        ValidationError
            If input validation fails. NOTE(review): any such validation
            happens in ``BaseDesigner``, which is defined outside this
            section of the file -- confirm there.
        """
        return super().design(df=df, geo_col=geo_col, strat_vars=strat_vars)