# Copyright (c) 2025 Sigrun May,
# Ostfalia Hochschule für angewandte Wissenschaften
#
# This software is distributed under the terms of the MIT license
# which is available at https://opensource.org/licenses/MIT
"""Generator for synthetic classification datasets with correlated feature clusters."""
from __future__ import annotations
import numpy as np
import pandas as pd
from .config import DatasetConfig
from .effects.batch import apply_batch_effects_from_config
from .features.correlated import sample_all_correlated_clusters
from .features.informative import generate_informative_features
from .meta import DatasetMeta
from .utils.sampling import sample_distribution
def _make_names_and_roles(
cfg: DatasetConfig,
*,
n_cluster_cols: int,
n_inf_cols: int,
n_noise_cols: int,
) -> tuple[
list[str], # names
list[int], # informative_idx (anchors + free informative)
list[int], # noise_idx (independent/free noise only)
dict[int, list[int]], # cluster_indices[cid] -> list of column indices
dict[int, int | None], # anchor_idx[cid] -> anchor column (or None)
]:
"""Build feature names and role indices for the final concatenated matrix.
We assume that :func:`generate_dataset` has constructed the feature
matrix ``x`` by horizontal concatenation in the following order::
x = [x_informative | x_clusters | x_noise]
where
* ``x_informative`` contains **only free informative features**
(no cluster anchors),
* ``x_clusters`` contains, for each :class:`CorrClusterConfig`,
the **anchor** (first column of the block) followed by its
**proxy features**,
* ``x_noise`` contains **only independent/free noise features**
(no cluster anchors).
This function is purely book-keeping:
* it assigns human-readable feature names,
* it returns index lists for informative and (independent) noise
features,
* it provides cluster layouts via ``cluster_indices`` and
``anchor_idx`` so that proxy indices can be derived later as
``set(cluster_indices[cid]) - {anchor_idx[cid]}``.
Args:
cfg: Resolved :class:`DatasetConfig` used for generation.
n_cluster_cols: Number of columns in ``x_clusters``.
n_inf_cols: Number of columns in ``x_informative``.
n_noise_cols: Number of columns in ``x_noise``.
Returns:
names, informative_idx, noise_idx, cluster_indices, anchor_idx
"""
names: list[str] = []
informative_idx: list[int] = []
noise_idx: list[int] = []
cluster_indices: dict[int, list[int]] = {}
anchor_idx: dict[int, int | None] = {}
# -------------------------------------------------------------
# Sanity checks: shapes from generator vs. structural config
# -------------------------------------------------------------
clusters = cfg.corr_clusters or []
expected_cluster_cols = sum(int(c.n_cluster_features) for c in clusters)
if n_cluster_cols != expected_cluster_cols:
raise ValueError(
"Mismatch between x_clusters.shape[1] and corr_clusters definition: "
f"x_clusters has {n_cluster_cols} columns, but corr_clusters imply "
f"{expected_cluster_cols} columns."
)
# Expected number of free informative / noise features from config
n_inf_free_expected = cfg.n_informative_free
n_noise_free_expected = cfg.n_noise_free
if n_inf_cols != n_inf_free_expected:
raise ValueError(
"generate_informative_features must produce exactly "
f"cfg.n_informative_free={n_inf_free_expected} columns, "
f"but returned {n_inf_cols}."
)
if n_noise_cols != n_noise_free_expected:
raise ValueError(
"The noise block must contain exactly cfg.n_noise_free "
f"={n_noise_free_expected} independent noise features, "
f"but x_noise has {n_noise_cols} columns."
)
total_cols = n_inf_cols + n_cluster_cols + n_noise_cols
if total_cols != cfg.n_features:
raise ValueError(
"Total number of columns in X does not match cfg.n_features. "
f"Got {total_cols} columns from generator but cfg.n_features="
f"{cfg.n_features}."
)
# -------------------------------------------------------------
# 1) Free informative features: block [0, n_inf_cols)
# -------------------------------------------------------------
for j in range(n_inf_cols):
col = j
if cfg.prefixed_feature_naming:
names.append(f"{cfg.prefix_informative}{j + 1}")
else:
names.append(f"feature_{len(names) + 1}")
# All columns in x_informative are informative
informative_idx.append(col)
# -------------------------------------------------------------
# 2) Correlated clusters: block [n_inf_cols, n_inf_cols + n_cluster_cols)
# One contiguous block per CorrClusterConfig, in config order.
# -------------------------------------------------------------
current = n_inf_cols
for cid, cluster_cfg in enumerate(clusters):
k = int(cluster_cfg.n_cluster_features)
cols = list(range(current, current + k))
cluster_indices[cid] = cols
# Anchor is always the first column of the block
anchor_col = cols[0]
anchor_idx[cid] = anchor_col
# Name anchor (display: 1-based with cid+1)
if cfg.prefixed_feature_naming:
if cluster_cfg.anchor_role == "informative":
anchor_name = f"{cfg.prefix_corr}{cid + 1}_anchor" # corr1_anchor, corr2_anchor, ...
else:
anchor_name = f"{cfg.prefix_corr}{cid + 1}_1" # corr1_1, corr2_1, ...
else:
anchor_name = f"feature_{len(names) + 1}"
names.append(anchor_name)
# Mark anchor as informative if requested
if cluster_cfg.anchor_role == "informative":
informative_idx.append(anchor_col)
# Name proxy features (never added to informative_idx / noise_idx)
for offset, col in enumerate(cols[1:], start=2):
if cfg.prefixed_feature_naming:
proxy_name = f"{cfg.prefix_corr}{cid + 1}_{offset}" # corr1_2, corr1_3, ...
else:
proxy_name = f"feature_{len(names) + 1}"
names.append(proxy_name)
current += k
# -------------------------------------------------------------
# 3) Independent / free noise: block at the end
# -------------------------------------------------------------
noise_start = n_inf_cols + n_cluster_cols
for j in range(n_noise_cols):
col = noise_start + j
if cfg.prefixed_feature_naming:
names.append(f"{cfg.prefix_noise}{j + 1}")
else:
names.append(f"feature_{len(names) + 1}")
noise_idx.append(col)
# Final consistency check
if len(names) != total_cols:
raise AssertionError(
"Internal inconsistency in _make_names_and_roles: constructed "
f"{len(names)} names, but expected {total_cols}."
)
# Informative feature count in X (anchors + free informative) must match cfg.n_informative
if len(informative_idx) != cfg.n_informative:
raise AssertionError(
"Mismatch between cfg.n_informative and resolved informative indices: "
f"cfg.n_informative={cfg.n_informative}, but informative_idx has "
f"{len(informative_idx)} entries."
)
return names, informative_idx, noise_idx, cluster_indices, anchor_idx
# =================
# Public generator
# =================
def generate_dataset(
    cfg: DatasetConfig,
    return_dataframe: bool = True,
) -> tuple[pd.DataFrame | np.ndarray, np.ndarray, DatasetMeta]:
    """Generate synthetic biomedical dataset with specified feature structure.

    Creates a classification dataset with configurable informative features, noise,
    correlated feature clusters (e.g., biological pathways), and optional batch effects.

    The feature matrix is assembled as ``[x_informative | x_clusters | x_noise]``.
    All random draws share a single generator seeded with ``cfg.random_state``.

    Args:
        cfg: Configuration object defining the dataset structure. See
            :class:`~biomedical_data_generator.config.DatasetConfig` for details.
        return_dataframe: If ``True``, return features as a :class:`pandas.DataFrame`
            with named columns. If ``False``, return as a NumPy array.

    Returns:
        tuple: A 3-tuple containing:

        - **x** (:class:`pandas.DataFrame` or :class:`numpy.ndarray`):
          Feature matrix of shape ``(n_samples, n_features)``. Each row represents one
          sample (e.g., patient), each column represents one feature (e.g., biomarker,
          gene expression value). When returned as DataFrame, column names are the ones
          built by ``_make_names_and_roles``: with ``cfg.prefixed_feature_naming``
          enabled they use the type-based prefixes from the configuration (e.g.
          ``i1, corr1_anchor, n1`` if the prefixes are ``i``, ``corr`` and ``n`` --
          presumably the defaults, confirm in the config), otherwise sequential
          numbering ``feature_1, feature_2, ...``.
        - **y** (:class:`numpy.ndarray`):
          Class labels of shape ``(n_samples,)`` with integer values
          ``0, 1, ..., n_classes-1``.
        - **meta** (:class:`DatasetMeta`):
          Metadata object containing feature names and role indices (informative,
          noise, cluster members, anchors), per-cluster anchor information, class
          information, batch assignments and effects (``None`` when no batch effects
          were applied), and the complete generation configuration.

    Raises:
        ValueError: Propagated from ``_make_names_and_roles`` when the generated
            block widths do not match the configuration.
        AssertionError: Propagated from ``_make_names_and_roles`` on internal
            inconsistencies between names/indices and the configuration.

    Examples:
        >>> from biomedical_data_generator.config import DatasetConfig, ClassConfig
        >>> data_cfg_1 = DatasetConfig(
        ...     n_informative=5,
        ...     n_noise=10,
        ...     class_configs=[ClassConfig(n_samples=100, label="healthy"),
        ...                    ClassConfig(n_samples=100, label="diseased")],
        ...     random_state=42
        ... )
        >>> x1, y1, meta_data1 = generate_dataset(data_cfg_1)
    """
    rng_global = np.random.default_rng(cfg.random_state)

    # ================================================================
    # STEP 1: Generate informative features + labels (with shifts)
    # ================================================================
    # Returns SHIFTED features (class separation already applied)
    x_informative, y = generate_informative_features(cfg, rng_global)

    # ================================================================
    # STEP 2: Generate correlated clusters (with anchor shifts)
    # ================================================================
    # Returns clusters with anchor shifts already applied
    x_clusters, cluster_meta = sample_all_correlated_clusters(cfg=cfg, y=y, rng=rng_global)

    # ================================================================
    # STEP 3: Generate noise features
    # ================================================================
    # The noise block holds only the independent ("free") noise features.
    # _make_names_and_roles requires exactly cfg.n_noise_free columns here,
    # so the block is sized with that value to keep generation and
    # validation in agreement.
    x_noise = sample_distribution(
        distribution=cfg.noise_distribution,
        params=cfg.noise_distribution_params,
        rng=rng_global,
        size=(cfg.n_samples, cfg.n_noise_free),
    )

    # ================================================================
    # STEP 4: Concatenate all feature blocks
    # ================================================================
    x = np.concatenate([x_informative, x_clusters, x_noise], axis=1)

    # ================================================================
    # STEP 5: Apply batch effects (technical overlay)
    # ================================================================
    batch_labels = None
    batch_effects = None
    # A single batch carries no batch-specific structure, so it is skipped.
    if cfg.batch_effects is not None and cfg.batch_effects.n_batches > 1:
        x, batch_labels, batch_effects = apply_batch_effects_from_config(
            x=x,
            y=y,
            batch_config=cfg.batch_effects,
            rng=rng_global,
        )

    # ================================================================
    # STEP 6: Build names and role indices (knows final structure)
    # ================================================================
    names, inf_idx, noi_idx, cluster_idx, anch_idx = _make_names_and_roles(
        cfg,
        n_cluster_cols=x_clusters.shape[1],
        n_inf_cols=x_informative.shape[1],
        n_noise_cols=x_noise.shape[1],
    )

    # ================================================================
    # STEP 7: Build metadata
    # ================================================================
    # minlength keeps an entry for classes that received no samples.
    counts = np.bincount(y, minlength=cfg.n_classes)
    meta = DatasetMeta(
        feature_names=names,
        informative_idx=inf_idx,
        noise_idx=noi_idx,
        corr_cluster_indices=cluster_idx,
        anchor_idx=anch_idx,
        anchor_role=cluster_meta["anchor_role"],
        anchor_effect_size=cluster_meta["anchor_effect_size"],
        anchor_class=cluster_meta["anchor_class"],
        cluster_label=cluster_meta["label"],
        n_classes=cfg.n_classes,
        class_names=cfg.class_labels,
        samples_per_class={int(k): int(counts[k]) for k in range(cfg.n_classes)},
        class_sep=cfg.class_sep,
        corr_between=cfg.corr_between,
        batch_labels=batch_labels,
        batch_effects=batch_effects,
        batch_config=cfg.batch_effects.model_dump() if cfg.batch_effects is not None else None,
        random_state=cfg.random_state,
        resolved_config=cfg.model_dump(),
    )

    if return_dataframe:
        return pd.DataFrame(x, columns=names), y, meta
    return x, y, meta