# Copyright (c) 2025 Sigrun May,
# Ostfalia Hochschule für angewandte Wissenschaften
#
# This software is distributed under the terms of the MIT license
# which is available at https://opensource.org/licenses/MIT

"""Sklearn-like convenience wrapper around biomedical-data-generator.

This module provides a single entry point :func:`make_biomedical_dataset`
that mimics :func:`sklearn.datasets.make_classification` while mapping
cleanly to the new :class:`DatasetConfig` / :func:`generate_dataset`
API of :mod:`biomedical_data_generator`.

The goals are:

- Familiar, scikit-learn-style signature for quick experimentation.
- A *thin* translation layer to :class:`DatasetConfig`, so that users
  can "graduate" to the full configuration model once they need more
  control.
- NumPy / pandas outputs that plug directly into scikit-learn pipelines.
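
Example
-------
A minimal usage sketch (parameter values are illustrative, not recommended
defaults)::

    from biomedical_data_generator.utils.sklearn_compat import make_biomedical_dataset

    X, y = make_biomedical_dataset(
        n_samples=60,
        n_features=100,
        n_informative=5,
        n_redundant=3,
        random_state=0,
    )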
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any

import numpy as np
import pandas as pd

from biomedical_data_generator.config import (
    BatchEffectsConfig,
    ClassConfig,
    CorrClusterConfig,
    DatasetConfig,
)
from biomedical_data_generator.generator import generate_dataset

# ---------------------------------------------------------------------------#
# Helper: translate total sample size + weights into per-class counts
# ---------------------------------------------------------------------------#


def _compute_class_sizes(
    n_samples: int,
    n_classes: int,
    weights: Sequence[float] | None,
) -> list[int]:
    """Translate total sample size + class weights into per-class counts.

    This mirrors scikit-learn semantics in a simplified way:

    - If ``weights is None``       → classes are (approximately) equally sized.
    - If ``weights`` is given      → must have length ``n_classes``.
                                     Values are normalized to sum to 1.0.

    Rounding is handled by assigning floor(n_samples * w_i) to each class
    and then distributing any remainder to the classes with the largest
    fractional parts.
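
    Examples
    --------
    Illustrative only (exact splits follow the rounding rules above):

    >>> _compute_class_sizes(n_samples=10, n_classes=3, weights=None)
    [4, 3, 3]
    >>> _compute_class_sizes(n_samples=8, n_classes=2, weights=(0.75, 0.25))
    [6, 2]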
    """
    if n_classes < 2:
        raise ValueError(f"n_classes must be >= 2, got {n_classes}.")

    if weights is None:
        # Equal-sized classes, remainder distributed to first classes
        base = n_samples // n_classes
        remainder = n_samples % n_classes
        return [base + (1 if i < remainder else 0) for i in range(n_classes)]

    if len(weights) != n_classes:
        raise ValueError(
            f"weights must have length n_classes={n_classes}, got length {len(weights)}."
        )

    # Normalize to sum=1.0
    w = np.asarray(weights, dtype=float)
    if np.any(w < 0):
        raise ValueError(f"weights must be non-negative, got {weights}.")
    total = float(w.sum())
    if total <= 0:
        raise ValueError(f"Sum of weights must be > 0, got {total}.")
    w = w / total

    # Floor allocation + remainder distribution
    raw = w * n_samples
    counts = np.floor(raw).astype(int)
    remainder = n_samples - int(counts.sum())
    if remainder > 0:
        # Distribute remaining samples to classes with largest fractional parts
        frac = raw - counts
        for idx in np.argsort(frac)[::-1][:remainder]:
            counts[idx] += 1

    return counts.tolist()


# ---------------------------------------------------------------------------#
# Public API: sklearn-style dataset generator
# ---------------------------------------------------------------------------#


def make_biomedical_dataset(
    n_samples: int = 30,
    n_features: int = 200,
    n_informative: int = 5,
    n_redundant: int = 0,
    n_classes: int = 2,
    class_sep: float = 1.2,
    weights: tuple[float, ...] | None = None,
    random_state: int | None = 42,
    # Extensions beyond sklearn:
    n_noise: int = 0,
    noise_distribution: str = "normal",
    noise_distribution_params: dict[str, Any] | None = None,
    batch_effect: bool = False,
    n_batches: int = 1,
    batch_effect_strength: float = 0.5,
    confounding_with_class: float = 0.0,
    return_meta: bool = False,
    return_pandas: bool = False,
    **kwargs: Any,
) -> tuple[Any, Any] | tuple[Any, Any, object]:
    """Sklearn-like convenience wrapper around the biomedical-data-generator.

    Parameters broadly mirror :func:`sklearn.datasets.make_classification`
    where sensible, but are translated to the new :class:`DatasetConfig` /
    :func:`generate_dataset` design.

    Redundant features
    ------------------
    ``n_redundant`` is implemented via a single correlated feature cluster:

    - One **informative anchor** (shared signal)
    - ``n_redundant`` **proxy** features that are strongly correlated
      (equicorrelated with a high ``correlation``)

    In terms of :class:`DatasetConfig`, this means::

        n_features = n_informative + n_noise + proxies_from_clusters

    and the proxies contributed by this wrapper are exactly ``n_redundant``.

    Notes
    -----
    - ``n_features`` must equal ``n_informative + n_redundant + n_noise`` in
      this wrapper (no repeated features). If ``n_noise == 0``, it is inferred
      as ``n_features - n_informative - n_redundant``.
    - If you pass ``corr_clusters`` explicitly via ``**kwargs``, then
      ``n_redundant`` **must be 0**; you are responsible for defining the
      cluster layout yourself in that advanced mode.

    By default the function returns ``(X, y)`` using NumPy arrays for broad
    compatibility with scikit-learn. Set ``return_pandas=True`` to obtain a
    ``DataFrame`` and ``Series`` instead. Set ``return_meta=True`` to
    additionally return the :class:`DatasetMeta` object.

    Returns
    -------
    (X, y) or (X, y, meta)
        Depending on ``return_meta``. ``X`` is a NumPy array or pandas
        ``DataFrame``; ``y`` is a NumPy array or pandas ``Series``.
    """
    # ------------------------------------------------------------------
    # 0) Corr-cluster handling & feature accounting mode
    # ------------------------------------------------------------------
    explicit_corr_clusters = "corr_clusters" in kwargs and bool(kwargs["corr_clusters"])
    if explicit_corr_clusters and n_redundant > 0:
        raise ValueError(
            "n_redundant cannot be used together with an explicit "
            "'corr_clusters' configuration. Either let the sklearn-style "
            "wrapper create a redundant cluster from n_redundant, or define "
            "all CorrClusterConfig instances yourself."
        )

    if n_informative < 0 or n_redundant < 0 or n_noise < 0:
        raise ValueError(
            "n_informative, n_redundant and n_noise must be >= 0, got "
            f"n_informative={n_informative}, n_redundant={n_redundant}, n_noise={n_noise}."
        )

    # ------------------------------------------------------------------
    # 1) Validate and resolve feature counts
    # ------------------------------------------------------------------
    if explicit_corr_clusters:
        # Advanced mode: user provides full corr_clusters; we do not try to
        # infer n_noise from n_features because we do not know the number
        # of proxies contributed by those clusters. We let DatasetConfig
        # perform consistency checks instead.
        n_noise_effective = n_noise
    else:
        # Simple sklearn-like mode: we know exactly how many proxies we add:
        # proxies_from_clusters = n_redundant (one informative anchor cluster).
        base_required = n_informative + n_redundant
        if n_features < base_required:
            raise ValueError(
                "n_features must be >= n_informative + n_redundant; "
                f"got n_features={n_features}, "
                f"n_informative={n_informative}, n_redundant={n_redundant}."
            )
        if n_noise == 0:
            # Infer remaining features as independent noise
            n_noise_effective = n_features - base_required
        else:
            n_noise_effective = n_noise
            if base_required + n_noise_effective != n_features:
                raise ValueError(
                    "In this sklearn-style wrapper we currently support only "
                    "free informative + correlated redundant (via clusters) "
                    "+ independent noise features. "
                    "Expected n_features == n_informative + n_redundant + n_noise, "
                    f"got n_features={n_features}, n_informative={n_informative}, "
                    f"n_redundant={n_redundant}, n_noise={n_noise_effective}."
                )

    if n_noise_effective < 0:
        raise ValueError(
            f"Inferred n_noise would be negative ({n_noise_effective}). "
            "Check the combination of n_features, n_informative and n_redundant."
        )

    # ------------------------------------------------------------------
    # 2) Build class configuration (sizes + labels)
    # ------------------------------------------------------------------
    class_sizes = _compute_class_sizes(
        n_samples=n_samples,
        n_classes=n_classes,
        weights=weights,
    )
    class_configs: list[ClassConfig] = [ClassConfig(n_samples=int(sz)) for sz in class_sizes]

    # ------------------------------------------------------------------
    # 3) Optional batch-effect configuration
    # ------------------------------------------------------------------
    if batch_effect and n_batches > 1:
        batch_cfg: BatchEffectsConfig | None = BatchEffectsConfig(
            n_batches=n_batches,
            effect_strength=batch_effect_strength,
            effect_type="additive",
            confounding_with_class=confounding_with_class,
            affected_features="all",  # simple wrapper: affect all features
            proportions=None,
        )
    else:
        batch_cfg = None

    # ------------------------------------------------------------------
    # 4) Optional correlated cluster for redundant features
    # ------------------------------------------------------------------
    corr_clusters: list[CorrClusterConfig] = []
    if not explicit_corr_clusters and n_redundant > 0:
        if n_informative < 1:
            raise ValueError(
                "n_redundant > 0 requires at least one informative feature "
                "to serve as the cluster anchor (n_informative >= 1)."
            )
        # One informative anchor + n_redundant proxies → total cluster size
        n_cluster_features = 1 + n_redundant
        # Strong, but not perfect, equicorrelation to represent redundancy.
        redundant_cluster = CorrClusterConfig(
            n_cluster_features=n_cluster_features,
            structure="equicorrelated",
            correlation=0.9,
            anchor_role="informative",
            anchor_effect_size=None,  # use DatasetConfig / informative defaults
            anchor_class=1 if n_classes > 1 else 0,
            label="sklearn_redundant_cluster",
        )
        corr_clusters.append(redundant_cluster)

    # ------------------------------------------------------------------
    # 5) Construct DatasetConfig
    # ------------------------------------------------------------------
    cfg_kwargs: dict[str, Any] = {
        "n_informative": int(n_informative),
        "n_noise": int(n_noise_effective),
        "class_configs": class_configs,
        "class_sep": class_sep,  # scalar → normalized by DatasetConfig validator
        "noise_distribution": noise_distribution,
    }
    if noise_distribution_params is not None:
        cfg_kwargs["noise_distribution_params"] = noise_distribution_params
    if batch_cfg is not None:
        cfg_kwargs["batch_effects"] = batch_cfg

    # First apply user kwargs (advanced mode); then we may append our cluster.
    cfg_kwargs.update(kwargs)

    # Attach automatically generated corr_clusters if we are in the simple
    # sklearn-style mode (no explicit non-empty corr_clusters from kwargs).
    if n_redundant > 0 and not explicit_corr_clusters:
        existing = cfg_kwargs.get("corr_clusters")
        if existing is None:
            # No user-provided clusters at all → use our cluster list.
            cfg_kwargs["corr_clusters"] = corr_clusters
        else:
            # User may have passed corr_clusters=None or [].
            # Treat that as "no clusters yet" and append our redundant cluster.
            if not isinstance(existing, list):
                raise TypeError(
                    "corr_clusters must be a list of CorrClusterConfig or dicts; "
                    f"got {type(existing).__name__}."
                )
            existing_extended = list(existing)
            existing_extended.extend(corr_clusters)
            cfg_kwargs["corr_clusters"] = existing_extended

    cfg = DatasetConfig(
        random_state=random_state,
        **cfg_kwargs,
    )

    # ------------------------------------------------------------------
    # 6) Generate data via the core API
    # ------------------------------------------------------------------
    X, y, meta = generate_dataset(cfg, return_dataframe=return_pandas)

    # Convert y to Series if pandas output is requested
    if return_pandas:
        if not isinstance(X, pd.DataFrame):
            # Defensive: generate_dataset should already have returned a DataFrame
            X = pd.DataFrame(X)
        y_out: Any = pd.Series(y, name="target")
    else:
        y_out = y

    if return_meta:
        return X, y_out, meta
    return X, y_out
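

if __name__ == "__main__":
    # Illustrative smoke test only, not part of the public API. The parameter
    # values below are arbitrary examples chosen to exercise the sklearn-style
    # extensions (imbalanced classes, a redundant cluster, a mild batch
    # effect); they are not recommended defaults.
    X_demo, y_demo, meta_demo = make_biomedical_dataset(
        n_samples=40,
        n_features=50,
        n_informative=5,
        n_redundant=3,
        weights=(0.75, 0.25),
        batch_effect=True,
        n_batches=2,
        batch_effect_strength=0.5,
        return_meta=True,
        random_state=0,
    )
    print("X shape:", np.asarray(X_demo).shape)
    print("y shape:", np.asarray(y_demo).shape)
    print("meta type:", type(meta_demo).__name__)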