Source code for biomedical_data_generator.effects.batch

# Copyright (c) 2025 Sigrun May,
# Ostfalia Hochschule für angewandte Wissenschaften
#
# This software is distributed under the terms of the MIT license
# which is available at https://opensource.org/licenses/MIT

"""Batch effect simulation for synthetic biomedical datasets.

This module provides functionality to add realistic batch effects
(site differences, instrument variations, recruitment cohorts) to
generated datasets.

Public API:
    High-level (recommended):
        - apply_batch_effects_from_config: Orchestrate assignment + effects
        - generate_batch_assignments: Unified batch assignment (random or confounded)
        - apply_batch_effects: Apply systematic feature differences

    Low-level (for experimentation):
        - random_batch_assignment: Create independent batch labels
        - confounded_batch_assignment: Create class-correlated batch labels

Typical usage (high-level):
    >>> from batch import apply_batch_effects_from_config
    >>> X_batch, batches, batch_effects = apply_batch_effects_from_config(
    ...     x=X, y=y, batch_config=cfg.batch_effects,
    ...     informative_indices=inf_idx, rng=rng
    ... )

Educational usage (low-level):
    >>> # Experiment with random vs. confounded assignments
    >>> rng = np.random.default_rng(42)
    >>> random_batches = random_batch_assignment(100, n_batches=3, rng=rng)
    >>> confounded_batches = confounded_batch_assignment(
    ...     y, n_batches=3, confounding_with_class=0.8, rng=rng
    ... )

Reproducibility:
    All functions accept np.random.Generator objects to ensure proper
    state propagation. The top-level code (generator.py) creates ONE
    generator and passes it through the call chain. This ensures:
    - No duplicate random sequences
    - Perfect reproducibility from config.random_state
    - Proper state evolution across all operations
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Literal, cast

import numpy as np
import pandas as pd
from numpy.typing import NDArray

if TYPE_CHECKING:
    from typing import Any

    from numpy.typing import ArrayLike


__all__ = [
    "generate_batch_assignments",
    "random_batch_assignment",
    "confounded_batch_assignment",
    "apply_batch_effects",
    "apply_batch_effects_from_config",
]


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _normalize_proportions(
    proportions: Sequence[float],
    n_batches: int,
) -> NDArray[np.float64]:
    """Normalize a proportions vector to length-n_batches and sum 1.

    Raises:
        ValueError: if length mismatch, negative entries, or non-positive sum.
    """
    proportions_arr = np.asarray(proportions, dtype=float)

    if proportions_arr.shape[0] != n_batches:
        raise ValueError("len(proportions) must equal n_batches")

    if np.any(proportions_arr < 0):
        raise ValueError("proportions must be non-negative")

    total = proportions_arr.sum()
    if total <= 0:
        raise ValueError("proportions must sum to a positive value")

    return proportions_arr / total


# ---------------------------------------------------------------------------
# Batch assignment
# ---------------------------------------------------------------------------
[docs] def generate_batch_assignments( n_samples: int, n_batches: int, rng: np.random.Generator, proportions: Sequence[float] | None = None, class_labels: ArrayLike | None = None, confounding_with_class: float = 0.0, ) -> NDArray[np.int_]: """Generate batch assignments for samples. Unified entry point for creating batch labels. Automatically handles both random and confounded assignments based on the confounding_with_class parameter. This is the recommended function for batch assignment. For direct access to the underlying implementations, see: - random_batch_assignment() for independent assignments - confounded_batch_assignment() for class-correlated assignments Args: n_samples: Number of samples. n_batches: Number of batches (must be >= 1). rng: NumPy random generator (for reproducibility). proportions: Relative batch sizes (None = equal split). class_labels: Optional class labels for confounding. May be any array-like of shape (n_samples,) with labels that np.unique can handle (e.g. integers, strings). Only used if confounding_with_class > 0. confounding_with_class: Correlation with class (0.0 = random, 1.0 = strong preference), must be in [0, 1]. Returns: Array of batch assignments (integers 0 to n_batches-1). Examples: >>> rng = np.random.default_rng(42) >>> >>> # Random assignment >>> batches = generate_batch_assignments(100, n_batches=3, rng=rng) >>> np.bincount(batches) array([33, 33, 34]) >>> >>> # Confounded with class (simulate recruitment bias) >>> y = np.array([0]*50 + [1]*50) >>> batches = generate_batch_assignments( ... n_samples=100, ... n_batches=2, ... rng=rng, ... class_labels=y, ... confounding_with_class=0.8, ... ) Notes: - confounding_with_class=0.0: Random assignment (no bias) - confounding_with_class=1.0: Strong preference (each class → preferred batch) - For 2 equal-sized batches, confounding_with_class=0.5 yields approx. a 75/25 class split towards the preferred batch. """ if n_samples <= 0: raise ValueError("n_samples must be positive") if n_batches <= 0: raise ValueError("n_batches must be >= 1") if not (0.0 <= confounding_with_class <= 1.0): raise ValueError("confounding_with_class must be in [0, 1]") # Trivial case: only one batch → all zeros (no confounding possible) if n_batches == 1: return np.zeros(n_samples, dtype=int) if class_labels is not None: labels_arr = np.asarray(class_labels) if labels_arr.shape[0] != n_samples: raise ValueError("class_labels must have length n_samples") else: labels_arr = None if confounding_with_class == 0.0 or labels_arr is None: return random_batch_assignment( n_samples=n_samples, n_batches=n_batches, rng=rng, proportions=proportions, ) else: return confounded_batch_assignment( class_labels=labels_arr, n_batches=n_batches, confounding_with_class=confounding_with_class, rng=rng, proportions=proportions, )
[docs] def random_batch_assignment( n_samples: int, n_batches: int, rng: np.random.Generator, proportions: Sequence[float] | None = None, ) -> NDArray[np.int_]: """Create random batch assignments without confounding. Direct function for creating batch labels with specified proportions but no correlation with class labels. For most use cases, prefer generate_batch_assignments() which provides a unified interface for both random and confounded assignments. Args: n_samples: Number of samples to assign. n_batches: Number of distinct batches (must be >= 1). rng: NumPy random generator (for reproducibility). proportions: Relative batch sizes (None = equal split). Returns: Array of batch assignments (integers 0 to n_batches-1). Examples: >>> rng = np.random.default_rng(42) >>> batches = random_batch_assignment(100, n_batches=3, rng=rng) >>> np.bincount(batches) array([33, 33, 34]) >>> >>> # Unequal proportions >>> batches = random_batch_assignment( ... 100, n_batches=3, rng=rng, proportions=[0.5, 0.3, 0.2] ... ) Notes: - Generator state advances during shuffling. - Multiple calls with the same generator produce different results. """ if n_samples <= 0: raise ValueError("n_samples must be positive") if n_batches <= 0: raise ValueError("n_batches must be >= 1") if proportions is None: # Even split (integer counts with remainder distributed over first batches) base = n_samples // n_batches remainder = n_samples % n_batches counts = np.array( [base + (1 if i < remainder else 0) for i in range(n_batches)], dtype=int, ) else: proportions_arr = _normalize_proportions(proportions, n_batches) counts = np.round(proportions_arr * n_samples).astype(int) # Fix rounding to match n_samples exactly diff = n_samples - int(counts.sum()) if diff != 0: # Adjust the batches with largest target proportions first idx = np.argsort(-proportions_arr)[: abs(diff)] counts[idx] += int(np.sign(diff)) # Create labels and shuffle labels = np.concatenate([np.full(c, i, dtype=int) for i, c in enumerate(counts) if c > 0]) # In pathological rounding cases we might end up slightly short/long; guard here if labels.shape[0] != n_samples: # Simple fallback: sample with replacement from batches according to proportions if proportions is None: p = np.full(n_batches, 1.0 / n_batches) else: p = _normalize_proportions(proportions, n_batches) labels = rng.choice(n_batches, size=n_samples, p=p) rng.shuffle(labels) return labels.astype(int)
[docs] def confounded_batch_assignment( class_labels: ArrayLike, n_batches: int, confounding_with_class: float, rng: np.random.Generator, proportions: Sequence[float] | None = None, ) -> NDArray[np.int_]: """Create batch assignments confounded with class labels. Direct function for creating batch labels that are correlated with class membership, simulating recruitment bias or site selection effects. For most use cases, prefer generate_batch_assignments() which provides a unified interface for both random and confounded assignments. Strategy: - Use np.unique to derive K distinct classes (works for int and str). - For each class index k (0..K-1), preferentially assign to a "preferred" batch (k % n_batches). - Higher confounding_with_class = stronger preference for the preferred batch. - Uses redistribution: probability mass is moved from other batches to the preferred batch. - At confounding_with_class=1.0: strong preference (all samples of a class tend to fall into its preferred batch, subject to randomness). Args: class_labels: Class membership for each sample. May be integer or string labels; only relative grouping matters. n_batches: Number of distinct batches (must be >= 2). confounding_with_class: Degree of correlation (0.0 = random assignment, 1.0 = strong preference), must be in [0, 1]. rng: NumPy random generator (for reproducibility). proportions: Relative batch sizes (None = equal split). Returns: Array of batch assignments (integers 0 to n_batches-1). Examples: >>> rng = np.random.default_rng(42) >>> y = np.array(["control"] * 50 + ["case"] * 50) >>> batches = confounded_batch_assignment( ... y, n_batches=2, confounding_with_class=0.8, rng=rng ... ) >>> # Most "control" samples will be in batch 0, >>> # most "case" samples will be in batch 1. Notes: - confounding_with_class=0.0 would reduce to random assignment; use generate_batch_assignments for that branch. - For 2 equal-sized batches, confounding_with_class=0.5 yields approx. a 75/25 preference towards the class's preferred batch. """ labels = np.asarray(class_labels) if labels.ndim != 1: labels = labels.ravel() n_samples = labels.shape[0] if n_samples == 0: raise ValueError("class_labels must contain at least one sample") if n_batches <= 1: raise ValueError("confounded_batch_assignment requires n_batches >= 2") if not (0.0 <= confounding_with_class <= 1.0): raise ValueError("confounding_with_class must be in [0, 1]") # Map arbitrary labels → integer class indices 0..K-1 unique_classes, class_indices = np.unique(labels, return_inverse=True) n_classes = unique_classes.shape[0] # Base probabilities for each (class, batch) combination if proportions is None: base_probs = np.full((n_classes, n_batches), 1.0 / n_batches, dtype=float) base_proportions = np.full(n_batches, 1.0 / n_batches, dtype=float) else: base_proportions = _normalize_proportions(proportions, n_batches) base_probs = np.tile(base_proportions, (n_classes, 1)) # Apply confounding via redistribution confounded_probs = base_probs.copy() for cls_idx in range(n_classes): preferred_batch = cls_idx % n_batches base_pref = base_probs[cls_idx, preferred_batch] # Amount of probability mass to add to preferred batch boost = confounding_with_class * (1.0 - base_pref) if boost <= 0: # Nothing to do for this class (e.g., confounding_with_class == 0) continue # How much to subtract from each other batch (redistribute) reduction_per_batch = boost / (n_batches - 1) # Apply changes: add to preferred, subtract from others confounded_probs[cls_idx, preferred_batch] += boost for batch_id in range(n_batches): if batch_id != preferred_batch: confounded_probs[cls_idx, batch_id] -= reduction_per_batch # Clip to ensure non-negative (handle floating point precision) confounded_probs[cls_idx] = np.clip(confounded_probs[cls_idx], 0.0, 1.0) # Renormalize to exactly 1.0 (handle any clipping effects) row_sum = confounded_probs[cls_idx].sum() if row_sum <= 0: # Fallback: revert to base probabilities for this class confounded_probs[cls_idx] = base_probs[cls_idx] else: confounded_probs[cls_idx] /= row_sum # Assign batches per class index (0..K-1) batch_assignments = np.empty(n_samples, dtype=int) for cls_idx in range(n_classes): mask = class_indices == cls_idx n_in_class = int(mask.sum()) if n_in_class == 0: continue batch_assignments[mask] = rng.choice( n_batches, size=n_in_class, p=confounded_probs[cls_idx], ) return batch_assignments.astype(int)
# --------------------------------------------------------------------------- # Applying batch effects to feature matrices # ---------------------------------------------------------------------------
[docs] def apply_batch_effects( X: pd.DataFrame | NDArray[np.float64], batch_assignments: NDArray[np.int_], rng: np.random.Generator, effect_type: Literal["additive", "multiplicative"] = "additive", effect_strength: float = 0.5, affected_features: Sequence[int] | Literal["all"] = "all", effect_granularity: Literal["per_feature", "scalar"] = "per_feature", ) -> tuple[pd.DataFrame | NDArray[np.float64], NDArray[np.float64]]: """Apply batch effects to a feature matrix. Apply systematic batch effects to specified features in the dataset, simulating site differences or instrument variations. Args: X: Feature matrix (DataFrame or array). batch_assignments: Array of batch assignments per sample. rng: NumPy random number generator used to sample batch effects. effect_type: Type of batch effect to apply: - ``"additive"``: Adds a batch-specific shift to affected features. - ``"multiplicative"``: Multiplies affected features by a batch-specific scaling factor around 1.0. effect_strength: Standard deviation of the batch-effect distribution. Larger values generate stronger perturbations. Must be non-negative. affected_features: Defines which features are affected: - ``"all"``: Apply batch effects to all features. - Sequence[int]: Apply effects only to the listed feature indices. effect_granularity: Controls whether effects vary across features: - ``"per_feature"`` (default): Draws effects with shape ``(n_batches, n_affected_features)``, so features within the same batch differ in their batch-specific effect. - ``"scalar"``: Draws a single scalar per batch and applies it uniformly across all affected features. This corresponds to a global per-batch offset (additive) or global scaling factor (multiplicative). Returns: Tuple[pd.DataFrame | np.ndarray, np.ndarray]: A tuple ``(X_affected, batch_effects)`` where: - **X_affected**: The feature matrix with batch effects applied. The output type matches the input type (DataFrame or ndarray). - **batch_effects**: An array of length ``n_batches`` summarizing the effect applied in each batch: * For ``effect_granularity="scalar"``, these are the exact additive shifts (additive mode) or multiplicative factors minus 1.0 (multiplicative mode) drawn for each batch. * For ``"per_feature"``, these values are the mean additive effect or mean multiplicative deviation from 1.0 across all affected features in each batch. This provides a compact summary even though the full per-feature effects vary within batches. Raises: ValueError: If ``batch_assignments`` has the wrong shape, contains negative values, or ``effect_granularity`` is not one of the allowed strings. IndexError: If ``affected_features`` contains indices outside the valid feature range. Notes: - The generator state ``rng`` is advanced during sampling. - Additive effects are sampled from ``Normal(0, effect_strength)``. - Multiplicative effects are sampled from ``1 + Normal(0, effect_strength)``. - For DataFrame input, column names and index are preserved in the output. Examples: >>> rng = np.random.default_rng(42) >>> x = np.random.normal(size=(100, 5)) >>> batches = np.random.randint(0, 3, size=100) >>> >>> X_batch, batch_effects = apply_batch_effects( ... x, batches, rng, ... effect_type="additive", ... effect_strength=0.5, ... affected_features=[0, 2], ... effect_granularity="scalar", ... ) >>> X_batch.shape (100, 5) >>> batch_effects.shape (3,) """ # Convert to DataFrame if needed for consistent handling is_dataframe = isinstance(X, pd.DataFrame) X_df: pd.DataFrame | None = None feature_names: list[str] | None = None if is_dataframe: X_df = cast(pd.DataFrame, X) X_array = X_df.to_numpy(copy=True, dtype=float) feature_names = X_df.columns.tolist() else: X_array = np.asarray(X, dtype=float).copy() n_samples, n_features = X_array.shape batch_assignments = np.asarray(batch_assignments, dtype=int) if batch_assignments.shape[0] != n_samples: raise ValueError("batch_assignments must have length n_samples") if np.any(batch_assignments < 0): raise ValueError("batch_assignments must be non-negative integers") n_batches = int(batch_assignments.max()) + 1 # Determine which features to affect if affected_features == "all": feature_indices = np.arange(n_features, dtype=int) else: feature_indices = np.asarray(list(affected_features), dtype=int) if feature_indices.ndim != 1: feature_indices = feature_indices.ravel() if feature_indices.size == 0: # Nothing to do; return copy of X and an empty effects array batch_effects = np.zeros(n_batches, dtype=float) if is_dataframe and X_df is not None and feature_names is not None: return pd.DataFrame(X_array, columns=feature_names, index=X_df.index), batch_effects else: return X_array, batch_effects if np.any((feature_indices < 0) | (feature_indices >= n_features)): raise IndexError("affected_features contains indices out of range") # Quick path: no effect if float(effect_strength) == 0.0: batch_effects = np.zeros(n_batches, dtype=float) if is_dataframe and X_df is not None and feature_names is not None: return pd.DataFrame(X_array, columns=feature_names, index=X_df.index), batch_effects return X_array, batch_effects m = feature_indices.size if effect_granularity not in ("per_feature", "scalar"): raise ValueError("effect_granularity must be 'per_feature' or 'scalar'") # Draw and apply effects according to granularity if effect_type == "additive": if effect_granularity == "per_feature": # shape (n_batches, m) effects = rng.normal(loc=0.0, scale=float(effect_strength), size=(n_batches, m)) per_sample_effects = effects[batch_assignments] # (n_samples, m) X_array[:, feature_indices] += per_sample_effects batch_effects = effects.mean(axis=1).astype(float) else: # scalar per batch effects = rng.normal(loc=0.0, scale=float(effect_strength), size=n_batches) # (n_batches,) per_sample = effects[batch_assignments] # (n_samples,) X_array[:, feature_indices] += per_sample[:, None] # broadcast across features batch_effects = effects.astype(float) elif effect_type == "multiplicative": if effect_granularity == "per_feature": factors = 1.0 + rng.normal(loc=0.0, scale=float(effect_strength), size=(n_batches, m)) per_sample_factors = factors[batch_assignments] # (n_samples, m) X_array[:, feature_indices] *= per_sample_factors batch_effects = (factors - 1.0).mean(axis=1).astype(float) else: # scalar per batch factors = 1.0 + rng.normal(loc=0.0, scale=float(effect_strength), size=n_batches) # (n_batches,) per_sample = factors[batch_assignments] X_array[:, feature_indices] *= per_sample[:, None] batch_effects = (factors - 1.0).astype(float) else: raise ValueError(f"Unknown effect_type: {effect_type}") # Convert back to DataFrame if needed if is_dataframe and X_df is not None and feature_names is not None: X_result: pd.DataFrame | NDArray[np.float64] = pd.DataFrame(X_array, columns=feature_names, index=X_df.index) else: X_result = X_array return X_result, batch_effects
[docs] def apply_batch_effects_from_config( x: pd.DataFrame | NDArray[np.float64], y: NDArray[np.int_] | ArrayLike, batch_config: Any, rng: np.random.Generator, ) -> tuple[pd.DataFrame | NDArray[np.float64], NDArray[np.int_], NDArray[np.float64]]: """Apply batch effects based on a configuration object. High-level orchestration function that handles batch assignment and effect application in a single call. Args: x: Feature matrix (DataFrame or array). y: Class labels (for potential confounding). May be any array-like accepted by np.unique (e.g. integers or strings). batch_config: Configuration object with attributes: - n_batches: Number of batches (>= 1) - proportions: Optional batch size proportions - confounding_with_class: Degree of class-batch correlation in [0, 1] - effect_type: "additive" or "multiplicative" - effect_strength: Magnitude of batch effects - affected_features: "all", "informative", or list of indices rng: NumPy random generator (for reproducibility). Returns: Tuple of (X_affected, batch_labels, batch_effects): - X_affected: Feature matrix with batch effects applied - batch_labels: Array of batch assignments per sample - batch_effects: Random effects drawn per batch Notes: - Automatically handles confounding when confounding_with_class > 0. """ y_arr = np.asarray(y) n_samples = y_arr.shape[0] # Generate batch assignments (handles confounding internally) batch_labels = generate_batch_assignments( n_samples=n_samples, n_batches=batch_config.n_batches, rng=rng, proportions=batch_config.proportions, class_labels=y_arr if batch_config.confounding_with_class > 0.0 else None, confounding_with_class=batch_config.confounding_with_class, ) # Apply batch effects to feature matrix X_affected, batch_effects = apply_batch_effects( X=x, batch_assignments=batch_labels, rng=rng, effect_type=batch_config.effect_type, effect_strength=batch_config.effect_strength, affected_features=batch_config.affected_features, ) return X_affected, batch_labels, batch_effects