Skip to content

ensemble

Ensemble methods for combining multiple models.

Overview

Ensemble approaches for:

  • Outlier detection voting
  • Imputation averaging
  • Classification ensembles

ensemble_anomaly_detection

Anomaly detection ensemble module.

Provides functionality to combine outlier detection masks from multiple models using majority voting and compute ensemble metrics across difficulty levels.

Cross-references: - src/anomaly_detection/outlier_sklearn.py for metric computation - src/ensemble/ensemble_utils.py for run retrieval

get_anomaly_results_per_model

get_anomaly_results_per_model(
    model_name: str,
    outlier_artifacts: dict,
    pred_masks: dict,
    labels: dict,
    idx: int,
)

Extract prediction masks and labels from model artifacts.

Handles different artifact structures for various model types (TimesNet, UniTS, MOMENT, sklearn-based).

PARAMETER DESCRIPTION
model_name

Name of the model architecture.

TYPE: str

outlier_artifacts

Loaded artifacts from MLflow.

TYPE: dict

pred_masks

Accumulated prediction masks (modified in place).

TYPE: dict

labels

Accumulated ground truth labels (modified in place).

TYPE: dict

idx

Index of current model (for logging).

TYPE: int

RETURNS DESCRIPTION
dict

Updated prediction masks.

dict

Updated labels.

Source code in src/ensemble/ensemble_anomaly_detection.py
def get_anomaly_results_per_model(
    model_name: str, outlier_artifacts: dict, pred_masks: dict, labels: dict, idx: int
):
    """
    Extract prediction masks and labels from model artifacts.

    Handles the differing artifact layouts of the various model families
    (TimesNet, UniTS, MOMENT, sklearn-based), appending each model's masks
    to ``pred_masks`` and overwriting ``labels`` with the ground truth
    where the artifacts provide it.

    Parameters
    ----------
    model_name : str
        Name of the model architecture.
    outlier_artifacts : dict
        Loaded artifacts from MLflow.
    pred_masks : dict
        Accumulated prediction masks (modified in place).
    labels : dict
        Accumulated ground truth labels (modified in place).
    idx : int
        Index of current model (for logging).

    Returns
    -------
    dict
        Updated prediction masks.
    dict
        Updated labels.
    """
    if model_name in ("TimesNet", "TimesNet-orig"):
        for split in ("train", "test"):
            arrays = outlier_artifacts["results_best"][f"outlier_{split}"][
                "results_dict"
            ]["split_results"]["arrays"]
            pred_masks[split].append(arrays["pred_mask"])
            labels[split] = arrays["trues"]
    elif "UniTS-Outlier" in model_name:
        for split in ("train", "test"):
            arrays = outlier_artifacts["results"][f"outlier_{split}"]["arrays"]
            pred_masks[split].append(arrays["pred_mask"])
            labels[split] = arrays["trues"]
    elif model_name == "MOMENT":
        # MOMENT artifacts carry no ground-truth labels, only predictions,
        # so `labels` is left untouched for this model.
        for split in ("train", "test"):
            arrays = outlier_artifacts["preds"][f"outlier_{split}"]["arrays"]
            pred_masks[split].append(arrays["pred_mask"].astype(int))
    else:  # sklearn models
        for split in ("train", "test"):
            split_arrays = outlier_artifacts[split]["arrays"]
            pred_masks[split].append(split_arrays["pred_mask"])
            labels[split] = split_arrays["trues"]

    # pred_masks["test"] grows by one entry per submodel, so entry `idx`
    # is the mask appended for this model on this iteration.
    test_mask = pred_masks["test"][idx]
    pred_mask_percent = 100 * (np.sum(test_mask) / np.size(test_mask))
    logger.info(f"{model_name} {pred_mask_percent:.3f}% of model predictions are TRUE")

    test_labels = labels["test"]
    label_mask_percent = 100 * (np.sum(test_labels) / np.size(test_labels))
    logger.info(
        f"{model_name} {label_mask_percent:.3f}% of ground truth labels are TRUE (should be same for all models)"
    )

    return pred_masks, labels

write_granular_outlier_metrics

write_granular_outlier_metrics(metrics)

Log granular outlier metrics to MLflow.

Writes metrics at different difficulty levels (easy, medium, hard) for both train and test splits.

PARAMETER DESCRIPTION
metrics

Dictionary of metrics per split and granularity level.

TYPE: dict

Source code in src/ensemble/ensemble_anomaly_detection.py
def write_granular_outlier_metrics(metrics):
    """
    Log granular outlier metrics to MLflow.

    Writes metrics at different difficulty levels (easy, medium, hard)
    for both train and test splits.

    Parameters
    ----------
    metrics : dict
        Dictionary of metrics per split and granularity level. Modified in
        place: missing "outlier_train"/"outlier_test" keys are aliased to
        the bare "train"/"test" entries before logging.
    """
    # Some runs store the splits under "train"/"test"; alias them to the
    # "outlier_*" names so the logging loop sees a uniform layout.
    if "outlier_test" not in metrics:
        metrics["outlier_test"] = metrics["test"]
        metrics["outlier_train"] = metrics["train"]

    for split, granularities in metrics.items():
        for granularity, granular in granularities.items():
            # "all" is logged without a suffix; named levels get "__<level>".
            suffix = "" if granularity == "all" else "__" + granularity
            for metric, value in granular["scalars"]["global"].items():
                if value is None:
                    continue
                if isinstance(value, np.ndarray):
                    # Array-valued metrics hold (low, high) bounds.
                    mlflow.log_metric(f"{split}/{metric}_lo{suffix}", value[0])
                    mlflow.log_metric(f"{split}/{metric}_hi{suffix}", value[1])
                else:
                    mlflow.log_metric(f"{split}/{metric}{suffix}", value)
                    logger.debug(f"{split}/{metric}{suffix}: {value}")

get_granular_outlier_metrics

get_granular_outlier_metrics(data_dict, pred_masks, idx)

Compute outlier metrics at different difficulty levels.

PARAMETER DESCRIPTION
data_dict

Source data containing difficulty level information.

TYPE: dict

pred_masks

Prediction masks per split.

TYPE: dict

idx

Index of model to evaluate.

TYPE: int

RETURNS DESCRIPTION
dict

Metrics per split and difficulty level.

Source code in src/ensemble/ensemble_anomaly_detection.py
def get_granular_outlier_metrics(data_dict, pred_masks, idx):
    """
    Compute outlier metrics at different difficulty levels.

    Parameters
    ----------
    data_dict : dict
        Source data containing difficulty level information.
    pred_masks : dict
        Prediction masks per split.
    idx : int
        Index of model to evaluate.

    Returns
    -------
    dict
        Metrics per split and difficulty level.
    """

    def _eval_split(split: str, masks) -> dict:
        # Evaluate submodel `idx` for one split; the evaluator expects the
        # bare split name, so any "outlier_" prefix is stripped.
        preds = masks[idx]
        assert isinstance(preds, np.ndarray)
        return eval_on_all_outlier_difficulty_levels(
            data_dict, preds, split.replace("outlier_", "")
        )

    return {split: _eval_split(split, masks) for split, masks in pred_masks.items()}

compute_granular_metrics

compute_granular_metrics(
    run_id,
    mlflow_run: Series,
    model_name: str,
    pred_masks: dict,
    idx: int,
    sources: dict,
    debug_verbose: bool = True,
)

Compute and log granular outlier metrics if not already computed.

Checks if granular metrics exist in MLflow, computes them if not, and logs them to the original run.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

mlflow_run

MLflow run data.

TYPE: Series

model_name

Name of the model.

TYPE: str

pred_masks

Prediction masks per split.

TYPE: dict

idx

Index of current model.

TYPE: int

sources

Source data.

TYPE: dict

debug_verbose

If True, log debug information.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
dict

Computed metrics.

Source code in src/ensemble/ensemble_anomaly_detection.py
def compute_granular_metrics(
    run_id,
    mlflow_run: pd.Series,
    model_name: str,
    pred_masks: dict,
    idx: int,
    sources: dict,
    debug_verbose: bool = True,
):
    """
    Compute and log granular outlier metrics if not already computed.

    Checks if granular metrics exist in MLflow, computes them if not,
    and logs them to the original run.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    mlflow_run : pd.Series
        MLflow run data.
    model_name : str
        Name of the model.
    pred_masks : dict
        Prediction masks per split.
    idx : int
        Index of current model.
    sources : dict
        Source data.
    debug_verbose : bool, default True
        If True, log debug information.

    Returns
    -------
    dict
        Computed metrics, or an empty dict if the granular metrics had
        already been logged for this run.
    """
    if "metrics.train/f1__easy_DEBUG" in mlflow_run:
        logger.info("Granular metrics already computed")
        # BUG FIX: `metrics` used to be unbound on this branch, so the
        # function raised NameError at `return metrics`. Return an empty
        # dict instead (the caller discards the value anyway).
        return {}

    # Granular metrics were not computed during the original run; do it now.
    metrics = get_granular_outlier_metrics(sources, pred_masks, idx)
    if debug_verbose:
        test_mask = pred_masks["test"][idx]
        pred_mask_percent = 100 * (np.sum(test_mask) / np.size(test_mask))
        logger.info(f"{model_name} {pred_mask_percent:.3f}% of pred_mask")

    # Re-open the original mlflow run and attach the metrics to it. The
    # context manager ends the run on exit, so the explicit
    # mlflow.end_run() that used to follow was redundant and removed.
    with mlflow.start_run(run_id=run_id):
        write_granular_outlier_metrics(metrics)

    return metrics

get_anomaly_masks_and_labels

get_anomaly_masks_and_labels(
    ensembled_output: dict, sources: dict
)

Load and aggregate anomaly masks from all ensemble submodels.

PARAMETER DESCRIPTION
ensembled_output

Dictionary mapping model names to MLflow run data.

TYPE: dict

sources

Source data.

TYPE: dict

RETURNS DESCRIPTION
dict

Stacked prediction masks (n_models x n_subjects x n_timepoints).

dict

Ground truth labels (from last model, should be same for all).

Source code in src/ensemble/ensemble_anomaly_detection.py
def get_anomaly_masks_and_labels(ensembled_output: dict, sources: dict):
    """
    Load and aggregate anomaly masks from all ensemble submodels.

    Parameters
    ----------
    ensembled_output : dict
        Dictionary mapping model names to MLflow run data.
    sources : dict
        Source data.

    Returns
    -------
    dict
        Stacked prediction masks (n_models x n_subjects x n_timepoints).
    dict
        Ground truth labels (from last model, should be same for all).
    """
    # Local import kept as in the original code (presumably to avoid an
    # import cycle — TODO confirm), but hoisted out of the loop so it only
    # executes once instead of once per submodel.
    from src.anomaly_detection.anomaly_utils import get_artifact

    pred_masks = {"train": [], "test": []}
    labels = {"train": [], "test": []}
    # NOTE: an odd number of submodels avoids ties in majority voting; the
    # check is currently disabled:
    # assert len(ensembled_output) % 2 != 0, "The number of models to ensemble must be odd"
    for idx, (model_name, mlflow_run) in enumerate(ensembled_output.items()):
        # run_id/run_name may come back as single-element pandas Series
        # depending on how the run table was sliced; unwrap to scalars.
        run_id = mlflow_run["run_id"]
        if isinstance(run_id, pd.Series):
            run_id = run_id.values[0]
        run_name = mlflow_run["tags.mlflow.runName"]
        if isinstance(run_name, pd.Series):
            run_name = run_name.values[0]

        if "MOMENT" in model_name:
            # quick'n'dirty fix for all the different MOMENT variants still being saved as MOMENT in the artifacts
            model_name = "MOMENT"
        if "TimesNet" in model_name:
            # same quick'n'dirty fix for the TimesNet variants saved as TimesNet
            # (the previous copy-pasted comment here incorrectly said MOMENT)
            model_name = "TimesNet"

        outlier_artifacts_path = get_artifact(
            run_id, run_name, model_name, subdir="outlier_detection"
        )
        logger.info(
            f'Load ({run_id}) artifact file "{os.path.split(outlier_artifacts_path)[1]}" from "{outlier_artifacts_path}"'
        )
        outlier_artifacts = load_results_dict(outlier_artifacts_path)
        try:
            pred_masks, labels = get_anomaly_results_per_model(
                model_name, outlier_artifacts, pred_masks, labels, idx
            )
        except KeyError as e:
            logger.error(
                "Either harmonize the outputs of the models, or add parsing for the new model"
            )
            raise e

        _ = compute_granular_metrics(
            run_id, mlflow_run, model_name, pred_masks, idx, sources
        )

    # concatenate the per-model results to 3D numpy arrays
    for split in ("train", "test"):
        pred_masks[split] = np.stack(
            pred_masks[split], axis=0
        )  # (no_of_models, no_of_patients, no_of_timepoints)

    return pred_masks, labels

ensemble_masks

ensemble_masks(preds_3d: ndarray, method: str = 'over_0.5')

Combine prediction masks using averaging and thresholding.

PARAMETER DESCRIPTION
preds_3d

Stacked prediction masks (n_models x n_subjects x n_timepoints).

TYPE: ndarray

method

Thresholding method: - 'over_0.5': Majority voting (>50% agreement) - 'over_0': Any positive vote

TYPE: str DEFAULT: 'over_0.5'

RETURNS DESCRIPTION
ndarray

Ensembled binary mask (n_subjects x n_timepoints).

Source code in src/ensemble/ensemble_anomaly_detection.py
def ensemble_masks(preds_3d: np.ndarray, method: str = "over_0.5"):
    """
    Combine prediction masks using averaging and thresholding.

    Parameters
    ----------
    preds_3d : np.ndarray
        Stacked prediction masks (n_models x n_subjects x n_timepoints).
    method : str, default 'over_0.5'
        Thresholding method:
        - 'over_0.5': Majority voting (>50% agreement)
        - 'over_0': Any positive vote

    Returns
    -------
    np.ndarray
        Ensembled binary mask (n_subjects x n_timepoints).

    Raises
    ------
    ValueError
        If `method` is not one of the supported options.
    """
    # Average the binary masks over the model axis to get a per-timepoint
    # vote fraction in [0, 1].
    vote_fraction = np.mean(preds_3d, axis=0)

    # "over_0" flags a timepoint when ANY submodel voted positive — more
    # aggressive than majority voting and yields more false positives.
    thresholds = {"over_0.5": 0.5, "over_0": 0}
    if method not in thresholds:
        msg = "Unknown method for ensembling the masks, method: {}".format(method)
        logger.error(msg)
        raise ValueError(msg)

    return (vote_fraction > thresholds[method]).astype(int)

compute_ensemble_anomaly_metrics

compute_ensemble_anomaly_metrics(
    pred_masks, labels, sources
)

Compute anomaly detection metrics for ensemble predictions.

Applies majority voting to combine masks, then evaluates at all difficulty levels.

PARAMETER DESCRIPTION
pred_masks

Stacked 3D prediction masks per split.

TYPE: dict

labels

Ground truth labels per split.

TYPE: dict

sources

Source data with difficulty level information.

TYPE: dict

RETURNS DESCRIPTION
dict

Metrics per split and difficulty level.

See Also

src.anomaly_detection.anomaly_detection_metrics_wrapper.metrics_per_split

Source code in src/ensemble/ensemble_anomaly_detection.py
def compute_ensemble_anomaly_metrics(pred_masks, labels, sources):
    """
    Compute anomaly detection metrics for ensemble predictions.

    Applies majority voting to combine masks, then evaluates
    at all difficulty levels.

    Parameters
    ----------
    pred_masks : dict
        Stacked 3D prediction masks per split.
    labels : dict
        Ground truth labels per split.
    sources : dict
        Source data with difficulty level information.

    Returns
    -------
    dict
        Metrics per split and difficulty level.

    See Also
    --------
    src.anomaly_detection.anomaly_detection_metrics_wrapper.metrics_per_split
    """
    for split_name in ("train", "test"):
        assert pred_masks[split_name].ndim == 3, (
            "The pred_masks should be a 3D numpy array"
        )

    metrics = {}
    for split in labels:
        # Majority-vote the stacked (model, subject, timepoint) masks down
        # to a single 2D mask, then evaluate at every difficulty level.
        combined_mask = ensemble_masks(pred_masks[split])
        metrics[split] = eval_on_all_outlier_difficulty_levels(
            sources, combined_mask, split.replace("outlier_", "")
        )

    return metrics

ensemble_anomaly_detection

ensemble_anomaly_detection(
    ensembled_output: dict,
    cfg: DictConfig,
    experiment_name: str,
    ensemble_name: str,
    sources: dict,
)

Create anomaly detection ensemble from multiple models.

Main entry point for anomaly detection ensembling. Loads masks from submodels, combines them via majority voting, and computes metrics.

PARAMETER DESCRIPTION
ensembled_output

Dictionary mapping model names to MLflow run data.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

experiment_name

MLflow experiment name.

TYPE: str

ensemble_name

Name for the ensemble.

TYPE: str

sources

Source data.

TYPE: dict

RETURNS DESCRIPTION
dict

Ensemble metrics per split and difficulty level.

dict

Stacked prediction masks from all submodels.

Source code in src/ensemble/ensemble_anomaly_detection.py
def ensemble_anomaly_detection(
    ensembled_output: dict,
    cfg: DictConfig,
    experiment_name: str,
    ensemble_name: str,
    sources: dict,
):
    """
    Create anomaly detection ensemble from multiple models.

    Main entry point for anomaly detection ensembling. Loads masks from
    submodels, combines them via majority voting, and computes metrics.

    Parameters
    ----------
    ensembled_output : dict
        Dictionary mapping model names to MLflow run data.
    cfg : DictConfig
        Main Hydra configuration. Currently unused here; kept for
        interface compatibility with callers.
    experiment_name : str
        MLflow experiment name. Currently unused here.
    ensemble_name : str
        Name for the ensemble. Currently unused here.
    sources : dict
        Source data.

    Returns
    -------
    dict
        Ensemble metrics per split and difficulty level.
    dict
        Stacked prediction masks from all submodels.
    """
    # Gather the stacked per-submodel masks, then majority-vote and score.
    pred_masks, labels = get_anomaly_masks_and_labels(ensembled_output, sources)
    ensemble_metrics = compute_ensemble_anomaly_metrics(pred_masks, labels, sources)
    return ensemble_metrics, pred_masks

ensemble_classification

Classification ensemble module.

Provides functionality to aggregate predictions from multiple classification models and compute ensemble metrics with bootstrap confidence intervals.

Cross-references: - src/ensemble/ensemble_utils.py for run retrieval - src/classification/bootstrap_evaluation.py for metric computation

import_model_metrics

import_model_metrics(
    run_id: str,
    run_name: str,
    model_name: str,
    subdir: str = "baseline_model",
) -> dict[str, Any]

Load model metrics from MLflow artifact storage.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

run_name

MLflow run name.

TYPE: str

model_name

Name of the model (for artifact path construction).

TYPE: str

subdir

Subdirectory within artifacts to load from.

TYPE: str DEFAULT: 'baseline_model'

RETURNS DESCRIPTION
dict

Dictionary containing model artifacts and metrics.

Source code in src/ensemble/ensemble_classification.py
def import_model_metrics(
    run_id: str, run_name: str, model_name: str, subdir: str = "baseline_model"
) -> dict[str, Any]:
    """
    Load model metrics from MLflow artifact storage.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    run_name : str
        MLflow run name.
    model_name : str
        Name of the model (for artifact path construction).
    subdir : str, default 'baseline_model'
        Subdirectory within artifacts to load from.

    Returns
    -------
    dict
        Dictionary containing model artifacts and metrics.
    """
    # Resolve the artifact file for this run, then deserialize it.
    path = get_artifact(run_id, run_name, model_name, subdir=subdir)
    return load_results_dict(path)

get_preds_and_labels_from_artifacts

get_preds_and_labels_from_artifacts(
    artifacts: dict[str, Any],
) -> tuple[
    dict[str, ndarray],
    dict[str, ndarray],
    dict[str, ndarray],
]

Extract predictions and labels from model artifacts.

Retrieves mean predictions and variance from subjectwise statistics, which are averaged across bootstrap iterations.

PARAMETER DESCRIPTION
artifacts

Dictionary containing model artifacts with 'subjectwise_stats'.

TYPE: dict

RETURNS DESCRIPTION
dict

Mean predicted probabilities per split.

dict

Variance of predicted probabilities per split.

dict

Ground truth labels per split.

Source code in src/ensemble/ensemble_classification.py
def get_preds_and_labels_from_artifacts(
    artifacts: dict[str, Any],
) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray], dict[str, np.ndarray]]:
    """
    Extract predictions and labels from model artifacts.

    Retrieves mean predictions and variance from subjectwise statistics,
    which are averaged across bootstrap iterations.

    Parameters
    ----------
    artifacts : dict
        Dictionary containing model artifacts with 'subjectwise_stats'.

    Returns
    -------
    dict
        Mean predicted probabilities per split.
    dict
        Variance of predicted probabilities per split.
    dict
        Ground truth labels per split.
    """
    # NOTE! to make things simpler, we get the data from the stats subdict which are averaged from n bootstrap iters
    # this does not account the case in which you did not use same number of bootstrap iters (if you care), assuming
    # that all the models used the same number of iterations
    y_pred_proba, y_pred_proba_var, label = {}, {}, {}
    for split in artifacts["subjectwise_stats"].keys():
        y_pred_proba[split] = artifacts["subjectwise_stats"][split]["preds"][
            "y_pred_proba"
        ]["mean"]
        y_pred_proba_var[split] = (
            artifacts["subjectwise_stats"][split]["preds"]["y_pred_proba"]["std"] ** 2
        )
        label[split] = artifacts["subjectwise_stats"][split]["preds"]["label"]["mean"]

    return y_pred_proba, y_pred_proba_var, label

import_model_preds_and_labels

import_model_preds_and_labels(
    run_id: str,
    run_name: str,
    model_name: str,
    subdir: str = "metrics",
) -> tuple[
    dict[str, ndarray],
    dict[str, ndarray],
    dict[str, ndarray],
]

Load predictions and labels from MLflow run.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

run_name

MLflow run name.

TYPE: str

model_name

Name of the model.

TYPE: str

subdir

Subdirectory within artifacts.

TYPE: str DEFAULT: 'metrics'

RETURNS DESCRIPTION
dict

Mean predicted probabilities per split.

dict

Variance of predicted probabilities per split.

dict

Ground truth labels per split.

Source code in src/ensemble/ensemble_classification.py
def import_model_preds_and_labels(
    run_id: str, run_name: str, model_name: str, subdir: str = "metrics"
) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray], dict[str, np.ndarray]]:
    """
    Load predictions and labels from MLflow run.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    run_name : str
        MLflow run name.
    model_name : str
        Name of the model.
    subdir : str, default 'metrics'
        Subdirectory within artifacts.

    Returns
    -------
    dict
        Mean predicted probabilities per split.
    dict
        Variance of predicted probabilities per split.
    dict
        Ground truth labels per split.
    """
    # Fetch the artifacts, then unpack the per-split prediction statistics.
    artifacts = import_model_metrics(run_id, run_name, model_name, subdir=subdir)
    return get_preds_and_labels_from_artifacts(artifacts)

import_metrics_iter

import_metrics_iter(
    run_id: str,
    run_name: str,
    model_name: str,
    subdir: str = "metrics",
) -> dict[str, Any]

Load per-iteration metrics from MLflow run.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

run_name

MLflow run name.

TYPE: str

model_name

Name of the model.

TYPE: str

subdir

Subdirectory within artifacts.

TYPE: str DEFAULT: 'metrics'

RETURNS DESCRIPTION
dict

Dictionary of metrics per bootstrap iteration per split.

Source code in src/ensemble/ensemble_classification.py
def import_metrics_iter(
    run_id: str, run_name: str, model_name: str, subdir: str = "metrics"
) -> dict[str, Any]:
    """
    Load per-iteration metrics from MLflow run.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    run_name : str
        MLflow run name.
    model_name : str
        Name of the model.
    subdir : str, default 'metrics'
        Subdirectory within artifacts.

    Returns
    -------
    dict
        Dictionary of metrics per bootstrap iteration per split.
    """
    # Only the per-bootstrap-iteration portion of the artifacts is needed.
    return import_model_metrics(run_id, run_name, model_name, subdir=subdir)[
        "metrics_iter"
    ]

concentate_one_var

concentate_one_var(
    array_out: dict[str, ndarray] | None,
    array_per_submodel: dict[str, ndarray],
) -> dict[str, ndarray]

Concatenate arrays from submodel into accumulated output.

Stacks submodel arrays along a new first axis.

PARAMETER DESCRIPTION
array_out

Accumulated arrays (None for first submodel).

TYPE: dict or None

array_per_submodel

Arrays from current submodel, keyed by split.

TYPE: dict

RETURNS DESCRIPTION
dict

Updated accumulated arrays.

Source code in src/ensemble/ensemble_classification.py
def concentate_one_var(
    array_out: dict[str, np.ndarray] | None,
    array_per_submodel: dict[str, np.ndarray],
) -> dict[str, np.ndarray]:
    """
    Concatenate arrays from submodel into accumulated output.

    Stacks submodel arrays along a new first axis.

    Parameters
    ----------
    array_out : dict or None
        Accumulated arrays (None for first submodel).
    array_per_submodel : dict
        Arrays from current submodel, keyed by split.

    Returns
    -------
    dict
        Updated accumulated arrays.
    """
    if array_out is None:
        array_out = {}
        for split in array_per_submodel.keys():
            array_out[split] = array_per_submodel[split][np.newaxis, :]
    else:
        for split in array_out.keys():
            array_out[split] = np.concatenate(
                [array_out[split], array_per_submodel[split][np.newaxis, :]], axis=0
            )

    return array_out

concatenate_arrays

concatenate_arrays(
    preds_out: dict[str, ndarray] | None,
    preds_var_out: dict[str, ndarray] | None,
    _labels_out: dict[str, ndarray] | None,
    y_pred_proba: dict[str, ndarray],
    y_pred_proba_var: dict[str, ndarray],
    label: dict[str, ndarray],
) -> tuple[
    dict[str, ndarray],
    dict[str, ndarray],
    dict[str, ndarray],
]

Concatenate prediction arrays from multiple submodels.

PARAMETER DESCRIPTION
preds_out

Accumulated predictions.

TYPE: dict or None

preds_var_out

Accumulated prediction variances.

TYPE: dict or None

_labels_out

Accumulated labels (not modified, labels are same across models).

TYPE: dict or None

y_pred_proba

Predictions from current submodel.

TYPE: dict

y_pred_proba_var

Prediction variances from current submodel.

TYPE: dict

label

Labels from current submodel.

TYPE: dict

RETURNS DESCRIPTION
dict

Updated predictions.

dict

Updated variances.

dict

Labels (unchanged).

Source code in src/ensemble/ensemble_classification.py
def concatenate_arrays(
    preds_out: dict[str, np.ndarray] | None,
    preds_var_out: dict[str, np.ndarray] | None,
    _labels_out: dict[str, np.ndarray] | None,
    y_pred_proba: dict[str, np.ndarray],
    y_pred_proba_var: dict[str, np.ndarray],
    label: dict[str, np.ndarray],
) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray], dict[str, np.ndarray]]:
    """
    Concatenate prediction arrays from multiple submodels.

    Parameters
    ----------
    preds_out : dict or None
        Accumulated predictions.
    preds_var_out : dict or None
        Accumulated prediction variances.
    _labels_out : dict or None
        Accumulated labels (not modified, labels are same across models).
    y_pred_proba : dict
        Predictions from current submodel.
    y_pred_proba_var : dict
        Prediction variances from current submodel.
    label : dict
        Labels from current submodel.

    Returns
    -------
    dict
        Updated predictions.
    dict
        Updated variances.
    dict
        Labels (unchanged).
    """
    # Labels are identical across submodels, so only predictions and their
    # variances are stacked; the current submodel's labels pass through.
    stacked_preds = concentate_one_var(preds_out, y_pred_proba)
    stacked_vars = concentate_one_var(preds_var_out, y_pred_proba_var)
    return stacked_preds, stacked_vars, label

check_dicts

check_dicts(
    preds_out: dict[str, ndarray],
    preds_var_out: dict[str, ndarray],
    _labels_out: dict[str, ndarray] | None,
    no_submodel_runs: int,
) -> None

Verify concatenated arrays have expected dimensions.

PARAMETER DESCRIPTION
preds_out

Accumulated predictions.

TYPE: dict

preds_var_out

Accumulated variances.

TYPE: dict

_labels_out

Labels (unused, kept for API compatibility).

TYPE: dict

no_submodel_runs

Expected number of submodels.

TYPE: int

RAISES DESCRIPTION
AssertionError

If array dimensions don't match expected submodel count.

Source code in src/ensemble/ensemble_classification.py
def check_dicts(
    preds_out: dict[str, np.ndarray],
    preds_var_out: dict[str, np.ndarray],
    _labels_out: dict[str, np.ndarray] | None,
    no_submodel_runs: int,
) -> None:
    """
    Verify concatenated arrays have expected dimensions.

    Parameters
    ----------
    preds_out : dict
        Accumulated predictions.
    preds_var_out : dict
        Accumulated variances.
    _labels_out : dict
        Labels (unused, kept for API compatibility).
    no_submodel_runs : int
        Expected number of submodels.

    Raises
    ------
    AssertionError
        If array dimensions don't match expected submodel count.
    """
    for split in preds_out.keys():
        assert preds_out[split].shape[0] == no_submodel_runs, (
            f"preds_out[split].shape[0]: "
            f"{preds_out[split].shape[0]}, "
            f"no_submodel_runs: {no_submodel_runs}"
        )
        assert preds_var_out[split].shape[0] == no_submodel_runs, (
            f"preds_var_out[split].shape[0]: "
            f"{preds_var_out[split].shape[0]}, "
            f"no_submodel_runs: {no_submodel_runs}"
        )

compute_stats

compute_stats(
    preds_out: dict[str, ndarray],
    preds_var_out: dict[str, ndarray],
) -> tuple[
    dict[str, ndarray],
    dict[str, ndarray],
    dict[str, ndarray],
]

Compute ensemble statistics from stacked predictions.

PARAMETER DESCRIPTION
preds_out

Stacked predictions (n_models x n_subjects).

TYPE: dict

preds_var_out

Stacked variances (n_models x n_subjects).

TYPE: dict

RETURNS DESCRIPTION
dict

Mean predictions across models.

dict

Standard deviation of predictions across models.

dict

Mean of within-model standard deviations.

Source code in src/ensemble/ensemble_classification.py
def compute_stats(
    preds_out: dict[str, np.ndarray], preds_var_out: dict[str, np.ndarray]
) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray], dict[str, np.ndarray]]:
    """
    Compute ensemble statistics from stacked predictions.

    Parameters
    ----------
    preds_out : dict
        Stacked predictions (n_models x n_subjects), keyed by split.
    preds_var_out : dict
        Stacked variances (n_models x n_subjects), keyed by split.

    Returns
    -------
    dict
        Mean predictions across models.
    dict
        Standard deviation of predictions across models.
    dict
        Mean of within-model standard deviations (sqrt of mean variance).
    """
    # Reduce over the model axis (axis 0) for every split.
    preds = {split: np.mean(arr, axis=0) for split, arr in preds_out.items()}
    preds_std = {split: np.std(arr, axis=0) for split, arr in preds_out.items()}
    preds_meanstd = {
        split: np.sqrt(np.mean(preds_var_out[split], axis=0)) for split in preds_out
    }

    return preds, preds_std, preds_meanstd

aggregate_pred_dict

aggregate_pred_dict(
    preds_out: dict[str, dict[str, list[Any]]],
    preds_per_submodel: dict[str, dict[str, list[Any]]],
    ensemble: bool = False,
) -> dict[str, dict[str, list[Any]]]

Aggregate prediction dictionaries from submodels.

Extends lists of predictions for each subject code.

PARAMETER DESCRIPTION
preds_out

Accumulated predictions keyed by variable and subject code.

TYPE: dict

preds_per_submodel

Predictions from current submodel.

TYPE: dict

ensemble

If True, performs additional consistency checks.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict

Updated accumulated predictions.

Source code in src/ensemble/ensemble_classification.py
def aggregate_pred_dict(
    preds_out: dict[str, dict[str, list[Any]]],
    preds_per_submodel: dict[str, dict[str, list[Any]]],
    ensemble: bool = False,
) -> dict[str, dict[str, list[Any]]]:
    """
    Aggregate prediction dictionaries from submodels.

    Extends the per-subject prediction lists in ``preds_out`` with the
    entries from the current submodel (in-place mutation).

    Parameters
    ----------
    preds_out : dict
        Accumulated predictions keyed by variable and subject code.
    preds_per_submodel : dict
        Predictions from current submodel.
    ensemble : bool, default False
        If True, asserts both dicts cover the same subject codes.

    Returns
    -------
    dict
        Updated accumulated predictions (same object as ``preds_out``).
    """
    # Variables are e.g. y_pred_proba, y_pred, label.
    for var, per_code in preds_per_submodel.items():
        assert isinstance(per_code, dict), (
            f"preds_per_submodel[var] is not a dict, "
            f"but {type(per_code)}"
        )
        codes_accumulated = sorted(preds_out[var].keys())
        codes_incoming = sorted(per_code.keys())
        if ensemble:
            assert codes_accumulated == codes_incoming, (
                "You do not have to have the same subjects in all splits? \n"
                "As in you ran some MLflow with runs with certain subjects,\n"
                "And later redefined the splits?"
            )
        for code, list_of_preds in per_code.items():
            preds_out[var][code] += list_of_preds

    return preds_out

aggregate_preds

aggregate_preds(
    preds_out: dict[str, ndarray],
    preds_per_submodel: dict[str, ndarray],
) -> dict[str, ndarray]

Concatenate prediction arrays along iteration axis.

PARAMETER DESCRIPTION
preds_out

Accumulated predictions (n_subjects x n_accumulated_iters).

TYPE: dict

preds_per_submodel

Predictions from current submodel.

TYPE: dict

RETURNS DESCRIPTION
dict

Updated predictions with new iterations concatenated.

Source code in src/ensemble/ensemble_classification.py
def aggregate_preds(
    preds_out: dict[str, np.ndarray], preds_per_submodel: dict[str, np.ndarray]
) -> dict[str, np.ndarray]:
    """
    Concatenate prediction arrays along the iteration axis.

    Parameters
    ----------
    preds_out : dict
        Accumulated predictions (n_subjects x n_accumulated_iters).
    preds_per_submodel : dict
        Predictions from current submodel.

    Returns
    -------
    dict
        Updated predictions with the new iterations appended on axis 1.
    """
    for split, new_iters in preds_per_submodel.items():
        accumulated = preds_out[split]
        preds_out[split] = np.concatenate([accumulated, new_iters], axis=1)
    return preds_out

check_metrics_iter_preds_dict

check_metrics_iter_preds_dict(
    dict_arrays: dict[str, dict[str, list[Any]]],
) -> None

Validate prediction dictionary has consistent dimensions.

Checks that y_pred_proba, y_pred, and labels have same length.

PARAMETER DESCRIPTION
dict_arrays

Dictionary with 'y_pred_proba', 'y_pred', and 'label'/'labels' keys.

TYPE: dict

RAISES DESCRIPTION
AssertionError

If lengths don't match.

Source code in src/ensemble/ensemble_classification.py
def check_metrics_iter_preds_dict(
    dict_arrays: dict[str, dict[str, list[Any]]],
) -> None:
    """
    Validate that the prediction dictionary has consistent dimensions.

    Checks that y_pred_proba, y_pred, and labels have the same length.

    Parameters
    ----------
    dict_arrays : dict
        Dictionary with 'y_pred_proba', 'y_pred', and 'label'/'labels' keys.
        Shapes are (no_subjects, no_of_bootstrap_iters).

    Raises
    ------
    AssertionError
        If lengths don't match.
    """
    n_proba = len(dict_arrays["y_pred_proba"])
    if "labels" in dict_arrays:
        # TODO! why with CATBOOST? fix this eventually so that only one key
        assert n_proba == len(dict_arrays["labels"]), (
            f"you have {len(dict_arrays['y_pred_proba'])} y_pred_proba and {len(dict_arrays['labels'])} labels"
        )
    elif "label" in dict_arrays:
        assert n_proba == len(dict_arrays["label"]), (
            f"you have {len(dict_arrays['y_pred_proba'])} y_pred_proba and {len(dict_arrays['label'])} labels"
        )
    assert n_proba == len(dict_arrays["y_pred"])

check_metrics_iter_preds

check_metrics_iter_preds(
    dict_arrays: dict[str, ndarray],
) -> None

Validate prediction arrays have consistent shapes.

Checks that y_pred_proba, y_pred, and labels have same first dimension.

PARAMETER DESCRIPTION
dict_arrays

Dictionary with numpy array values.

TYPE: dict

RAISES DESCRIPTION
AssertionError

If shapes don't match.

Source code in src/ensemble/ensemble_classification.py
def check_metrics_iter_preds(dict_arrays: dict[str, np.ndarray]) -> None:
    """
    Validate that prediction arrays have consistent shapes.

    Checks that y_pred_proba, y_pred, and labels share the same first
    dimension (no_subjects); arrays are (no_subjects, no_of_bootstrap_iters).

    Parameters
    ----------
    dict_arrays : dict
        Dictionary with numpy array values.

    Raises
    ------
    AssertionError
        If shapes don't match.
    """
    n_subjects = dict_arrays["y_pred_proba"].shape[0]
    if "label" in dict_arrays:
        assert n_subjects == dict_arrays["label"].shape[0], (
            f"you have {dict_arrays['y_pred_proba'].shape[0]} y_pred_proba and {dict_arrays['label'].shape[0]} labels"
        )
    elif "labels" in dict_arrays:
        # TODO! why with CATBOOST? fix this eventually so that only one key
        assert n_subjects == dict_arrays["labels"].shape[0], (
            f"you have {dict_arrays['y_pred_proba'].shape[0]} y_pred_proba and {dict_arrays['labels'].shape[0]} labels"
        )
    assert n_subjects == dict_arrays["y_pred"].shape[0]

check_metrics_iter_shapes

check_metrics_iter_shapes(
    iter_split: dict[str, Any],
) -> None

Dispatch shape checking based on data structure type.

PARAMETER DESCRIPTION
iter_split

Split-level metrics iteration data.

TYPE: dict

Source code in src/ensemble/ensemble_classification.py
def check_metrics_iter_shapes(iter_split: dict[str, Any]) -> None:
    """
    Dispatch shape checking based on data structure type.

    Data stored as per-subject dicts ('preds_dict', train/val style) is
    checked with the dict validator; otherwise ('preds', test style) the
    array validator is used.

    Parameters
    ----------
    iter_split : dict
        Split-level metrics iteration data.
    """
    if "preds_dict" in iter_split:
        check_metrics_iter_preds_dict(dict_arrays=iter_split["preds_dict"]["arrays"])
        return
    check_metrics_iter_preds(dict_arrays=iter_split["preds"]["arrays"]["predictions"])

check_subjects_in_splits

check_subjects_in_splits(
    metrics_iter: dict[str, Any] | None,
) -> dict[str, list[str]] | None

Extract and validate subject codes from metrics iteration data.

PARAMETER DESCRIPTION
metrics_iter

Metrics per iteration dictionary.

TYPE: dict or None

RETURNS DESCRIPTION
dict or None

Dictionary mapping splits to sorted subject code lists.

Source code in src/ensemble/ensemble_classification.py
def check_subjects_in_splits(
    metrics_iter: dict[str, Any] | None,
) -> dict[str, list[str]] | None:
    """
    Extract and validate subject codes from metrics iteration data.

    Parameters
    ----------
    metrics_iter : dict or None
        Metrics per iteration dictionary.

    Returns
    -------
    dict or None
        Dictionary mapping splits to sorted subject code lists.
    """
    if metrics_iter is not None:
        subjects = {}
        for split in metrics_iter.keys():
            if "preds_dict" in metrics_iter[split]:
                subjects[split] = sorted(
                    list(
                        metrics_iter[split]["preds_dict"]["arrays"][
                            "y_pred_proba"
                        ].keys()
                    )
                )
            elif "preds" in metrics_iter[split]:
                no_subjects_preds = len(
                    metrics_iter[split]["preds"]["arrays"]["predictions"][
                        "y_pred_proba"
                    ]
                )
                logger.debug(f"{no_subjects_preds} in test split")
            else:
                logger.error("Where are the preds?")
                raise ValueError("Where are the preds?")

        if "val" in subjects:
            assert subjects["train"] == subjects["val"], (
                "Your train and val codes do not match?"
            )

        return subjects
    else:
        return None

check_compare_subjects_for_aggregation

check_compare_subjects_for_aggregation(
    subject_codes: dict[str, list[str]] | None,
    subject_codes_model: dict[str, list[str]] | None,
    run_name: str,
    i: int,
    split_to_check: str = "train",
) -> list[str]

Compare subject codes between ensemble and current submodel.

Verifies that the current submodel used the same subjects as the previous models.

PARAMETER DESCRIPTION
subject_codes

Subject codes from ensemble (accumulated).

TYPE: dict or None

subject_codes_model

Subject codes from current submodel.

TYPE: dict or None

run_name

Name of current run for error reporting.

TYPE: str

i

Index of current submodel.

TYPE: int

split_to_check

Which split to compare.

TYPE: str DEFAULT: 'train'

RETURNS DESCRIPTION
list

List of run names with mismatched codes (empty if match).

Source code in src/ensemble/ensemble_classification.py
def check_compare_subjects_for_aggregation(
    subject_codes: dict[str, list[str]] | None,
    subject_codes_model: dict[str, list[str]] | None,
    run_name: str,
    i: int,
    split_to_check: str = "train",
) -> list[str]:
    """
    Compare subject codes between ensemble and current submodel.

    Verifies that the current submodel used same subjects as previous models.

    Parameters
    ----------
    subject_codes : dict or None
        Subject codes from ensemble (accumulated).
    subject_codes_model : dict or None
        Subject codes from current submodel.
    run_name : str
        Name of current run for error reporting.
    i : int
        Index of current submodel.
    split_to_check : str, default 'train'
        Which split to compare.

    Returns
    -------
    list
        List of run names with mismatched codes (empty if match).
    """
    error_run = []
    if subject_codes is not None and subject_codes_model is not None:
        for split in subject_codes.keys():
            # these come from whole bootstrap experiment, so train and val should have the same codes
            if split == split_to_check:
                if subject_codes[split] != subject_codes_model[split]:
                    logger.error(
                        f"Submodel #{i + 1}: {run_name} seem to have different subjects in splits than in previous submodels"
                    )
                    error_run = [run_name]
                    if len(subject_codes[split]) != len(subject_codes_model[split]):
                        logger.error(
                            "Lengths of splits do not even seem to match, ensemble n = {}, submodel n = {}".format(
                                len(subject_codes[split]),
                                len(subject_codes_model[split]),
                            )
                        )
                        # raise ValueError('Your ensemble seem to come from different splits, did you redefine the splits\n'
                        #                  'while running the experiments? Need to delete the runs with old split definitions\n'
                        #                  'and rerun those to get this ensembling working?')
                    else:
                        logger.debug("Ensemble codes | Model Codes")
                        for code_ens, code in zip(
                            subject_codes[split], subject_codes_model[split]
                        ):
                            logger.debug(f"{code_ens} | {code}")
                        # raise ValueError('Your ensemble seem to come from different splits, did you redefine the splits\n'
                        #                  'while running the experiments? Need to delete the runs with old split definitions\n'
                        #                  'and rerun those to get this ensembling working?')

    return error_run

aggregate_metric_iter

aggregate_metric_iter(
    metrics_iter: dict[str, Any] | None,
    metrics_iter_model: dict[str, Any],
    run_name: str,
    ensemble: bool = False,
) -> dict[str, Any]

Aggregate metrics iteration data from submodel into ensemble.

Combines predictions from multiple bootstrap models by extending the iteration arrays.

PARAMETER DESCRIPTION
metrics_iter

Accumulated metrics (None for first submodel).

TYPE: dict or None

metrics_iter_model

Metrics from current submodel.

TYPE: dict

run_name

Name of current run for logging.

TYPE: str

ensemble

If True, performs additional consistency checks.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict

Updated accumulated metrics.

Source code in src/ensemble/ensemble_classification.py
def aggregate_metric_iter(
    metrics_iter: dict[str, Any] | None,
    metrics_iter_model: dict[str, Any],
    run_name: str,
    ensemble: bool = False,
) -> dict[str, Any]:
    """
    Aggregate metrics iteration data from submodel into ensemble.

    Combines predictions from multiple bootstrap models by extending
    the iteration arrays.

    Note: this function mutates ``metrics_iter_model`` in place — the
    per-split "metrics" entry is popped (KeyError if missing), and for
    the first submodel the returned accumulator shares the remaining
    sub-dicts with ``metrics_iter_model``.

    Parameters
    ----------
    metrics_iter : dict or None
        Accumulated metrics (None for first submodel).
    metrics_iter_model : dict
        Metrics from current submodel. Modified in place (see note).
    run_name : str
        Name of current run. Currently unused in the body; kept for
        signature symmetry with the other aggregation helpers.
    ensemble : bool, default False
        If True, performs additional consistency checks.

    Returns
    -------
    dict
        Updated accumulated metrics.
    """
    if metrics_iter is None:
        # First submodel: seed the accumulator with this submodel's data.
        # The summary "metrics" entry is dropped; only raw per-iteration
        # predictions are kept for ensemble-level recomputation later.
        metrics_iter = {}
        for split in metrics_iter_model.keys():
            metrics_iter[split] = metrics_iter_model[split]
            metrics_iter[split].pop("metrics")

    else:
        for split in metrics_iter.keys():
            metrics_iter_model[split].pop("metrics")
            if "preds_dict" in metrics_iter_model[split]:
                # train/val: predictions stored per subject code as dicts
                # of lists; extend each subject's list of iterations.
                preds: dict[str, dict] = metrics_iter_model[split]["preds_dict"][
                    "arrays"
                ]
                metrics_iter[split]["preds_dict"]["arrays"] = aggregate_pred_dict(
                    preds_out=metrics_iter[split]["preds_dict"]["arrays"],
                    preds_per_submodel=preds,
                    ensemble=ensemble,
                )
                check_metrics_iter_preds_dict(
                    dict_arrays=metrics_iter[split]["preds_dict"]["arrays"]
                )

            else:
                # test: predictions stored as (n_subjects, n_iters) arrays;
                # concatenate along the iteration axis.
                preds: dict[str, np.ndarray] = metrics_iter_model[split]["preds"][
                    "arrays"
                ]["predictions"]
                metrics_iter[split]["preds"]["arrays"]["predictions"] = aggregate_preds(
                    preds_out=metrics_iter[split]["preds"]["arrays"]["predictions"],
                    preds_per_submodel=preds,
                )
                check_metrics_iter_preds(
                    dict_arrays=metrics_iter[split]["preds"]["arrays"]["predictions"]
                )

    return metrics_iter

get_label_array

get_label_array(label_dict: dict[str, ndarray]) -> ndarray

Convert label dictionary to array.

PARAMETER DESCRIPTION
label_dict

Dictionary mapping subject codes to label arrays.

TYPE: dict

RETURNS DESCRIPTION
ndarray

Array of labels in same order as dictionary keys.

Source code in src/ensemble/ensemble_classification.py
def get_label_array(label_dict: dict[str, np.ndarray]) -> np.ndarray:
    """
    Convert a label dictionary to a flat array.

    Only the first entry of each subject's label sequence is kept.

    Parameters
    ----------
    label_dict : dict
        Dictionary mapping subject codes to label arrays.

    Returns
    -------
    np.ndarray
        Array of labels in the same order as the dictionary keys.
    """
    label_array = np.array([subject_labels[0] for subject_labels in label_dict.values()])
    assert label_array.shape[0] == len(label_dict), (
        f"label_array.shape[0]: {label_array.shape[0]}, "
        f"len(label_dict): {len(label_dict)}"
    )
    return label_array

get_preds_array

get_preds_array(preds_dict: dict[str, ndarray]) -> ndarray

Convert prediction dictionary to 2D array.

PARAMETER DESCRIPTION
preds_dict

Dictionary mapping subject codes to prediction arrays.

TYPE: dict

RETURNS DESCRIPTION
ndarray

Array of shape (n_subjects, n_bootstrap_iters).

Source code in src/ensemble/ensemble_classification.py
def get_preds_array(preds_dict: dict[str, np.ndarray]) -> np.ndarray:
    """
    Convert a prediction dictionary to a 2D float array.

    Subjects may carry different numbers of bootstrap iterations; every
    subject is truncated to the shortest length so the output stays
    rectangular.

    Parameters
    ----------
    preds_dict : dict
        Dictionary mapping subject codes to prediction arrays.

    Returns
    -------
    np.ndarray
        Array of shape (n_subjects, min_bootstrap_iters).
    """
    min_iters = min(len(subject_preds) for subject_preds in preds_dict.values())
    preds_array = np.zeros((len(preds_dict), min_iters))
    for row_idx, code in enumerate(preds_dict):
        preds_array[row_idx] = preds_dict[code][:min_iters]

    return preds_array

recompute_ensemble_metrics

recompute_ensemble_metrics(
    metrics_iter: dict[str, Any],
    sources: dict[str, Any],
    cfg: DictConfig,
) -> dict[str, Any]

Recompute metrics for aggregated ensemble predictions.

Takes combined predictions from all submodels and recomputes bootstrap metrics as if they were from a single model.

PARAMETER DESCRIPTION
metrics_iter

Aggregated predictions from all submodels.

TYPE: dict

sources

Source data containing subject information.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Updated metrics_iter with recomputed metrics.

Source code in src/ensemble/ensemble_classification.py
def recompute_ensemble_metrics(
    metrics_iter: dict[str, Any], sources: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]:
    """
    Recompute metrics for aggregated ensemble predictions.

    Takes combined predictions from all submodels and recomputes
    bootstrap metrics as if they were from a single model.

    Parameters
    ----------
    metrics_iter : dict
        Aggregated predictions from all submodels. Updated in place and
        also returned.
    sources : dict
        Source data containing subject information.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    dict
        Updated metrics_iter with recomputed metrics.
    """
    # Suppress warnings raised during the repeated metric computation
    # below (restored via resetwarnings() at the end).
    warnings.simplefilter("ignore")
    # skip "val" for now
    splits = ["train", "test"]
    for split in splits:  # metrics_iter.keys():
        if "preds_dict" in metrics_iter[split]:
            # train/val style: per-subject dicts — flatten to arrays.
            y_true = get_label_array(
                label_dict=metrics_iter[split]["preds_dict"]["arrays"]["label"]
            )
            preds_array = get_preds_array(
                preds_dict=metrics_iter[split]["preds_dict"]["arrays"]["y_pred_proba"]
            )

        elif "preds" in metrics_iter[split]:
            # test style: already (n_subjects, n_iters) arrays; ground
            # truth taken from the first iteration column.
            y_true = metrics_iter[split]["preds"]["arrays"]["predictions"]["label"][
                :, 0
            ]
            preds_array = metrics_iter[split]["preds"]["arrays"]["predictions"][
                "y_pred_proba"
            ]

        else:
            logger.error(
                "Where are the predictions? {}".format(metrics_iter[split].keys())
            )
            raise ValueError(
                "Where are the predictions? {}".format(metrics_iter[split].keys())
            )

        method_cfg = cfg["CLS_EVALUATION"]["BOOTSTRAP"]
        dict_arrays_compact = get_compact_dict_arrays(sources)
        codes_per_split = dict_arrays_compact[f"subject_codes_{split}"]

        # Re-run the bootstrap metric computation once per aggregated
        # iteration column, accumulating results into metrics_iter[split].
        for idx in tqdm(
            range(preds_array.shape[1]),
            desc=f"Recomputing ensemble metrics for {split}",
        ):
            preds = create_pred_dict(split_preds=preds_array[:, idx], y_true=y_true)
            metrics_iter[split] = bootstrap_metrics_per_split(
                None,
                y_true,
                preds,
                None,
                model_name="ensemble",
                metrics_per_split=metrics_iter[split],
                codes_per_split=codes_per_split,
                method_cfg=method_cfg,
                cfg=cfg,
                split=split,
                skip_mlflow=True,
                recompute_for_ensemble=True,
            )
            check_metrics_iter_shapes(iter_split=metrics_iter[split])

    warnings.resetwarnings()

    return metrics_iter

get_cls_preds_from_artifact

get_cls_preds_from_artifact(
    run: Series,
    i: int,
    no_submodel_runs: int,
    aggregate_preds: bool = False,
) -> dict[str, Any]

Load classification predictions from MLflow run artifact.

PARAMETER DESCRIPTION
run

MLflow run data.

TYPE: Series

i

Index of current submodel (for logging).

TYPE: int

no_submodel_runs

Total number of submodels (for logging).

TYPE: int

aggregate_preds

If True, log aggregation progress.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict

Metrics per iteration dictionary from the run.

Source code in src/ensemble/ensemble_classification.py
def get_cls_preds_from_artifact(
    run: pd.Series, i: int, no_submodel_runs: int, aggregate_preds: bool = False
) -> dict[str, Any]:
    """
    Load classification predictions from an MLflow run artifact.

    Parameters
    ----------
    run : pd.Series
        MLflow run data.
    i : int
        Index of current submodel (for logging).
    no_submodel_runs : int
        Total number of submodels (for logging).
    aggregate_preds : bool, default False
        If True, log aggregation progress.

    Returns
    -------
    dict
        Metrics per iteration dictionary from the run.
    """
    run_id = run["run_id"]
    run_name = run["tags.mlflow.runName"]
    model_name = run["params.model_name"]
    if aggregate_preds:
        logger.info(
            f"{i + 1}/{no_submodel_runs}: Ensembling model: {model_name}, run_id: {run_id}, run_name: {run_name}"
        )

    # Bootstrapped-model predictions are stored under the "metrics"
    # artifact subdirectory of the run.
    metrics_iter_model = import_metrics_iter(
        run_id, run_name, model_name, subdir="metrics"
    )
    no_iters = metrics_iter_model["test"]["preds"]["arrays"]["predictions"][
        "y_pred_proba"
    ].shape[1]
    if aggregate_preds:
        logger.info(f"Submodel consists of {no_iters} bootstrap iterations")

    return metrics_iter_model

aggregate_submodels

aggregate_submodels(
    ensemble_model_runs: DataFrame,
    no_submodel_runs: int,
    aggregate_preds: bool = True,
    split_to_check: str = "train",
    ensemble_codes: DataFrame | None = None,
) -> tuple[dict[str, Any] | None, DataFrame, bool]

Aggregate predictions from multiple classification submodels.

PARAMETER DESCRIPTION
ensemble_model_runs

DataFrame of MLflow runs to aggregate.

TYPE: DataFrame

no_submodel_runs

Number of submodels.

TYPE: int

aggregate_preds

If True, actually aggregate predictions. If False, only check codes.

TYPE: bool DEFAULT: True

split_to_check

Split to use for code consistency checking.

TYPE: str DEFAULT: 'train'

ensemble_codes

Pre-computed ensemble codes for validation.

TYPE: DataFrame DEFAULT: None

RETURNS DESCRIPTION
dict or None

Aggregated metrics_iter (or None if aggregate_preds=False).

DataFrame

DataFrame of subject codes per submodel.

bool

Whether all submodels have same subject codes.

Source code in src/ensemble/ensemble_classification.py
def aggregate_submodels(
    ensemble_model_runs: pd.DataFrame,
    no_submodel_runs: int,
    aggregate_preds: bool = True,
    split_to_check: str = "train",
    ensemble_codes: pd.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, pd.DataFrame, bool]:
    """
    Aggregate predictions from multiple classification submodels.

    Parameters
    ----------
    ensemble_model_runs : pd.DataFrame
        DataFrame of MLflow runs to aggregate.
    no_submodel_runs : int
        Number of submodels.
    aggregate_preds : bool, default True
        If True, actually aggregate predictions. If False, only check codes.
    split_to_check : str, default 'train'
        Split to use for code consistency checking.
    ensemble_codes : pd.DataFrame, optional
        Pre-computed ensemble codes for validation. Currently unused in
        the body; kept for API compatibility.

    Returns
    -------
    dict or None
        Aggregated metrics_iter (or None if aggregate_preds=False).
    pd.DataFrame
        DataFrame of subject codes per submodel.
    bool
        Whether all submodels have same subject codes.

    Raises
    ------
    ValueError
        In code-check mode (aggregate_preds=False) when some runs used
        a different set of subjects than the others.
    """
    metrics_iter = None
    subject_codes_out = {}
    error_runs = []
    # BUGFIX: initialize once before the loop. Previously this was
    # re-created on every iteration and was undefined (NameError) when
    # ensemble_model_runs was empty.
    df_out = pd.DataFrame()

    for i, (_, run) in enumerate(ensemble_model_runs.iterrows()):
        metrics_iter_model = get_cls_preds_from_artifact(
            run, i, no_submodel_runs, aggregate_preds=aggregate_preds
        )
        # Compare the codes accumulated so far with this submodel's codes.
        subject_codes = check_subjects_in_splits(metrics_iter)
        subject_codes_model = check_subjects_in_splits(metrics_iter_model)
        error_runs += check_compare_subjects_for_aggregation(
            subject_codes, subject_codes_model, run["tags.mlflow.runName"], i
        )
        subject_codes_out[run["tags.mlflow.runName"]] = subject_codes_model[
            split_to_check
        ]

        if aggregate_preds:
            metrics_iter = aggregate_metric_iter(
                metrics_iter,
                metrics_iter_model,
                run_name=run["tags.mlflow.runName"],
                ensemble=True,
            )

    # One column of subject codes per submodel run.
    for run_name, list_of_codes in subject_codes_out.items():
        df_out[run_name] = list_of_codes
    all_submodels_have_same_codes = are_codes_the_same(df_out)

    if not aggregate_preds:
        if len(error_runs) > 0:
            # well assuming that the first submodel of the ensemble is correct
            logger.error(
                "These runs seem to be done with a different set of subjects than others?"
            )
            for run in error_runs:
                logger.error(run)
            raise ValueError(
                "Your ensemble seem to come from different splits, did you redefine the splits\n"
                "while running the experiments? Need to delete the runs with old split definitions\n"
                "and rerun those to get this ensembling working?"
            )

    return metrics_iter, df_out, all_submodels_have_same_codes

get_classification_preds

get_classification_preds(
    ensemble_model_runs: DataFrame,
    sources: dict[str, Any],
    cfg: DictConfig,
) -> dict[str, Any] | None

Get aggregated classification predictions from submodels.

Coordinates the full aggregation process: code checking, prediction aggregation, and metric recomputation.

PARAMETER DESCRIPTION
ensemble_model_runs

DataFrame of MLflow runs to ensemble.

TYPE: DataFrame

sources

Source data.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict or None

Aggregated metrics_iter with recomputed metrics, or None if failed.

Source code in src/ensemble/ensemble_classification.py
def get_classification_preds(
    ensemble_model_runs: pd.DataFrame, sources: dict[str, Any], cfg: DictConfig
) -> dict[str, Any] | None:
    """
    Get aggregated classification predictions from submodels.

    Coordinates the full aggregation process: code checking, prediction
    aggregation, and metric recomputation. Submodels are processed in
    two passes: a dry run (aggregate_preds=False) that only validates
    subject codes, then the actual aggregation.

    Parameters
    ----------
    ensemble_model_runs : pd.DataFrame
        DataFrame of MLflow runs to ensemble.
    sources : dict
        Source data.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    dict or None
        Aggregated metrics_iter with recomputed metrics; None when there
        are no runs or the submodels used different subject codes.
    """
    no_submodel_runs = ensemble_model_runs.shape[0]
    if no_submodel_runs > 0:
        # Pass 1: verify all submodels were trained on the same subjects.
        _, ensemble_codes, same_codes = aggregate_submodels(
            ensemble_model_runs, no_submodel_runs, aggregate_preds=False
        )
        if same_codes:
            # Pass 2: concatenate the bootstrap iterations across submodels.
            metrics_iter, _, _ = aggregate_submodels(
                ensemble_model_runs,
                no_submodel_runs,
                aggregate_preds=True,
                ensemble_codes=ensemble_codes,
            )
            # This metrics_iter is now the same that you would get from the "normal bootstrap" with just all the
            # iterations aggregated together
            n = metrics_iter["test"]["preds"]["arrays"]["predictions"][
                "y_pred_proba"
            ].shape[1]
            logger.info(
                "Ensemble consists a total of {} bootstrap iterations".format(n)
            )

            # compute the metrics for the ensemble
            metrics_iter = recompute_ensemble_metrics(metrics_iter, sources, cfg)
        else:
            logger.warning(
                "The codes used to train different submodels do not seem to be the same!"
            )
            metrics_iter = None
    else:
        metrics_iter = None

    return metrics_iter

create_pred_dict

create_pred_dict(
    split_preds: ndarray, y_true: ndarray
) -> dict[str, ndarray]

Create prediction dictionary from arrays.

PARAMETER DESCRIPTION
split_preds

Predicted probabilities.

TYPE: ndarray

y_true

Ground truth labels.

TYPE: ndarray

RETURNS DESCRIPTION
dict

Dictionary with y_pred, y_pred_proba, and labels.

Source code in src/ensemble/ensemble_classification.py
def create_pred_dict(
    split_preds: np.ndarray, y_true: np.ndarray
) -> dict[str, np.ndarray]:
    """
    Bundle predicted probabilities and ground truth into a prediction dict.

    Hard class calls are derived by thresholding the probabilities at 0.5.

    Parameters
    ----------
    split_preds : np.ndarray
        Predicted probabilities.
    y_true : np.ndarray
        Ground truth labels.

    Returns
    -------
    dict
        Dictionary with y_pred, y_pred_proba, and labels.
    """
    # Threshold at 0.5 to obtain integer class predictions
    hard_calls = (split_preds > 0.5).astype(int)
    return {
        "y_pred": hard_calls,
        "y_pred_proba": split_preds,
        "labels": y_true,
    }

get_compact_dict_arrays

get_compact_dict_arrays(
    sources: dict[str, Any],
) -> dict[str, ndarray]

Extract compact arrays of subject codes and labels from sources.

Used by bootstrap metric computation functions.

PARAMETER DESCRIPTION
sources

Source data dictionary.

TYPE: dict

RETURNS DESCRIPTION
dict

Dictionary with keys like 'subject_codes_train', 'y_train', etc.

Source code in src/ensemble/ensemble_classification.py
def get_compact_dict_arrays(sources: dict[str, Any]) -> dict[str, np.ndarray]:
    """
    Extract compact arrays of subject codes and labels from sources.

    Used by bootstrap metric computation functions. Subject codes and
    class labels are taken from the first feature source (all sources
    share the same subjects).

    Parameters
    ----------
    sources : dict
        Source data dictionary.

    Returns
    -------
    dict
        Dictionary with keys like 'subject_codes_train', 'y_train', etc.
    """
    primary_source = next(iter(sources))
    data = sources[primary_source]["data"]

    compact: dict[str, np.ndarray] = {}
    for split_name, split_df in data.items():
        compact[f"subject_codes_{split_name}"] = split_df["subject_code"].to_numpy()
        # Labels arrive as strings, e.g. "control" vs. "glaucoma" ...
        string_labels = split_df["metadata_class_label"].to_numpy()
        # ... and are converted to integer class codes here
        compact[f"y_{split_name}"] = encode_labels_to_integers(string_labels)

    return compact

compute_cls_ensemble_metrics

compute_cls_ensemble_metrics(
    metrics_iter: dict[str, Any],
    sources: dict[str, Any],
    cfg: DictConfig,
) -> dict[str, Any]

Compute classification ensemble statistics.

PARAMETER DESCRIPTION
metrics_iter

Aggregated predictions from ensemble.

TYPE: dict

sources

Source data.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary containing metrics_iter, metrics_stats, subjectwise_stats, and subject_global_stats.

Source code in src/ensemble/ensemble_classification.py
def compute_cls_ensemble_metrics(
    metrics_iter: dict[str, Any], sources: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]:
    """
    Compute classification ensemble statistics.

    Delegates to get_ensemble_stats() using the bootstrap configuration
    and packages the results into a single dictionary.

    Parameters
    ----------
    metrics_iter : dict
        Aggregated predictions from ensemble.
    sources : dict
        Source data.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    dict
        Dictionary containing metrics_iter, metrics_stats, subjectwise_stats,
        and subject_global_stats.
    """
    bootstrap_cfg = cfg["CLS_EVALUATION"]["BOOTSTRAP"]
    compact_arrays = get_compact_dict_arrays(sources)
    metrics_stats, subjectwise_stats, subject_global_stats = get_ensemble_stats(
        metrics_iter,
        compact_arrays,
        bootstrap_cfg,
        call_from="classification_ensemble",
    )

    return {
        "metrics_iter": metrics_iter,
        "metrics_stats": metrics_stats,
        "subjectwise_stats": subjectwise_stats,
        "subject_global_stats": subject_global_stats,
    }

ensemble_classification

ensemble_classification(
    ensemble_model_runs: DataFrame,
    cfg: DictConfig,
    sources: dict[str, Any],
    ensemble_name: str,
) -> dict[str, Any] | None

Create classification ensemble from multiple models.

Main entry point for classification ensembling. Aggregates predictions from submodels and computes ensemble metrics.

PARAMETER DESCRIPTION
ensemble_model_runs

DataFrame of MLflow classification runs to ensemble.

TYPE: DataFrame

cfg

Main Hydra configuration.

TYPE: DictConfig

sources

Source data.

TYPE: dict

ensemble_name

Name for the ensemble.

TYPE: str

RETURNS DESCRIPTION
dict or None

Ensemble metrics dictionary, or None if ensembling failed.

Source code in src/ensemble/ensemble_classification.py
def ensemble_classification(
    ensemble_model_runs: pd.DataFrame,
    cfg: DictConfig,
    sources: dict[str, Any],
    ensemble_name: str,
) -> dict[str, Any] | None:
    """
    Create classification ensemble from multiple models.

    Main entry point for classification ensembling. Aggregates predictions
    from submodels and computes ensemble metrics.

    Parameters
    ----------
    ensemble_model_runs : pd.DataFrame
        DataFrame of MLflow classification runs to ensemble.
    cfg : DictConfig
        Main Hydra configuration.
    sources : dict
        Source data.
    ensemble_name : str
        Name for the ensemble.

    Returns
    -------
    dict or None
        Ensemble metrics dictionary, or None if ensembling failed.
    """
    # Aggregate per-submodel predictions into a single metrics_iter structure
    metrics_iter = get_classification_preds(ensemble_model_runs, sources, cfg=cfg)

    # Guard clause: aggregation failed, nothing to compute metrics on
    if metrics_iter is None:
        return None

    return compute_cls_ensemble_metrics(metrics_iter, sources, cfg)

ensemble_imputation

Imputation ensemble module.

Provides functionality to combine imputation outputs from multiple models by averaging reconstructions and computing ensemble statistics.

Cross-references: - src/metrics/evaluate_imputation_metrics.py for metric computation - src/ensemble/ensemble_utils.py for run retrieval

get_imputation_results_per_model

get_imputation_results_per_model(
    model_name, outlier_artifacts
)

Extract imputation reconstructions from model artifacts.

Handles different artifact structures and destandardizes values for proper metric computation.

PARAMETER DESCRIPTION
model_name

Name of the imputation model.

TYPE: str

outlier_artifacts

Loaded artifacts from MLflow.

TYPE: dict

RETURNS DESCRIPTION
dict

Reconstructions per split (destandardized).

dict

True pupil values per split (destandardized).

dict

Imputation indicating masks per split.

Source code in src/ensemble/ensemble_imputation.py
def get_imputation_results_per_model(model_name, outlier_artifacts):
    """
    Extract imputation reconstructions from model artifacts.

    Handles different artifact structures (2-d or 3-d arrays) and
    destandardizes values for proper metric computation.

    Parameters
    ----------
    model_name : str
        Name of the imputation model.
    outlier_artifacts : dict
        Loaded artifacts from MLflow.

    Returns
    -------
    dict
        Reconstructions per split (destandardized).
    dict
        True pupil values per split (destandardized).
    dict
        Imputation indicating masks per split.

    Raises
    ------
    ValueError
        If the imputation mean or indicating mask has an unexpected rank.
    """

    def _to_2d(arr, what):
        # Collapse (no_samples, no_timepoints, no_feats) to
        # (no_samples, no_timepoints), e.g. (355, 1981); 2-d arrays pass
        # through unchanged. Any other rank is a hard error.
        if arr.ndim == 3:
            return arr[:, :, 0]
        if arr.ndim == 2:
            return arr
        msg = "Unknown shape of {}, shape = {}".format(what, arr.shape)
        logger.error(msg)
        raise ValueError(msg)

    reconstructions = {"train": [], "test": []}
    true_pupil = {"train": [], "test": []}
    labels = {"train": [], "test": []}

    for split in ("train", "test"):
        imputation_dict = outlier_artifacts["model_artifacts"]["imputation"][split]

        # Bug fix: the train branch previously tested `len(shape) == 3` in
        # both `if` and `elif`, so a 2-d mean was silently left as an empty
        # list; _to_2d() now handles both ranks for both splits.
        reconstructions[split] = _to_2d(
            imputation_dict["imputation_dict"]["imputation"]["mean"],
            what="imputation mean",
        )

        try:
            true_pupil[split] = outlier_artifacts["source_data"]["df"][split]["data"][
                "X"
            ]
        except Exception as e:
            logger.error(e)
            raise

        # Bug fix: the error message previously reported the true-pupil
        # shape; _to_2d() reports the mask's own shape.
        labels[split] = _to_2d(
            imputation_dict["imputation_dict"]["indicating_mask"],
            what="imputation indicating mask",
        ).astype(int)

        assert true_pupil[split].shape == labels[split].shape, (
            f"true_pupil[split].shape: {true_pupil[split].shape}, "
            f"labels[split].shape: {labels[split].shape}"
        )

    # Destandardize both splits so metrics are computed in original units
    stdz_dict = outlier_artifacts["source_data"]["preprocess"]["standardization"]
    for split in ("train", "test"):
        true_pupil[split], reconstructions[split] = (
            destandardize_for_imputation_metric(
                true_pupil[split], reconstructions[split], stdz_dict
            )
        )

    return reconstructions, true_pupil, labels

get_imputation_preds_and_labels

get_imputation_preds_and_labels(
    ensemble_model_runs: DataFrame,
    gt_dict: dict,
    gt_preprocess: dict,
    cfg: DictConfig,
)

Load and stack imputation reconstructions from all submodels.

PARAMETER DESCRIPTION
ensemble_model_runs

DataFrame of MLflow imputation runs.

TYPE: DataFrame

gt_dict

Ground truth data dictionary.

TYPE: dict

gt_preprocess

Ground truth preprocessing parameters.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Stacked reconstructions (n_models x n_subjects x n_timepoints).

dict

True pupil values (from last model).

dict

Imputation masks (from last model).

Source code in src/ensemble/ensemble_imputation.py
def get_imputation_preds_and_labels(
    ensemble_model_runs: pd.DataFrame,
    gt_dict: dict,
    gt_preprocess: dict,
    cfg: DictConfig,
):
    """
    Load and stack imputation reconstructions from all submodels.

    Iterates the MLflow runs, loads each submodel's imputation artifacts,
    recomputes per-submodel metrics, and stacks the reconstructions into
    3-d arrays with a leading model axis.

    Parameters
    ----------
    ensemble_model_runs : pd.DataFrame
        DataFrame of MLflow imputation runs.
    gt_dict : dict
        Ground truth data dictionary.
    gt_preprocess : dict
        Ground truth preprocessing parameters.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    dict
        Stacked reconstructions (n_models x n_subjects x n_timepoints).
    dict
        True pupil values (from the last successfully loaded submodel).
    dict
        Imputation masks (from the last successfully loaded submodel).

    Raises
    ------
    ValueError
        If no submodel could be loaded and ensembled.
    """
    reconstructions = {"train": [], "test": []}
    no_submodel_runs = ensemble_model_runs.shape[0]
    no_submodel_ensembled = 0

    # Bug fix: these names used to be bound only inside the loop, so a run
    # where every submodel failed crashed with an opaque NameError at the
    # return statement below.
    true_pupil_submodel = None
    labels_submodel = None

    submodel_names = []
    for i, (idx, submodel_mlflow_run) in enumerate(ensemble_model_runs.iterrows()):
        run_id = submodel_mlflow_run["run_id"]
        run_name = submodel_mlflow_run["tags.mlflow.runName"]
        model_name = submodel_mlflow_run["params.model"]
        logger.info(
            f"{i + 1}/{no_submodel_runs}: Ensembling model: {model_name}, run_id: {run_id}, run_name: {run_name}"
        )
        try:
            outlier_artifacts_path = get_artifact(
                run_id, run_name, model_name, subdir="imputation"
            )
        except Exception:
            outlier_artifacts_path = None

        if outlier_artifacts_path is None:
            logger.warning(f"Could not load results for {model_name}")
            logger.warning(
                "Was there a glitch, and no results were saved? Or was this non-finished run?"
            )
            continue

        try:
            outlier_artifacts = load_results_dict(outlier_artifacts_path)
            try:
                reconstructions_submodel, true_pupil_submodel, labels_submodel = (
                    get_imputation_results_per_model(model_name, outlier_artifacts)
                )

                # Re-log per-submodel metrics so they stay comparable with
                # the ensemble metrics computed later.
                _ = recompute_submodel_imputation_metrics(
                    run_id,
                    submodel_mlflow_run,
                    model_name,
                    gt_dict,
                    gt_preprocess,
                    reconstructions_submodel,
                    cfg,
                )

            except Exception as e:
                logger.error(
                    "Problem getting missingness mask and imputation, error = {}".format(
                        e
                    )
                )
                raise ValueError(
                    "Problem getting missingness mask and imputation, error = {}".format(
                        e
                    )
                )

            no_submodel_ensembled += 1
            submodel_names.append(model_name)
            if len(reconstructions["train"]) == 0:
                # First submodel: create the 3-d array with a leading model axis
                reconstructions["train"] = reconstructions_submodel["train"][
                    np.newaxis, ...
                ]
                reconstructions["test"] = reconstructions_submodel["test"][
                    np.newaxis, ...
                ]
            else:
                # Stack subsequent submodels along the model axis
                reconstructions["train"] = np.vstack(
                    (
                        reconstructions["train"],
                        reconstructions_submodel["train"][np.newaxis, ...],
                    )
                )
                reconstructions["test"] = np.vstack(
                    (
                        reconstructions["test"],
                        reconstructions_submodel["test"][np.newaxis, ...],
                    )
                )
        except Exception as e:
            # Best-effort: one failing submodel is skipped, not fatal
            logger.warning(f"Error: {e}")

    if no_submodel_ensembled == 0:
        # Fail loudly with a clear message instead of the previous NameError
        raise ValueError(
            "None of the {} submodel runs could be ensembled".format(no_submodel_runs)
        )

    logger.info(
        "Out of {} submodels, {} were ensembled".format(
            no_submodel_runs, no_submodel_ensembled
        )
    )
    return reconstructions, true_pupil_submodel, labels_submodel

compute_ensemble_imputation_metrics

compute_ensemble_imputation_metrics(
    recons, true_pupil, labels, cfg, metadata_dict
)

Compute imputation metrics for ensemble (averaged) predictions.

PARAMETER DESCRIPTION
recons

Stacked reconstructions per split.

TYPE: dict

true_pupil

Ground truth values per split.

TYPE: dict

labels

Imputation masks per split.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

metadata_dict

Metadata per split.

TYPE: dict

RETURNS DESCRIPTION
dict

Metrics per split.

ndarray

Ensemble predictions (mean of reconstructions).

Source code in src/ensemble/ensemble_imputation.py
def compute_ensemble_imputation_metrics(recons, true_pupil, labels, cfg, metadata_dict):
    """
    Compute imputation metrics for ensemble (averaged) predictions.

    For each split, the stacked submodel reconstructions are averaged
    over the model axis and scored against the ground truth.

    Parameters
    ----------
    recons : dict
        Stacked reconstructions per split.
    true_pupil : dict
        Ground truth values per split.
    labels : dict
        Imputation masks per split.
    cfg : DictConfig
        Main Hydra configuration.
    metadata_dict : dict
        Metadata per split.

    Returns
    -------
    dict
        Metrics per split.
    np.ndarray
        Ensemble predictions (mean of reconstructions) of the last split
        iterated.
    """
    metrics = {}
    for split_name, gt_values in true_pupil.items():
        # Ensemble prediction = mean over the submodel axis
        ensemble_pred = np.mean(recons[split_name], axis=0)
        metrics[split_name] = compute_imputation_metrics(
            gt_values,
            ensemble_pred,
            labels[split_name],
            cfg=cfg,
            metadata_dict=metadata_dict[split_name],
        )

    # NOTE: as in the original implementation, only the final split's
    # predictions are returned.
    return metrics, ensemble_pred

get_imputation_stats_dict

get_imputation_stats_dict(ensembled_recon, labels, p=0.05)

Compute ensemble statistics from stacked reconstructions.

Calculates mean, std, and confidence intervals across submodels.

PARAMETER DESCRIPTION
ensembled_recon

Stacked reconstructions (n_models x n_subjects x n_timepoints).

TYPE: ndarray

labels

Imputation indicating mask.

TYPE: ndarray

p

Percentile for confidence interval bounds.

TYPE: float DEFAULT: 0.05

RETURNS DESCRIPTION
dict

Dictionary with imputation statistics ready for downstream use.

Source code in src/ensemble/ensemble_imputation.py
def get_imputation_stats_dict(ensembled_recon, labels, p=0.05):
    """
    Compute ensemble statistics from stacked reconstructions.

    Calculates mean, std, and confidence intervals across submodels
    (i.e. along axis 0 of the stacked array).

    Parameters
    ----------
    ensembled_recon : np.ndarray
        Stacked reconstructions (n_models x n_subjects x n_timepoints).
    labels : np.ndarray
        Imputation indicating mask.
    p : float, default 0.05
        Percentile for confidence interval bounds (lower bound at the
        p-th percentile, upper bound at the (100 - p)-th).

    Returns
    -------
    dict
        Dictionary with imputation statistics ready for downstream use.
    """
    # Fix: scope the warning suppression with catch_warnings() instead of
    # simplefilter()/resetwarnings() — resetwarnings() wipes *all* filters,
    # including any configured by the application or test runner.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # nanpercentile warns on all-NaN slices; we deliberately ignore that
        ci = np.nanpercentile(ensembled_recon, [p, 100 - p], axis=0)
        recon_mean = np.mean(ensembled_recon, axis=0)
        recon_std = np.std(ensembled_recon, axis=0)

    return {
        "imputation_dict": {
            "imputation": {
                "mean": recon_mean,
                "std": recon_std,
                "imputation_ci_pos": ci[1, :, :],
                "imputation_ci_neg": ci[0, :, :],
            },
            "indicating_mask": labels,
        }
    }

add_imputation_dict

add_imputation_dict(recons, predictions, labels, stdz_dict)

Create imputation output dictionary for downstream processing.

Standardizes reconstructions and computes ensemble statistics in format expected by featurization code.

PARAMETER DESCRIPTION
recons

Stacked reconstructions per split.

TYPE: dict

predictions

Ensemble mean predictions.

TYPE: ndarray

labels

Imputation masks per split.

TYPE: dict

stdz_dict

Standardization parameters.

TYPE: dict

RETURNS DESCRIPTION
dict

Dictionary with 'model_artifacts' key containing imputation data.

See Also

src.featurization.get_arrays_for_splits_from_imputer_artifacts

Source code in src/ensemble/ensemble_imputation.py
def add_imputation_dict(recons, predictions, labels, stdz_dict):
    """
    Create imputation output dictionary for downstream processing.

    Standardizes reconstructions and computes ensemble statistics
    in the format expected by featurization code.

    Parameters
    ----------
    recons : dict
        Stacked reconstructions per split.
    predictions : np.ndarray
        Ensemble mean predictions (not referenced in the body; kept for
        interface stability).
    labels : dict
        Imputation masks per split.
    stdz_dict : dict
        Standardization parameters.

    Returns
    -------
    dict
        Dictionary with 'model_artifacts' key containing imputation data.

    See Also
    --------
    src.featurization.get_arrays_for_splits_from_imputer_artifacts
    """
    logger.info("Compute Imputation ensemble stats")
    imputation_per_split = {}
    for split_name, stacked_recon in recons.items():
        logger.info(f"Split = {split_name}")
        # stacked_recon: (no_submodels, no_samples, no_timepoints)
        standardized = standardize_recons_arrays(stacked_recon, stdz_dict)
        imputation_per_split[split_name] = get_imputation_stats_dict(
            standardized, labels[split_name]
        )

    return {"model_artifacts": {"imputation": imputation_per_split}}

ensemble_imputation

ensemble_imputation(
    ensemble_model_runs: DataFrame,
    cfg: DictConfig,
    sources: dict,
    ensemble_name: str,
    recompute_metrics: bool = False,
)

Create imputation ensemble from multiple models.

Main entry point for imputation ensembling. Loads reconstructions from submodels, averages them, and computes metrics.

PARAMETER DESCRIPTION
ensemble_model_runs

DataFrame of MLflow imputation runs to ensemble.

TYPE: DataFrame

cfg

Main Hydra configuration.

TYPE: DictConfig

sources

Source data including ground truth.

TYPE: dict

ensemble_name

Name for the ensemble.

TYPE: str

recompute_metrics

If True, only recompute submodel metrics without creating ensemble.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict or None

Ensemble output dictionary with metrics, reconstructions, and model artifacts, or None if recompute_metrics=True.

See Also

ensemble_anomaly_detection.get_anomaly_masks_and_labels

Source code in src/ensemble/ensemble_imputation.py
def ensemble_imputation(
    ensemble_model_runs: pd.DataFrame,
    cfg: DictConfig,
    sources: dict,
    ensemble_name: str,
    recompute_metrics: bool = False,
):
    """
    Create imputation ensemble from multiple models.

    Main entry point for imputation ensembling. Loads reconstructions from
    submodels, averages them, and computes metrics.

    Parameters
    ----------
    ensemble_model_runs : pd.DataFrame
        DataFrame of MLflow imputation runs to ensemble.
    cfg : DictConfig
        Main Hydra configuration.
    sources : dict
        Source data including ground truth.
    ensemble_name : str
        Name for the ensemble.
    recompute_metrics : bool, default False
        If True, only recompute submodel metrics without creating ensemble.

    Returns
    -------
    dict or None
        Ensemble output dictionary with metrics, reconstructions, and
        model artifacts, or None if recompute_metrics=True.

    See Also
    --------
    ensemble_anomaly_detection.get_anomaly_masks_and_labels
    """
    # Loading submodel results also recomputes their individual metrics,
    # so it runs regardless of recompute_metrics
    gt = sources["pupil_gt"]
    recons, true_pupil, labels_imputation = get_imputation_preds_and_labels(
        ensemble_model_runs, gt["df"], gt["preprocess"], cfg=cfg
    )

    if recompute_metrics:
        logger.info(
            "Skipping ensemble imputation as we are now just re-computing metrics for submodels on this pass"
        )
        return None

    # Compute the metrics for the ensemble(s)
    metadata_dict = get_metadata_dict_from_sources(sources)
    labels = get_gt_imputation_labels(sources)
    metrics, predictions = compute_ensemble_imputation_metrics(
        recons, true_pupil, labels, cfg, metadata_dict
    )

    # predictions = mean of recons; recons holds every submodel's output
    stdz_dict = gt["preprocess"]["standardization"]
    ensemble_output = add_imputation_dict(recons, predictions, labels, stdz_dict)
    ensemble_output["metrics"] = metrics
    ensemble_output["recons"] = recons
    return ensemble_output

ensemble_utils

get_unique_models_from_best_runs

get_unique_models_from_best_runs(
    best_runs: DataFrame,
) -> list[str]

Extract unique model architectures from MLflow run names.

PARAMETER DESCRIPTION
best_runs

DataFrame containing MLflow runs with 'tags.mlflow.runName' column.

TYPE: DataFrame

RETURNS DESCRIPTION
list

List of unique model architecture names extracted from run names.

Source code in src/ensemble/ensemble_utils.py
def get_unique_models_from_best_runs(best_runs: pd.DataFrame) -> list[str]:
    """
    Extract unique model architectures from MLflow run names.

    The architecture is taken as the prefix of each run name before the
    first underscore.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing MLflow runs with 'tags.mlflow.runName' column.

    Returns
    -------
    list
        List of unique model architecture names extracted from run names,
        in first-seen order.
    """
    models = []
    model_cfg_names = best_runs["tags.mlflow.runName"].unique()
    for model in model_cfg_names:
        logger.debug(f"Model cfg: {model}")
        model_architecture = model.split("_")[0]
        models.append(model_architecture)

    # Fix: dict.fromkeys() deduplicates while keeping a deterministic
    # first-seen order, unlike list(set(...)) whose ordering can vary
    # between runs.
    return list(dict.fromkeys(models))

get_best_run_of_the_model

get_best_run_of_the_model(
    best_runs: DataFrame,
    model: str,
    cfg: DictConfig,
    best_metric_cfg: DictConfig,
    task: str,
    include_all_variants: bool = False,
) -> tuple[Optional[Series], Optional[float]]

Get the best performing run for a specific model architecture.

PARAMETER DESCRIPTION
best_runs

DataFrame containing all MLflow runs to search.

TYPE: DataFrame

model

Model architecture name to filter for.

TYPE: str

cfg

Hydra configuration object.

TYPE: DictConfig

best_metric_cfg

Configuration specifying which metric to use for ranking.

TYPE: DictConfig

task

Task type ('anomaly_detection', 'imputation', or 'classification').

TYPE: str

include_all_variants

If True, return all variants of the model instead of just the best.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Series

Best run for the specified model.

float

Best metric value for that run.

Source code in src/ensemble/ensemble_utils.py
def get_best_run_of_the_model(
    best_runs: pd.DataFrame,
    model: str,
    cfg: DictConfig,
    best_metric_cfg: DictConfig,
    task: str,
    include_all_variants: bool = False,
) -> tuple[Optional[pd.Series], Optional[float]]:
    """
    Get the best performing run for a specific model architecture.

    Note: this mutates the caller's `best_runs` DataFrame by adding a
    parsed 'model_name' column.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing all MLflow runs to search.
    model : str
        Model architecture name to filter for.
    cfg : DictConfig
        Hydra configuration object.
    best_metric_cfg : DictConfig
        Configuration specifying which metric to use for ranking.
    task : str
        Task type ('anomaly_detection', 'imputation', or 'classification').
    include_all_variants : bool, default False
        If True, return all variants of the model instead of just the best.

    Returns
    -------
    pd.Series
        Best run for the specified model.
    float
        Best metric value for that run.
    """

    # Fixed typo in the helper name ("namme" -> "name"); local-only symbol
    def parse_run_name_for_model_name(run_col: pd.Series) -> list[str]:
        # Architecture = run name prefix before the first underscore
        model_names = []
        for run_name in run_col:
            model_name = run_name.split("_")[0]
            model_names.append(model_name)
        return model_names

    # parse run name to get the model name -> easier boolean indexing
    def add_parsed_run_name_to_df(
        run_col: pd.Series, best_runs: pd.DataFrame
    ) -> pd.DataFrame:
        model_names = parse_run_name_for_model_name(run_col)
        best_runs["model_name"] = model_names
        return best_runs

    best_runs = add_parsed_run_name_to_df(
        run_col=best_runs["tags.mlflow.runName"], best_runs=best_runs
    )

    model_best_runs = best_runs[best_runs["model_name"] == model]
    try:
        model_best_run, best_metric = get_best_run_of_pd_dataframe(
            model_best_runs,
            cfg,
            best_metric_cfg,
            task,
            model,
            include_all_variants=include_all_variants,
        )
        # NOTE(review): this logs the full run-name column, not the single
        # selected run's name — presumably unintended, but kept as-is to
        # avoid changing log output; confirm intent before tightening.
        logger.debug(
            "Best run_name for model {}: {}".format(
                model, best_runs["tags.mlflow.runName"]
            )
        )
    except Exception as e:
        logger.error(f"Could not get best run for model {model} with error: {e}")
        # Bare raise keeps the original traceback intact
        raise

    return model_best_run, best_metric

exclude_ensembles_from_mlflow_runs

exclude_ensembles_from_mlflow_runs(
    best_runs: DataFrame,
) -> Optional[DataFrame]

Filter out ensemble runs from MLflow runs DataFrame.

Removes runs that have 'ensemble' in their run name, keeping only individual submodel runs for ensemble creation.

PARAMETER DESCRIPTION
best_runs

DataFrame containing MLflow runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame or None

Filtered DataFrame without ensemble runs, or None if empty.

Source code in src/ensemble/ensemble_utils.py
def exclude_ensembles_from_mlflow_runs(
    best_runs: pd.DataFrame,
) -> Optional[pd.DataFrame]:
    """
    Filter out ensemble runs from MLflow runs DataFrame.

    Removes runs that have 'ensemble' in their run name, keeping only
    individual submodel runs for ensemble creation.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing MLflow runs.

    Returns
    -------
    pd.DataFrame or None
        Filtered DataFrame without ensemble runs, or None if empty.
    """
    # you do not want to get already existing ensembled models, but only the submodels
    logger.info('Excluding runs with "ensemble" in the name')
    if best_runs.shape[0] == 0:
        return None

    keep_mask = ~best_runs["tags.mlflow.runName"].str.contains("ensemble")
    return best_runs[keep_mask]

exclude_imputation_ensembles_from_mlflow_runs

exclude_imputation_ensembles_from_mlflow_runs(
    best_runs: DataFrame,
) -> DataFrame

Filter out imputation ensemble runs from MLflow runs.

Parses run names to identify and exclude imputation ensembles, keeping only single-model imputation runs.

PARAMETER DESCRIPTION
best_runs

DataFrame containing MLflow imputation runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Filtered DataFrame without imputation ensemble runs.

Source code in src/ensemble/ensemble_utils.py
def exclude_imputation_ensembles_from_mlflow_runs(
    best_runs: pd.DataFrame,
) -> pd.DataFrame:
    """
    Filter out imputation ensemble runs from MLflow runs.

    Parses run names to identify and exclude imputation ensembles,
    keeping only single-model imputation runs.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing MLflow imputation runs.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame without imputation ensemble runs.
    """
    kept_rows = []
    for idx, row in best_runs.iterrows():
        run_name = row["tags.mlflow.runName"]
        model_name, anomaly_source = parse_imputation_run_name_for_ensemble(run_name)
        if "ensemble" not in model_name:
            kept_rows.append(pd.DataFrame(row).T)
        else:
            logger.info("Not recomputing for model_name = {}".format(model_name))

    # Perf fix: concatenate once at the end; repeated pd.concat inside the
    # loop copies the accumulated frame each iteration (quadratic).
    if kept_rows:
        return pd.concat(kept_rows)
    return pd.DataFrame()

keep_only_imputations_from_anomaly_ensembles

keep_only_imputations_from_anomaly_ensembles(
    best_runs: DataFrame,
) -> DataFrame

Filter to keep only imputation runs that use anomaly ensemble outputs.

PARAMETER DESCRIPTION
best_runs

DataFrame containing MLflow imputation runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Filtered DataFrame with only runs using anomaly ensemble as input.

Source code in src/ensemble/ensemble_utils.py
def keep_only_imputations_from_anomaly_ensembles(
    best_runs: pd.DataFrame,
) -> pd.DataFrame:
    """
    Keep only the imputation runs whose anomaly masks came from an ensemble.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing MLflow imputation runs.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame with only runs using anomaly ensemble as input.
    """
    filtered = pd.DataFrame()
    for _, run_row in best_runs.iterrows():
        _, anomaly_source = parse_imputation_run_name_for_ensemble(
            run_row["tags.mlflow.runName"]
        )
        # keep only runs whose anomaly-mask input came from an ensemble
        if "ensemble" in anomaly_source:
            filtered = pd.concat([filtered, run_row.to_frame().T])

    return filtered

remove_worst_model

remove_worst_model(
    best_unique_models: dict[str, Series],
    best_metrics: list[float],
    best_metric_cfg: DictConfig,
) -> dict[str, Series]

Remove the worst performing model from the ensemble candidates.

Used to ensure odd number of models for majority voting in anomaly detection.

PARAMETER DESCRIPTION
best_unique_models

Dictionary mapping model names to their MLflow run data.

TYPE: dict

best_metrics

List of metric values corresponding to each model.

TYPE: list

best_metric_cfg

Configuration specifying metric direction ('DESC' or 'ASC').

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Updated dictionary with worst model removed.

Source code in src/ensemble/ensemble_utils.py
def remove_worst_model(
    best_unique_models: dict[str, pd.Series],
    best_metrics: list[float],
    best_metric_cfg: DictConfig,
) -> dict[str, pd.Series]:
    """
    Drop the worst-performing model from the ensemble candidates.

    Used to ensure an odd number of models for majority voting in
    anomaly detection.

    Parameters
    ----------
    best_unique_models : dict
        Dictionary mapping model names to their MLflow run data.
    best_metrics : list
        Metric values aligned with the dictionary's key order.
    best_metric_cfg : DictConfig
        Configuration specifying metric direction ('DESC' or 'ASC').

    Returns
    -------
    dict
        Updated dictionary with the worst model removed.
    """
    direction = best_metric_cfg["direction"]
    if direction == "DESC":
        # higher is better, so the smallest metric marks the worst model
        worst_idx = np.nanargmin(best_metrics)
    elif direction == "ASC":
        # lower is better, so the largest metric marks the worst model
        worst_idx = np.nanargmax(best_metrics)
    else:
        logger.error(f"Direction {best_metric_cfg['direction']} not implemented")
        raise NotImplementedError(
            f"Direction {best_metric_cfg['direction']} not implemented"
        )

    worst_model = list(best_unique_models)[worst_idx]
    logger.info(f"Removing the worst model: {worst_model}")
    best_unique_models.pop(worst_model)

    return best_unique_models

exclude_pupil_orig_imputed

exclude_pupil_orig_imputed(
    best_unique_models: dict[str, Series],
    best_metrics: list[float],
) -> tuple[dict[str, Series], list[float]]

Exclude models trained on original (non-ground-truth) pupil data.

Removes models with 'orig' in their name to keep only ground-truth trained models.

PARAMETER DESCRIPTION
best_unique_models

Dictionary mapping model names to their MLflow run data.

TYPE: dict

best_metrics

List of metric values corresponding to each model.

TYPE: list

RETURNS DESCRIPTION
dict

Filtered dictionary without 'orig' models.

list

Corresponding filtered metrics list.

Source code in src/ensemble/ensemble_utils.py
def exclude_pupil_orig_imputed(
    best_unique_models: dict[str, pd.Series],
    best_metrics: list[float],
) -> tuple[dict[str, pd.Series], list[float]]:
    """
    Drop models trained on original (non-ground-truth) pupil data.

    Any model whose name contains 'orig' is removed so that only
    ground-truth trained models remain.

    Parameters
    ----------
    best_unique_models : dict
        Dictionary mapping model names to their MLflow run data.
    best_metrics : list
        Metric values aligned with the dictionary's key order.

    Returns
    -------
    dict
        Filtered dictionary without 'orig' models.
    list
        Corresponding filtered metrics list.
    """
    # Snapshot the keys first so we can pop entries while looping.
    model_names = list(best_unique_models)
    kept_metrics = []
    for metric_value, model_name in zip(best_metrics, model_names):
        if "orig" not in model_name:
            kept_metrics.append(metric_value)
            continue
        logger.info(
            'Removing "pupil_orig_imputed" run from ensemble, model: {}'.format(
                model_name
            )
        )
        best_unique_models.pop(model_name)
    return best_unique_models, kept_metrics

get_anomaly_runs

get_anomaly_runs(
    best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_odd_number_of_models: bool = False,
    exclude_orig_data: bool = True,
    include_all_variants: bool = False,
) -> dict[str, Series]

Get best anomaly detection runs for ensemble creation.

PARAMETER DESCRIPTION
best_runs

DataFrame containing MLflow anomaly detection runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection and thresholding.

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type (should be 'anomaly_detection').

TYPE: str

return_odd_number_of_models

If True, ensures odd number of models for majority voting.

TYPE: bool DEFAULT: False

exclude_orig_data

If True, excludes models trained on original (non-GT) data.

TYPE: bool DEFAULT: True

include_all_variants

If True, includes all model variants instead of just best.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict

Dictionary mapping model names to their best MLflow run data.

Source code in src/ensemble/ensemble_utils.py
def get_anomaly_runs(
    best_runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_odd_number_of_models: bool = False,
    exclude_orig_data: bool = True,
    include_all_variants: bool = False,
) -> dict[str, pd.Series]:
    """
    Collect the best anomaly detection runs for ensemble creation.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing MLflow anomaly detection runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection and thresholding.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type (should be 'anomaly_detection').
    return_odd_number_of_models : bool, default False
        If True, ensures an odd number of models for majority voting.
    exclude_orig_data : bool, default True
        If True, excludes models trained on original (non-GT) data.
    include_all_variants : bool, default False
        If True, includes all model variants instead of just the best.

    Returns
    -------
    dict
        Dictionary mapping model names to their best MLflow run data.

    Raises
    ------
    RuntimeError
        If no runs survive selection (e.g. thresholding too strict).
    """
    best_unique_models: dict = {}
    best_metrics: list = []
    for model in get_unique_models_from_best_runs(best_runs):
        logger.debug("Getting the best run for model: {}".format(model))
        run, metric = get_best_run_of_the_model(
            best_runs,
            model,
            cfg,
            best_metric_cfg=best_metric_cfg,
            task=task,
            include_all_variants=include_all_variants,
        )
        if run is None:
            continue
        best_unique_models[model] = run
        best_metrics.append(metric)

    if not best_metrics:
        logger.error(
            "No best runs were added? glitch somewhere? Ensemble thresholding too high?"
        )
        # e.g. cfg['OUTLIER_DETECTION']['best_metric']['ensemble_quality_threshold']
        raise RuntimeError("No best runs were added? glitch somewhere?")

    if exclude_orig_data and not include_all_variants:
        best_unique_models, best_metrics = exclude_pupil_orig_imputed(
            best_unique_models, best_metrics
        )

    if return_odd_number_of_models and len(best_unique_models) % 2 == 0:
        logger.info("Returning odd number of models for anomaly detection")
        best_unique_models = remove_worst_model(
            best_unique_models, best_metrics, best_metric_cfg=best_metric_cfg
        )

    # With all variants included, each submodel's dataframe may hold several
    # rows; expand to one key per row so downstream code works per-variant.
    if include_all_variants:
        best_unique_models = create_new_keys_for_all_variants(best_unique_models)

    return best_unique_models

create_new_keys_for_all_variants

create_new_keys_for_all_variants(
    best_unique_models: dict[str, DataFrame],
) -> dict[str, DataFrame]

Expand model dictionary to have separate keys for each model variant.

When include_all_variants is True, DataFrames may contain multiple rows. This function creates a new key for each row to support downstream processing.

PARAMETER DESCRIPTION
best_unique_models

Dictionary where values may be multi-row DataFrames.

TYPE: dict

RETURNS DESCRIPTION
dict

Dictionary with separate keys for each model variant.

Source code in src/ensemble/ensemble_utils.py
def create_new_keys_for_all_variants(
    best_unique_models: dict[str, pd.DataFrame],
) -> dict[str, pd.DataFrame]:
    """
    Expand the model dictionary so each model variant gets its own key.

    When include_all_variants is True, a submodel's DataFrame may contain
    multiple rows; each row becomes a separate entry keyed by its MLflow
    run name so downstream per-model code keeps working.

    Parameters
    ----------
    best_unique_models : dict
        Dictionary where values may be multi-row DataFrames.

    Returns
    -------
    dict
        Dictionary with a separate key for each model variant.
    """
    logger.info("Creating new keys for all models (all variants)")
    expanded = {}
    for submodel, model_df in best_unique_models.items():
        if model_df.shape[0] <= 1:
            expanded[submodel] = model_df
            continue
        # one entry per variant row, keyed by its MLflow run name
        for _, variant_row in model_df.iterrows():
            expanded[variant_row["tags.mlflow.runName"]] = pd.DataFrame(variant_row).T
    logger.info("-> {} of model variants in total".format(len(expanded)))
    logger.info(list(expanded.keys()))

    return expanded

get_best_imputation_col_name

get_best_imputation_col_name(
    best_metric_cfg: DictConfig,
) -> str

Construct MLflow column name for imputation metric.

PARAMETER DESCRIPTION
best_metric_cfg

Configuration containing 'split' and 'string' keys.

TYPE: DictConfig

RETURNS DESCRIPTION
str

MLflow column name in format 'metrics.{split}/{metric_name}'.

Source code in src/ensemble/ensemble_utils.py
def get_best_imputation_col_name(best_metric_cfg: DictConfig) -> str:
    """
    Build the MLflow column name for the configured imputation metric.

    Parameters
    ----------
    best_metric_cfg : DictConfig
        Configuration containing 'split' and 'string' keys.

    Returns
    -------
    str
        MLflow column name in format 'metrics.{split}/{metric_name}'.
    """
    return "metrics.{}/{}".format(best_metric_cfg["split"], best_metric_cfg["string"])

get_best_imputation_model_per_run_name

get_best_imputation_model_per_run_name(
    runs: DataFrame, best_metric_cfg: DictConfig
) -> DataFrame

Select the best run when multiple runs share the same run name.

PARAMETER DESCRIPTION
runs

DataFrame of runs with the same run name.

TYPE: DataFrame

best_metric_cfg

Configuration specifying which metric and direction to use.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Single-row DataFrame with the best run.

Source code in src/ensemble/ensemble_utils.py
def get_best_imputation_model_per_run_name(
    runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
) -> pd.DataFrame:
    """
    Pick the best run when several runs share the same run name.

    Parameters
    ----------
    runs : pd.DataFrame
        DataFrame of runs with the same run name.
    best_metric_cfg : DictConfig
        Configuration specifying which metric and direction to use.

    Returns
    -------
    pd.DataFrame
        Single-row DataFrame with the best run.
    """
    if runs.shape[0] <= 1:
        # nothing to disambiguate; a single copy of this run name
        return runs

    # e.g. 'metrics.test/mae'
    col_name = get_best_imputation_col_name(best_metric_cfg)
    direction = best_metric_cfg["direction"]
    if direction == "DESC":
        ordered = runs.sort_values(by=col_name, ascending=False)
    elif direction == "ASC":
        ordered = runs.sort_values(by=col_name, ascending=True)
    else:
        logger.error(f"Direction {best_metric_cfg['direction']} not implemented")
        raise NotImplementedError(
            f"Direction {best_metric_cfg['direction']} not implemented"
        )
    # best run is first after sorting
    return ordered.iloc[[0]]

get_best_unique_imputation_models

get_best_unique_imputation_models(
    best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
) -> DataFrame

Get unique best imputation models, one per run name.

PARAMETER DESCRIPTION
best_runs

DataFrame containing all MLflow imputation runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection and thresholding.

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type (should be 'imputation').

TYPE: str

RETURNS DESCRIPTION
DataFrame

DataFrame with one row per unique model configuration.

Source code in src/ensemble/ensemble_utils.py
def get_best_unique_imputation_models(
    best_runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
) -> pd.DataFrame:
    """
    Get the unique best imputation models, one row per run name.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing all MLflow imputation runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection and thresholding.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type (should be 'imputation').

    Returns
    -------
    pd.DataFrame
        DataFrame with one row per unique model configuration.
    """
    best_unique_runs = pd.DataFrame()
    for run_name in best_runs["tags.mlflow.runName"].unique():
        # The same config may have been run several times; keep only the
        # best-scoring copy per run_name (usually there is just one).
        same_name_runs: pd.DataFrame = best_runs[
            best_runs["tags.mlflow.runName"] == run_name
        ]
        best_run = get_best_imputation_model_per_run_name(
            same_name_runs, best_metric_cfg
        )
        metric_col = get_best_imputation_col_name(best_metric_cfg)
        best_run = threshold_filter_run(best_run, metric_col, best_metric_cfg)
        best_unique_runs = pd.concat([best_unique_runs, best_run])

    return best_unique_runs

parse_imputation_run_name_for_ensemble

parse_imputation_run_name_for_ensemble(
    run_name: str,
) -> tuple[str, str]

Parse imputation run name to extract model and anomaly source.

Run names follow format: '{model_name}__{anomaly_source}'

PARAMETER DESCRIPTION
run_name

MLflow run name for imputation model.

TYPE: str

RETURNS DESCRIPTION
str

Model name (e.g., 'SAITS', 'MOMENT-finetune').

str

Anomaly source (e.g., 'pupil_gt_', 'LOF').

RAISES DESCRIPTION
ValueError

If run name cannot be parsed.

Source code in src/ensemble/ensemble_utils.py
def parse_imputation_run_name_for_ensemble(run_name: str) -> tuple[str, str]:
    """
    Split an imputation run name into model name and anomaly source.

    Run names follow the '{model_name}__{anomaly_source}' convention.

    Parameters
    ----------
    run_name : str
        MLflow run name for an imputation model.

    Returns
    -------
    str
        Model name (e.g., 'SAITS', 'MOMENT-finetune').
    str
        Anomaly source (e.g., 'pupil_gt_', 'LOF').

    Raises
    ------
    ValueError
        If the run name cannot be parsed.
    """
    # Triple underscores are collapsed only for counting the fields;
    # the actual splits below still operate on the raw name.
    n_fields = len(run_name.replace("___", "__").split("__"))
    if n_fields == 2:
        model_name, anomaly_source = run_name.split("__")
    elif n_fields == 3:
        # source was itself split by '__'; glue the two pieces back together
        model_name, source_head, source_tail = run_name.split("__")
        anomaly_source = source_head + source_tail
    else:
        logger.error("Unknown parsing for run name: {}".format(run_name))
        raise ValueError("Unknown parsing for run name: {}".format(run_name))

    return model_name, anomaly_source

filter_runs_for_gt

filter_runs_for_gt(
    best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_best_gt: bool = False,
    gt_on: Optional[str] = "anomaly",
) -> DataFrame

Filter runs based on ground truth usage.

PARAMETER DESCRIPTION
best_runs

DataFrame containing MLflow runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection.

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type ('imputation' or 'classification').

TYPE: str

return_best_gt

If True, return only runs using ground truth. If False, return only runs NOT using ground truth.

TYPE: bool DEFAULT: False

gt_on

For classification, which component should have GT: - 'anomaly': only anomaly detection uses GT - 'imputation': only imputation uses GT - None: both must use GT

TYPE: str DEFAULT: 'anomaly'

RETURNS DESCRIPTION
DataFrame

Filtered runs based on GT criteria.

Source code in src/ensemble/ensemble_utils.py
def filter_runs_for_gt(
    best_runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_best_gt: bool = False,
    gt_on: Optional[str] = "anomaly",
) -> pd.DataFrame:
    """
    Filter runs based on ground truth usage.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing MLflow runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type ('imputation' or 'classification').
    return_best_gt : bool, default False
        If True, return only runs using ground truth.
        If False, return only runs NOT using ground truth.
    gt_on : str, default 'anomaly'
        For classification, which component should have GT:
        - 'anomaly': only anomaly detection uses GT
        - 'imputation': only imputation uses GT
        - None: both must use GT

    Returns
    -------
    pd.DataFrame
        Filtered runs based on GT criteria.

    Raises
    ------
    ValueError
        If a classification run name does not have 2 or 4 '__'-separated fields.
    NotImplementedError
        If `task` is neither 'imputation' nor 'classification'.
    """
    if task == "classification":
        if return_best_gt and gt_on is None:
            logger.debug("When you want gt anomaly AND gt imputation")

    best_runs_filtered = pd.DataFrame()
    # iterate through dataframe rows
    for index, row in best_runs.iterrows():
        run_name = row["tags.mlflow.runName"]

        if task == "imputation":
            model_name, anomaly_source = parse_imputation_run_name_for_ensemble(
                run_name
            )

            # imputation run names encode the GT source with a 'pupil_gt_' prefix
            on_gt: bool = anomaly_source.startswith("pupil_gt_")
            if return_best_gt:
                if on_gt:
                    logger.debug(f"Keeping the run with GT: {run_name}")
                    best_runs_filtered = pd.concat(
                        [best_runs_filtered, row.to_frame().T]
                    )
            else:
                if not on_gt:
                    logger.debug(f"Keeping the run without GT: {run_name}")
                    best_runs_filtered = pd.concat(
                        [best_runs_filtered, row.to_frame().T]
                    )

        elif task == "classification":
            fields = run_name.split("__")
            try:
                if len(fields) == 2:
                    # ensembling broke the naming convention
                    model_name, imputation_source = run_name.split("__")
                    anomaly_source = imputation_source
                    feature_name = "simple1.0"
                    logger.warning(
                        'Hard-coded feature name "{}" for multi-classifier ensemble'.format(
                            feature_name
                        )
                    )
                elif len(fields) == 4:
                    model_name, feature_name, imputation_source, anomaly_source = (
                        run_name.split("__")
                    )
                else:
                    # BUGFIX: other field counts previously fell through
                    # silently, leaving anomaly_source/imputation_source
                    # unbound and crashing below with an opaque NameError.
                    raise ValueError(
                        "Cannot parse classification run name with {} fields: {}".format(
                            len(fields), run_name
                        )
                    )
            except Exception as e:
                logger.error(
                    "Could not get the run_name = {} with error: {}".format(run_name, e)
                )
                raise e

            # classification run names use 'pupil-gt' (dash) prefixes
            anomaly_on_gt: bool = anomaly_source.startswith("pupil-gt")
            imputation_on_gt: bool = imputation_source.startswith("pupil-gt")
            on_gt: bool = anomaly_on_gt and imputation_on_gt
            some_gt: bool = anomaly_on_gt or imputation_on_gt

            if return_best_gt:
                if on_gt and gt_on is None:
                    logger.debug(
                        f"Keeping the run with GT (both Anomaly and Imputation): {run_name}"
                    )
                    best_runs_filtered = pd.concat(
                        [best_runs_filtered, row.to_frame().T]
                    )
                elif some_gt and not on_gt:
                    if gt_on == "anomaly" and anomaly_on_gt and not imputation_on_gt:
                        logger.debug(f"Keeping the run with Anomaly GT: {run_name}")
                        best_runs_filtered = pd.concat(
                            [best_runs_filtered, row.to_frame().T]
                        )
                    elif (
                        gt_on == "imputation" and imputation_on_gt and not anomaly_on_gt
                    ):
                        logger.debug(f"Keeping the run with Imputation GT: {run_name}")
                        best_runs_filtered = pd.concat(
                            [best_runs_filtered, row.to_frame().T]
                        )

            else:
                if not some_gt:
                    logger.debug(f"Keeping the run without GT: {run_name}")
                    best_runs_filtered = pd.concat(
                        [best_runs_filtered, row.to_frame().T]
                    )
        else:
            logger.error(f"Task {task} not implemented yet")
            raise NotImplementedError(f"Task {task} not implemented yet")

    if best_runs_filtered.shape[0] == 0:
        logger.warning("No runs after filtering!")

    return best_runs_filtered

filter_for_detection

filter_for_detection(
    detection_filter_reject: str, best_runs_out: DataFrame
) -> DataFrame

Filter out runs containing specified string (e.g., 'zeroshot').

PARAMETER DESCRIPTION
detection_filter_reject

String to filter out from run names (e.g., 'zeroshot').

TYPE: str

best_runs_out

DataFrame containing MLflow runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Filtered runs without rejected string in name.

Source code in src/ensemble/ensemble_utils.py
def filter_for_detection(
    detection_filter_reject: str, best_runs_out: pd.DataFrame
) -> pd.DataFrame:
    """
    Filter out runs containing specified string (e.g., 'zeroshot').

    Parameters
    ----------
    detection_filter_reject : str
        String to filter out from run names (e.g., 'zeroshot').
    best_runs_out : pd.DataFrame
        DataFrame containing MLflow runs.

    Returns
    -------
    pd.DataFrame
        Filtered runs without rejected string in name.
    """
    # BUGFIX: log message previously hard-coded 'zeroshot' even though the
    # rejected substring is a parameter.
    logger.info(
        f"Rejecting runs with {detection_filter_reject} in the name "
        "(and use the finetuned)"
    )
    kept_rows = []
    for _, row in best_runs_out.iterrows():
        run_name = row["tags.mlflow.runName"]
        if detection_filter_reject in run_name:
            logger.debug(f"Rejecting the run: {run_name}")
        else:
            # collect and concatenate once to avoid quadratic concat-in-loop
            kept_rows.append(row.to_frame().T)

    # pd.concat([]) raises ValueError, so keep the original empty-frame result
    return pd.concat(kept_rows) if kept_rows else pd.DataFrame()

get_non_moment_models

get_non_moment_models(
    best_runs_out: DataFrame,
) -> DataFrame

Filter to get only non-MOMENT model runs.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing MLflow runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Runs without 'MOMENT' in the model name.

Source code in src/ensemble/ensemble_utils.py
def get_non_moment_models(best_runs_out: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only the runs whose model name is not a MOMENT variant.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing MLflow runs.

    Returns
    -------
    pd.DataFrame
        Runs without 'MOMENT' in the model name.
    """
    non_moment = pd.DataFrame()
    for _, run_row in best_runs_out.iterrows():
        model_name, _ = parse_imputation_run_name_for_ensemble(
            run_row["tags.mlflow.runName"]
        )
        if "MOMENT" in model_name:
            continue
        non_moment = pd.concat([non_moment, run_row.to_frame().T])
    return non_moment

get_best_moment

get_best_moment(
    best_metric_cfg: DictConfig, runs_moment: DataFrame
) -> Optional[DataFrame]

Get the best performing MOMENT variant.

PARAMETER DESCRIPTION
best_metric_cfg

Configuration specifying metric and sort direction.

TYPE: DictConfig

runs_moment

DataFrame containing only MOMENT model runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame or None

Single-row DataFrame with best MOMENT run, or None if no MOMENT models exist.

Source code in src/ensemble/ensemble_utils.py
def get_best_moment(
    best_metric_cfg: DictConfig, runs_moment: pd.DataFrame
) -> Optional[pd.DataFrame]:
    """
    Return the best performing MOMENT variant.

    Parameters
    ----------
    best_metric_cfg : DictConfig
        Configuration specifying metric and sort direction.
    runs_moment : pd.DataFrame
        DataFrame containing only MOMENT model runs.

    Returns
    -------
    pd.DataFrame or None
        Single-row DataFrame with best MOMENT run, or None if no MOMENT models exist.
    """
    if runs_moment is None or runs_moment.empty:
        logger.debug("No MOMENT models found, returning None")
        return None

    direction = best_metric_cfg["direction"]
    if direction not in ("DESC", "ASC"):
        logger.error(f"Direction {best_metric_cfg['direction']} not implemented")
        raise NotImplementedError(
            f"Direction {best_metric_cfg['direction']} not implemented"
        )

    # DESC means larger is better -> sort descending so the best run is first
    ordered = runs_moment.sort_values(
        by=get_best_imputation_col_name(best_metric_cfg),
        ascending=(direction == "ASC"),
    )
    return ordered.iloc[[0]]

get_unique_sources

get_unique_sources(best_runs_out: DataFrame) -> list[str]

Extract unique anomaly sources from imputation run names.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing MLflow imputation runs.

TYPE: DataFrame

RETURNS DESCRIPTION
list

List of unique anomaly source names.

Source code in src/ensemble/ensemble_utils.py
def get_unique_sources(best_runs_out: pd.DataFrame) -> list[str]:
    """
    Extract unique anomaly sources from imputation run names.

    Run names follow the '{model_name}__{anomaly_source}' convention,
    so the source is the second '__'-separated field.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing MLflow imputation runs.

    Returns
    -------
    list
        List of unique anomaly source names (order not guaranteed).
    """
    # NOTE: the previous version also accumulated model names but never
    # used them; only the anomaly source (second '__' field) matters here.
    sources = {
        run_name.split("__")[1]
        for run_name in best_runs_out["tags.mlflow.runName"]
    }
    return list(sources)

keep_moment_models

keep_moment_models(runs_source: DataFrame) -> DataFrame

Filter to keep only MOMENT model runs.

PARAMETER DESCRIPTION
runs_source

DataFrame containing MLflow runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Runs with 'MOMENT' in the model name.

Source code in src/ensemble/ensemble_utils.py
def keep_moment_models(runs_source: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only the MOMENT model runs.

    Parameters
    ----------
    runs_source : pd.DataFrame
        DataFrame containing MLflow runs.

    Returns
    -------
    pd.DataFrame
        Runs whose model name starts with 'MOMENT'.
    """
    moment_runs = pd.DataFrame()
    for _, run_row in runs_source.iterrows():
        model_name, _ = parse_imputation_run_name_for_ensemble(
            run_row["tags.mlflow.runName"]
        )
        if model_name.startswith("MOMENT"):
            moment_runs = pd.concat([moment_runs, run_row.to_frame().T])

    return moment_runs

get_best_moments_per_source

get_best_moments_per_source(
    best_runs_out: DataFrame, best_metric_cfg: DictConfig
) -> Optional[DataFrame]

Get best MOMENT model for each unique anomaly source.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing MLflow imputation runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame or None

Best MOMENT run per source, or None if no MOMENT models found.

Source code in src/ensemble/ensemble_utils.py
def get_best_moments_per_source(
    best_runs_out: pd.DataFrame, best_metric_cfg: DictConfig
) -> Optional[pd.DataFrame]:
    """
    Pick the best MOMENT model for each unique anomaly source.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing MLflow imputation runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection.

    Returns
    -------
    pd.DataFrame or None
        Best MOMENT run per source, or None if no MOMENT models found.
    """
    best_per_source = pd.DataFrame()
    for source in get_unique_sources(best_runs_out):
        source_runs = best_runs_out[
            best_runs_out["tags.mlflow.runName"].str.contains(source)
        ]
        moment_runs = keep_moment_models(source_runs)
        if moment_runs.shape[0] == 0:
            # no MOMENT models for this source, just skip it
            continue
        best_run = get_best_moment(best_metric_cfg, moment_runs)
        if best_run is not None:
            best_per_source = pd.concat([best_per_source, best_run])

    # None signals "no MOMENT models across any source" to the caller
    if best_per_source.empty:
        return None
    return best_per_source

get_best_moment_variant

get_best_moment_variant(
    best_runs_out: DataFrame,
    best_metric_cfg: DictConfig,
    return_best_gt: bool,
) -> DataFrame

Get best MOMENT variant while preserving non-MOMENT models.

Handles MOMENT variants (finetune, zeroshot) by selecting best one per anomaly source, then combines with non-MOMENT models.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing all imputation runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection.

TYPE: DictConfig

return_best_gt

Whether filtering for ground truth runs.

TYPE: bool

RETURNS DESCRIPTION
DataFrame

Combined DataFrame with best MOMENT variants and all non-MOMENT models.

Source code in src/ensemble/ensemble_utils.py
def get_best_moment_variant(
    best_runs_out: pd.DataFrame, best_metric_cfg: DictConfig, return_best_gt: bool
) -> pd.DataFrame:
    """
    Select the best MOMENT variant while keeping every non-MOMENT model.

    MOMENT comes in several variants (finetune, zeroshot); the best variant
    is chosen (per anomaly source when not filtering for ground truth) and
    concatenated with the untouched non-MOMENT runs.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing all imputation runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection.
    return_best_gt : bool
        Whether filtering for ground truth runs.

    Returns
    -------
    pd.DataFrame
        Best MOMENT variants plus all non-MOMENT models; empty DataFrame
        when nothing qualifies.
    """
    logger.info("Getting best MOMENT variant")

    # Guard against None/empty input so the column accesses below cannot
    # raise a KeyError.
    if best_runs_out is None or best_runs_out.empty:
        logger.warning(
            "Empty DataFrame passed to get_best_moment_variant, returning empty"
        )
        return pd.DataFrame()

    other_model_runs = get_non_moment_models(best_runs_out)
    if return_best_gt:
        # easier task as just filter MOMENT as they all have the same source
        moment_mask = best_runs_out["tags.mlflow.runName"].str.contains("MOMENT")
        moment_runs = get_best_moment(best_metric_cfg, best_runs_out[moment_mask])
    else:
        moment_runs = get_best_moments_per_source(best_runs_out, best_metric_cfg)

    # Either side of the concatenation may be missing; keep whichever exists
    # (non-MOMENT first, matching the historical ordering).
    frames = [
        frame
        for frame in (other_model_runs, moment_runs)
        if frame is not None and not frame.empty
    ]
    if not frames:
        logger.warning(
            "No MOMENT or non-MOMENT models found, returning empty DataFrame"
        )
        return pd.DataFrame()
    if len(frames) == 1:
        return frames[0]
    return pd.concat(frames)

get_imputation_runs

get_imputation_runs(
    best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_best_gt: bool,
    detection_filter_reject: str = "zeroshot",
) -> Optional[DataFrame]

Get best imputation runs for ensemble creation.

Applies multiple filters: unique models, GT filtering, variant filtering, and MOMENT variant selection.

PARAMETER DESCRIPTION
best_runs

DataFrame containing all MLflow imputation runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection and thresholding.

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type (should be 'imputation').

TYPE: str

return_best_gt

If True, return only runs using ground truth anomaly detection.

TYPE: bool

detection_filter_reject

String to filter out from run names.

TYPE: str DEFAULT: 'zeroshot'

RETURNS DESCRIPTION
DataFrame or None

Filtered runs for ensemble, or None if no runs pass filters.

Source code in src/ensemble/ensemble_utils.py
def get_imputation_runs(
    best_runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_best_gt: bool,
    detection_filter_reject: str = "zeroshot",
) -> Optional[pd.DataFrame]:
    """
    Get best imputation runs for ensemble creation.

    Applies a filter pipeline: unique models, GT filtering, detection-variant
    filtering, and MOMENT variant selection.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing all MLflow imputation runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection and thresholding.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type (should be 'imputation').
    return_best_gt : bool
        If True, return only runs using ground truth anomaly detection.
    detection_filter_reject : str, default 'zeroshot'
        String to filter out from run names.

    Returns
    -------
    pd.DataFrame or None
        Filtered runs for ensemble, or None if no runs pass filters.
    """
    # Step 1: collapse duplicate run_names to the single best run of each.
    unique_runs = get_best_unique_imputation_models(
        best_runs, best_metric_cfg, cfg, task
    )

    # Step 2: keep only GT runs, or drop them, depending on return_best_gt.
    gt_filtered = filter_runs_for_gt(
        unique_runs, best_metric_cfg, cfg, task, return_best_gt
    )
    if gt_filtered.shape[0] == 0:
        logger.warning("No runs after filtering!")
        return None

    # Step 3: drop the rejected detection variant, e.g. MOMENT has both
    # finetuned and zeroshot detectors and only finetuned ones are ensembled.
    detection_filtered = filter_for_detection(detection_filter_reject, gt_filtered)

    # Step 4: keep only the best MOMENT model (non-MOMENT models pass through).
    return get_best_moment_variant(detection_filtered, best_metric_cfg, return_best_gt)

get_best_unique_classification_models

get_best_unique_classification_models(
    best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
) -> DataFrame

Get unique best classification models, one per run name.

PARAMETER DESCRIPTION
best_runs

DataFrame containing all MLflow classification runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection.

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type (should be 'classification').

TYPE: str

RETURNS DESCRIPTION
DataFrame

DataFrame with one row per unique model configuration.

Source code in src/ensemble/ensemble_utils.py
def get_best_unique_classification_models(
    best_runs: pd.DataFrame, best_metric_cfg: DictConfig, cfg: DictConfig, task: str
) -> pd.DataFrame:
    """
    Get unique best classification models, one per run name.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing all MLflow classification runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type (should be 'classification').

    Returns
    -------
    pd.DataFrame
        DataFrame with one row per unique model configuration.
    """
    winners = []
    for run_name in best_runs["tags.mlflow.runName"].unique():
        # If the same config was run multiple times, keep only the best copy;
        # most likely there is exactly one run per run_name.
        same_name_runs: pd.DataFrame = best_runs[
            best_runs["tags.mlflow.runName"] == run_name
        ]
        # The selection logic is shared between classification and imputation:
        winners.append(
            get_best_imputation_model_per_run_name(same_name_runs, best_metric_cfg)
        )

    return pd.concat(winners) if winners else pd.DataFrame()

drop_embedding_cls_runs

drop_embedding_cls_runs(
    best_runs_out: DataFrame,
) -> DataFrame

Filter out classification runs using embedding features.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing classification runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Runs without 'embedding' in the run name.

Source code in src/ensemble/ensemble_utils.py
def drop_embedding_cls_runs(best_runs_out: pd.DataFrame) -> pd.DataFrame:
    """
    Filter out classification runs using embedding features.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing classification runs. Must have a
        'tags.mlflow.runName' column when non-empty.

    Returns
    -------
    pd.DataFrame
        Runs without 'embedding' in the run name.
    """
    # Guard: an empty input would make the column access below raise,
    # and matches the previous behavior of returning an empty DataFrame.
    if best_runs_out.empty:
        return pd.DataFrame()

    # Vectorized literal-substring filter instead of the previous quadratic
    # iterrows()+concat loop (which also coerced dtypes to object).
    keep_mask = ~best_runs_out["tags.mlflow.runName"].str.contains(
        "embedding", regex=False
    )
    return best_runs_out[keep_mask]

get_list_of_good_models

get_list_of_good_models() -> list[str]

Get list of classifier models to include in ensembles.

RETURNS DESCRIPTION
list

List of classifier names considered 'good' for ensembling.

Source code in src/ensemble/ensemble_utils.py
def get_list_of_good_models() -> list[str]:
    """
    Get list of classifier models to include in ensembles.

    Returns
    -------
    list
        List of classifier names considered 'good' for ensembling.
    """
    approved_classifiers = [
        "TabPFN",
        "TabM",
        "XGBOOST",
        "CATBOOST",
    ]
    return approved_classifiers

keep_the_good_models

keep_the_good_models(best_runs_out: DataFrame) -> DataFrame

Filter to keep only runs from approved classifier list.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing classification runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Runs using classifiers from the approved list.

Source code in src/ensemble/ensemble_utils.py
def keep_the_good_models(best_runs_out: pd.DataFrame) -> pd.DataFrame:
    """
    Filter to keep only runs from approved classifier list.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing classification runs. Must have a
        'tags.mlflow.runName' column when non-empty.

    Returns
    -------
    pd.DataFrame
        Runs using classifiers from the approved list
        (see `get_list_of_good_models`).
    """
    # Guard: an empty input would make the column access below raise,
    # and matches the previous behavior of returning an empty DataFrame.
    if best_runs_out.empty:
        return pd.DataFrame()

    good_models = get_list_of_good_models()
    # Vectorized "contains any approved substring" test instead of the
    # previous quadratic iterrows()+concat loop.
    keep_mask = best_runs_out["tags.mlflow.runName"].apply(
        lambda run_name: any(model in run_name for model in good_models)
    )
    return best_runs_out[keep_mask]

keep_cls_runs_when_both_imputation_and_outlier_are_ensemble

keep_cls_runs_when_both_imputation_and_outlier_are_ensemble(
    best_runs_out: DataFrame,
) -> DataFrame

Filter to keep classification runs where both preprocessing steps are ensembles.

Used for full-chain ensemble evaluation where both anomaly detection and imputation were done with ensembled models.

PARAMETER DESCRIPTION
best_runs_out

DataFrame containing classification runs.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Runs where both imputation and outlier detection used ensembles.

Source code in src/ensemble/ensemble_utils.py
def keep_cls_runs_when_both_imputation_and_outlier_are_ensemble(
    best_runs_out: pd.DataFrame,
) -> pd.DataFrame:
    """
    Filter to keep classification runs where both preprocessing steps are ensembles.

    Used for full-chain ensemble evaluation where both anomaly detection
    and imputation were done with ensembled models.

    Parameters
    ----------
    best_runs_out : pd.DataFrame
        DataFrame containing classification runs.

    Returns
    -------
    pd.DataFrame
        Runs where both imputation and outlier detection used ensembles.

    Raises
    ------
    ValueError
        If a run name does not split into 2 or 4 '__'-separated fields.
    """
    runs_out = pd.DataFrame()
    for _, row in best_runs_out.iterrows():
        fields = row["tags.mlflow.runName"].split("__")
        if len(fields) == 2:
            # <classifier>__<imputation>: no explicit outlier field recorded
            imput = fields[1]
            outlier = "anomaly"
        elif len(fields) == 4:
            # <classifier>__<features>__<imputation>__<outlier>
            imput = fields[2]
            outlier = fields[3]
        else:
            logger.error(
                "Unknown number of fields in run_name, n = {}".format(len(fields))
            )
            raise ValueError(
                "Unknown number of fields in run_name, n = {}".format(len(fields))
            )

        if "ensemble" in imput and outlier == "anomaly":  # "anomaly_ensemble"
            runs_out = pd.concat([runs_out, pd.DataFrame(row).T])

    return runs_out

get_classification_runs

get_classification_runs(
    best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_best_gt: bool = True,
    gt_on: Optional[str] = "anomaly",
    return_only_ensembled_inputs: bool = False,
) -> DataFrame

Get best classification runs for ensemble creation.

Applies filters for unique models, GT usage, embedding exclusion, and approved classifiers.

PARAMETER DESCRIPTION
best_runs

DataFrame containing all classification runs.

TYPE: DataFrame

best_metric_cfg

Configuration for metric selection.

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type (should be 'classification').

TYPE: str

return_best_gt

If True, return only runs using ground truth preprocessing.

TYPE: bool DEFAULT: True

gt_on

Which component should use GT ('anomaly', 'imputation', or None for both).

TYPE: str DEFAULT: 'anomaly'

return_only_ensembled_inputs

If True, only return runs where both preprocessing steps were ensembles.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
DataFrame

Filtered classification runs for ensemble.

Source code in src/ensemble/ensemble_utils.py
def get_classification_runs(
    best_runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
    cfg: DictConfig,
    task: str,
    return_best_gt: bool = True,
    gt_on: Optional[str] = "anomaly",
    return_only_ensembled_inputs: bool = False,
) -> pd.DataFrame:
    """
    Get best classification runs for ensemble creation.

    Applies filters in order: unique models, GT usage, embedding exclusion,
    approved classifiers, and (optionally) fully-ensembled preprocessing.

    Parameters
    ----------
    best_runs : pd.DataFrame
        DataFrame containing all classification runs.
    best_metric_cfg : DictConfig
        Configuration for metric selection.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type (should be 'classification').
    return_best_gt : bool, default True
        If True, return only runs using ground truth preprocessing.
    gt_on : str, default 'anomaly'
        Which component should use GT ('anomaly', 'imputation', or None for both).
    return_only_ensembled_inputs : bool, default False
        If True, only return runs where both preprocessing steps were ensembles.

    Returns
    -------
    pd.DataFrame
        Filtered classification runs for ensemble.
    """
    # Keep one (best) run per unique run_name
    best_unique_runs = get_best_unique_classification_models(
        best_runs, best_metric_cfg, cfg, task
    )

    # Whether to keep only the GT runs or to kick them out (see gt_on)
    best_runs_out = filter_runs_for_gt(
        best_unique_runs, best_metric_cfg, cfg, task, return_best_gt, gt_on=gt_on
    )

    # Kick out runs classifying on embedding features
    best_runs_out = drop_embedding_cls_runs(best_runs_out)

    # Keep just the "good models", not the sanity check ones like LogisticRegression
    best_runs_out = keep_the_good_models(best_runs_out)

    if return_only_ensembled_inputs:
        # Full-chain ensembles only: both imputation and outlier detection ensembled
        best_runs_out = keep_cls_runs_when_both_imputation_and_outlier_are_ensemble(
            best_runs_out
        )

    return best_runs_out

get_used_models_from_mlflow

get_used_models_from_mlflow(
    experiment_name: str,
    cfg: DictConfig,
    task: str = "anomaly_detection",
    exclude_ensemble: bool = True,
    return_odd_number_of_models: bool = False,
    return_best_gt: bool = False,
    return_anomaly_ensembles: bool = False,
    gt_on: str = None,
    include_all_variants: bool = False,
    return_all_runs: bool = False,
    return_only_ensembled_inputs: bool = False,
) -> Union[dict[str, Series], DataFrame]

Retrieve best models from MLflow for ensemble creation.

Main entry point for getting submodels to ensemble. Queries MLflow and applies task-specific filtering and selection logic.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name to query.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type: 'anomaly_detection', 'imputation', or 'classification'.

TYPE: str DEFAULT: 'anomaly_detection'

exclude_ensemble

If True, exclude existing ensemble runs from results.

TYPE: bool DEFAULT: True

return_odd_number_of_models

If True, ensure odd number of models (for majority voting).

TYPE: bool DEFAULT: False

return_best_gt

If True, return only runs using ground truth.

TYPE: bool DEFAULT: False

return_anomaly_ensembles

If True, return imputation runs that used anomaly ensembles.

TYPE: bool DEFAULT: False

gt_on

For classification, which component uses GT.

TYPE: str DEFAULT: None

include_all_variants

If True, include all model variants.

TYPE: bool DEFAULT: False

return_all_runs

If True, return all runs without filtering.

TYPE: bool DEFAULT: False

return_only_ensembled_inputs

If True, only return runs with ensembled preprocessing.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict or DataFrame

Dictionary mapping model names to run data, or DataFrame if return_all_runs.

Source code in src/ensemble/ensemble_utils.py
def get_used_models_from_mlflow(
    experiment_name: str,
    cfg: DictConfig,
    task: str = "anomaly_detection",
    exclude_ensemble: bool = True,
    return_odd_number_of_models: bool = False,
    return_best_gt: bool = False,
    return_anomaly_ensembles: bool = False,
    gt_on: Optional[str] = None,
    include_all_variants: bool = False,
    return_all_runs: bool = False,
    return_only_ensembled_inputs: bool = False,
) -> Union[dict[str, pd.Series], pd.DataFrame]:
    """
    Retrieve best models from MLflow for ensemble creation.

    Main entry point for getting submodels to ensemble. Queries MLflow
    and applies task-specific filtering and selection logic.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name to query.
    cfg : DictConfig
        Main Hydra configuration.
    task : str, default 'anomaly_detection'
        Task type: 'anomaly_detection', 'imputation', or 'classification'.
    exclude_ensemble : bool, default True
        If True, exclude existing ensemble runs from results.
    return_odd_number_of_models : bool, default False
        If True, ensure odd number of models (for majority voting).
    return_best_gt : bool, default False
        If True, return only runs using ground truth.
    return_anomaly_ensembles : bool, default False
        If True, return imputation runs that used anomaly ensembles.
    gt_on : str, optional
        For classification, which component uses GT.
    include_all_variants : bool, default False
        If True, include all model variants.
    return_all_runs : bool, default False
        If True, return all runs without filtering.
    return_only_ensembled_inputs : bool, default False
        If True, only return runs with ensembled preprocessing.

    Returns
    -------
    dict or pd.DataFrame
        Dictionary mapping model names to run data, or DataFrame if
        return_all_runs. An empty dict is returned when the experiment
        has no runs.

    Raises
    ------
    NotImplementedError
        If `task` is not one of the supported task types.
    """
    # Each task stores its model-selection metric in a different config section
    if task == "anomaly_detection":
        best_metric_cfg = cfg["OUTLIER_DETECTION"]["best_metric"]
    elif task == "imputation":
        best_metric_cfg = cfg["IMPUTATION_METRICS"]["best_metric"]
    elif task == "classification":
        best_metric_cfg = cfg["CLASSIFICATION_SETTINGS"]["BEST_METRIC"]
    else:
        logger.error(f"Task {task} not implemented yet")
        raise NotImplementedError(f"Task {task} not implemented yet")

    # best_runs contain all the hyperparameter combinations
    # (mlflow.search_runs returns a DataFrame by default)
    best_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    if not return_all_runs:
        if best_runs.shape[0] > 0:
            if exclude_ensemble:
                # Drop already-created ensemble runs so they do not become
                # submodels of a new ensemble
                if task == "anomaly_detection":
                    best_runs = exclude_ensembles_from_mlflow_runs(best_runs)
                elif task == "imputation":
                    best_runs = exclude_imputation_ensembles_from_mlflow_runs(best_runs)
                    if return_anomaly_ensembles:
                        best_runs = keep_only_imputations_from_anomaly_ensembles(
                            best_runs
                        )
                    else:
                        best_runs = exclude_ensembles_from_mlflow_runs(best_runs)
            else:
                logger.info(
                    "Including runs with 'ensemble' in the name (i.e. when you want to featurize)"
                )
            # Task-specific selection of the best unique submodels
            if task == "anomaly_detection":
                best_unique_models = get_anomaly_runs(
                    best_runs,
                    best_metric_cfg,
                    cfg,
                    task,
                    return_odd_number_of_models,
                    include_all_variants=include_all_variants,
                )

            elif task == "imputation":
                # Get two types of ensemble:
                # 1) Imputation models trained on the gt (sets baseline for ensemble performance)
                # return_best_gt=True
                # 2) All possible imputation models excluding the gt (this is the "real-world" scenario as you might
                #    not have the gt available)
                # return_best_gt=False
                best_unique_models = get_imputation_runs(
                    best_runs, best_metric_cfg, cfg, task, return_best_gt
                )
            elif task == "classification":
                best_unique_models = get_classification_runs(
                    best_runs,
                    best_metric_cfg,
                    cfg,
                    task,
                    return_best_gt,
                    gt_on=gt_on,
                    return_only_ensembled_inputs=return_only_ensembled_inputs,
                )
            else:
                logger.error(f"Task {task} not implemented yet")
                raise NotImplementedError(f"Task {task} not implemented yet")

            return best_unique_models

        else:
            logger.warning(
                "Did not find previous runs, experiment name = {}".format(
                    experiment_name
                )
            )
            return {}
    else:
        logger.info("Returning all runs")
        if exclude_ensemble:
            best_runs = exclude_imputation_ensembles_from_mlflow_runs(best_runs)
        return best_runs

ensemble_the_imputation_output_dicts

ensemble_the_imputation_output_dicts(
    results_per_model: dict,
    ensembled_outputs: dict,
    i: int,
    submodel: str,
) -> dict

Aggregate imputation outputs from multiple submodels.

Stacks imputation arrays from each submodel into a 4D array (subjects x timepoints x features x submodels) for later ensemble statistics.

PARAMETER DESCRIPTION
results_per_model

Imputation results from a single submodel.

TYPE: dict

ensembled_outputs

Accumulated ensemble outputs (modified in place).

TYPE: dict

i

Index of current submodel.

TYPE: int

submodel

Name of current submodel.

TYPE: str

RETURNS DESCRIPTION
dict

Updated ensembled_outputs with new submodel added.

Source code in src/ensemble/ensemble_utils.py
def ensemble_the_imputation_output_dicts(
    results_per_model: dict, ensembled_outputs: dict, i: int, submodel: str
) -> dict:
    """
    Aggregate imputation outputs from multiple submodels.

    Stacks imputation arrays from each submodel into a 4D array
    (subjects x timepoints x features x submodels) for later ensemble statistics.

    NOTE(review): for the first submodel the accumulator becomes an alias of
    ``results_per_model`` (no copy), so the key drops at the end also mutate
    the caller's dict — presumably intentional; confirm with callers.

    Parameters
    ----------
    results_per_model : dict
        Imputation results from a single submodel.
    ensembled_outputs : dict
        Accumulated prediction stacks (modified in place).
    i : int
        Index of current submodel (currently unused, kept for interface stability).
    submodel : str
        Name of current submodel (currently unused, kept for interface stability).

    Returns
    -------
    dict
        Updated ensembled_outputs with new submodel added.

    Raises
    ------
    ValueError
        If the accumulated array is neither 3D nor 4D.
    """
    if len(ensembled_outputs) == 0:
        # First submodel: adopt its dict as the accumulator (aliased, not copied)
        ensembled_outputs = results_per_model

    for split in results_per_model["imputation"].keys():
        for split_key in results_per_model["imputation"][split].keys():
            # The incoming array is 3D (no_subjects, no_timepoints, no_features);
            # the accumulator grows a 4th axis holding one slice per submodel.
            input_array = results_per_model["imputation"][split][split_key][
                "imputation_dict"
            ]["imputation"]["mean"]
            output_array = ensembled_outputs["imputation"][split][split_key][
                "imputation_dict"
            ]["imputation"]["mean"]
            # TODO! CIpos/CIneg

            input_array = input_array[
                :, :, :, None
            ]  # add 4th dimension for the first submodel
            if len(output_array.shape) == 3:
                # First submodel: input and accumulator hold the same data
                output_array = input_array
                ensembled_outputs["imputation"][split][split_key]["imputation_dict"][
                    "imputation"
                ]["mean"] = output_array

            elif len(output_array.shape) == 4:
                assert input_array.shape[0] == output_array.shape[0], (
                    "Number of shapes do not match"
                )
                output_array = np.concatenate((output_array, input_array), axis=3)
                ensembled_outputs["imputation"][split][split_key]["imputation_dict"][
                    "imputation"
                ]["mean"] = output_array
                logger.debug("Ensemble output shape: {}".format(output_array.shape))
            else:
                # BUGFIX: this branch triggers on an unexpected *accumulator*
                # rank; the old message reported input_array, which is always
                # 4D here after the reshape above.
                logger.error(
                    f"Output array shape is not 3d or 4d, but {output_array.shape}"
                )
                raise ValueError(
                    f"Output array shape is not 3d or 4d, but {output_array.shape}"
                )

    # Drop bookkeeping keys that are meaningless for the ensemble
    ensembled_outputs.pop("train", None)
    ensembled_outputs.pop("timing", None)
    ensembled_outputs.pop("model_info", None)
    ensembled_outputs.pop("mlflow", None)

    return ensembled_outputs

compute_ensemble_stats

compute_ensemble_stats(
    ensembled_outputs: dict,
    ensemble_name: str,
    n: int,
    cfg: DictConfig,
) -> dict

Compute ensemble statistics (mean, std, CI) from stacked submodel outputs.

PARAMETER DESCRIPTION
ensembled_outputs

Dictionary with 4D arrays from stacked submodel predictions.

TYPE: dict

ensemble_name

Name of the ensemble.

TYPE: str

n

Number of submodels.

TYPE: int

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Ensemble outputs with computed statistics (mean, std, CI).

Source code in src/ensemble/ensemble_utils.py
def compute_ensemble_stats(
    ensembled_outputs: dict, ensemble_name: str, n: int, cfg: DictConfig
) -> dict:
    """
    Compute ensemble statistics (mean, std, CI) from stacked submodel outputs.

    Parameters
    ----------
    ensembled_outputs : dict
        Dictionary with 4D arrays (subjects x timepoints x features x submodels)
        from stacked submodel predictions. Modified in place.
    ensemble_name : str
        Name of the ensemble (currently unused, kept for interface stability).
    n : int
        Number of submodels; sanity-checked against the stacked 4th axis.
    cfg : DictConfig
        Main Hydra configuration (currently unused, kept for interface stability).

    Returns
    -------
    dict
        Ensemble outputs with computed statistics (mean, std, n, CI) replacing
        each stacked 4D array.
    """

    def compute_numpy_array_stats(input_array: np.ndarray) -> dict:
        # NaN-aware reduction over the submodel axis (axis=3)
        assert len(input_array.shape) == 4, "Input array is not 4d"
        dict_out = {}
        dict_out["mean"] = np.nanmean(input_array, axis=3)
        assert len(dict_out["mean"].shape) == 3, "Mean array is not 3d"
        dict_out["std"] = np.nanstd(input_array, axis=3)
        dict_out["n"] = input_array.shape[3]
        assert dict_out["n"] == n, (
            "Number of submodels used for stats does not match the number of submodel names"
        )
        # assuming this is now normally distributed, you could test this as well? TODO!
        dict_out["CI"] = 1.96 * dict_out["std"] / np.sqrt(input_array.shape[3])
        return dict_out

    # Statistics are written back into the same dict that is being iterated.
    # BUGFIX(cleanup): the previous `ensembled_output = ensembled_outputs`
    # alias plus `if split not in ...` guards were dead code — the "output"
    # dict was the input dict, so every key always existed.
    for split in ensembled_outputs["imputation"].keys():
        for split_key in ensembled_outputs["imputation"][split].keys():
            input_array = ensembled_outputs["imputation"][split][split_key][
                "imputation_dict"
            ]["imputation"]["mean"]
            ensembled_outputs["imputation"][split][split_key]["imputation_dict"][
                "imputation"
            ] = compute_numpy_array_stats(input_array=input_array)

    return ensembled_outputs

ensemble_the_imputation_results

ensemble_the_imputation_results(
    ensemble_name: str,
    mlflow_ensemble: dict[str, Series],
    cfg: DictConfig,
) -> dict

Create ensemble from multiple imputation model outputs.

Loads imputation results from each submodel, stacks predictions, and computes ensemble statistics.

PARAMETER DESCRIPTION
ensemble_name

Name for the ensemble.

TYPE: str

mlflow_ensemble

Dictionary mapping submodel names to their MLflow run data.

TYPE: dict

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Ensembled imputation output with statistics and metadata.

Source code in src/ensemble/ensemble_utils.py
def ensemble_the_imputation_results(
    ensemble_name: str, mlflow_ensemble: dict[str, pd.Series], cfg: DictConfig
) -> dict:
    """
    Create ensemble from multiple imputation model outputs.

    Loads imputation results from each submodel, stacks predictions,
    and computes ensemble statistics (mean, std, CI).

    Parameters
    ----------
    ensemble_name : str
        Name for the ensemble.
    mlflow_ensemble : dict
        Dictionary mapping submodel names to their MLflow run data.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    dict
        Ensembled imputation output with statistics and metadata
        ('params', 'mlflow_ensemble').
    """
    ensembled_outputs = {}
    submodel_run_names = []

    # Load every submodel's results and stack them into the accumulator
    for i, submodel in enumerate(mlflow_ensemble.keys()):
        logger.info(
            f"Getting the results of the model: {submodel} (#{i + 1}/{len(mlflow_ensemble.keys())})"
        )
        results_per_model = get_imputation_results_from_mlflow(
            mlflow_run=mlflow_ensemble[submodel], model_name=submodel, cfg=cfg
        )

        # quick'n'dirty save of the submodel run
        submodel_run_names.append(results_per_model["mlflow"]["run_info"]["run_name"])

        ensembled_outputs = ensemble_the_imputation_output_dicts(
            results_per_model=results_per_model,
            ensembled_outputs=ensembled_outputs,
            i=i,
            submodel=submodel,
        )
        # average the metrics from submodels, not metrics from averaged imputations! TODO!
        # ensembled_outputs['averaged_metrics']

    # Add some params (as in MLflow params)
    ensembled_outputs["params"] = {
        "model": ensemble_name,
        "submodels": submodel_run_names,
    }

    # You have now 4d Numpy arrays that you can compute whatever stats you like from all the submodels
    ensembled_output = compute_ensemble_stats(
        ensembled_outputs, ensemble_name, n=len(mlflow_ensemble.keys()), cfg=cfg
    )

    # Add the MLflow run info for the submodels
    ensembled_output["mlflow_ensemble"] = mlflow_ensemble
    # Sanity check: as many submodels used for the stats as you had submodel names
    n_out = ensembled_outputs["imputation"]["train"]["gt"]["imputation_dict"][
        "imputation"
    ]["n"]
    assert len(submodel_run_names) == n_out, (
        f"It seems that you computed stats from {n_out} even though you had "
        f"{len(submodel_run_names)} submodels"
    )

    return ensembled_output

get_ensemble_permutations

get_ensemble_permutations(
    best_unique_models: dict[str, Series],
    _ensemble_cfg: DictConfig,
    cfg: DictConfig,
) -> dict[str, dict[str, Series]]

Generate ensemble configurations from available submodels.

Currently creates a single ensemble using all available models. Placeholder for future permutation logic.

PARAMETER DESCRIPTION
best_unique_models

Dictionary of available submodels.

TYPE: dict

_ensemble_cfg

Ensemble-specific configuration (currently unused).

TYPE: DictConfig

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary mapping ensemble names to their submodel dictionaries.

Source code in src/ensemble/ensemble_utils.py
def get_ensemble_permutations(
    best_unique_models: dict[str, pd.Series], _ensemble_cfg: DictConfig, cfg: DictConfig
) -> dict[str, dict[str, pd.Series]]:
    """
    Generate ensemble configurations from available submodels.

    Currently creates a single ensemble using all available models.
    Placeholder for future permutation logic (e.g. all 5-submodel
    combinations out of 10 available models).

    Parameters
    ----------
    best_unique_models : dict
        Dictionary of available submodels, keyed by model name.
    _ensemble_cfg : DictConfig
        Ensemble-specific configuration (currently unused).
    cfg : DictConfig
        Main Hydra configuration (currently unused).

    Returns
    -------
    dict
        Dictionary mapping ensemble names to their submodel dictionaries.
    """

    def get_ensemble_name(submodel_names, delimiter="-"):
        # e.g. ["a", "b"] -> "ensemble-a-b"
        return f"ensemble{delimiter}" + delimiter.join(submodel_names)

    # Sort for a deterministic ensemble name regardless of dict insertion order
    submodel_names = sorted(best_unique_models)

    # TODO! placeholder now: a single ensemble containing all submodels.
    # When there are many models, return multiple permutations here instead.
    return {get_ensemble_name(submodel_names): best_unique_models}

get_imputation_results_from_for_ensembling

get_imputation_results_from_for_ensembling(
    experiment_name: str, cfg: DictConfig
) -> dict[str, dict]

Get imputation results and create ensembles from MLflow experiment.

High-level function that retrieves best submodels and creates imputation ensemble(s).

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary mapping ensemble names to their ensembled outputs. Empty dict if insufficient models for ensembling.

Source code in src/ensemble/ensemble_utils.py
def get_imputation_results_from_for_ensembling(
    experiment_name: str, cfg: DictConfig
) -> dict[str, dict]:
    """
    Get imputation results and create ensembles from MLflow experiment.

    High-level function that retrieves best submodels and creates
    imputation ensemble(s).

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    dict
        Dictionary mapping ensemble names to their ensembled outputs.
        Empty dict if insufficient models for ensembling.
    """
    # Get the best hyperparameter combination of each model architecture
    best_unique_models = get_used_models_from_mlflow(experiment_name, cfg)

    # Guard clauses: ensembling needs at least two submodels
    if len(best_unique_models) == 0:
        logger.warning("No models found for ensembling, returning empty dict")
        return {}
    if len(best_unique_models) == 1:
        logger.warning("Only one model found, no need for ensembling")
        return {}

    # Define the permutations of the (sub)models for the ensemble.
    # BUGFIX: the second parameter of get_ensemble_permutations() is named
    # "_ensemble_cfg", so the previous keyword call with "ensemble_cfg="
    # raised a TypeError; pass it positionally instead.
    mlflow_ensembles = get_ensemble_permutations(
        best_unique_models, cfg["IMPUTATION_ENSEMBLING"], cfg
    )

    # Get the imputation outputs (forward passes) of the submodels and average
    # the responses, so you can compute the ensemble metrics, and use them
    # downstream for PLR featurization and later for classification
    ensembled_output = {}
    for ensemble_name, mlflow_ensemble in mlflow_ensembles.items():
        if if_recreate_ensemble(ensemble_name, experiment_name, cfg):
            ensembled_output[ensemble_name] = ensemble_the_imputation_results(
                ensemble_name=ensemble_name,
                mlflow_ensemble=mlflow_ensemble,
                cfg=cfg,
            )
        else:
            logger.info(
                f"Ens model {ensemble_name} already exists, skipping creation"
            )

    return ensembled_output

get_gt_imputation_labels

get_gt_imputation_labels(
    sources: dict,
) -> dict[str, ndarray]

Extract ground truth imputation masks from source data.

PARAMETER DESCRIPTION
sources

Dictionary containing 'pupil_gt' with ground truth data.

TYPE: dict

RETURNS DESCRIPTION
dict

Dictionary with train/test split imputation masks as int arrays.

Source code in src/ensemble/ensemble_utils.py
def get_gt_imputation_labels(sources: dict) -> dict[str, np.ndarray]:
    """
    Extract ground truth imputation masks from source data.

    Parameters
    ----------
    sources : dict
        Dictionary containing 'pupil_gt' with ground truth data.

    Returns
    -------
    dict
        Dictionary with train/test split imputation masks as int arrays.
    """
    split_dfs = sources["pupil_gt"]["df"]
    # Cast the boolean masks to 0/1 integers, one array per split
    return {
        split: split_data["labels"]["imputation_mask"].astype(int)
        for split, split_data in split_dfs.items()
    }

get_metadata_dict_from_sources

get_metadata_dict_from_sources(sources: dict) -> dict

Extract metadata dictionary from source data.

PARAMETER DESCRIPTION
sources

Dictionary containing data sources.

TYPE: dict

RETURNS DESCRIPTION
dict

Dictionary with train/test split metadata.

Source code in src/ensemble/ensemble_utils.py
def get_metadata_dict_from_sources(sources: dict) -> dict:
    """
    Extract metadata dictionary from source data.

    Metadata is assumed identical across sources, so only the first
    source is inspected.

    Parameters
    ----------
    sources : dict
        Dictionary containing data sources.

    Returns
    -------
    dict
        Dictionary with train/test split metadata.
    """
    first_source = next(iter(sources))
    split_dfs = sources[first_source]["df"]
    return {split: split_data["metadata"] for split, split_data in split_dfs.items()}

combine_ensembles_into_one_df

combine_ensembles_into_one_df(
    best_unique_models: dict,
) -> Optional[DataFrame]

Combine multiple ensemble DataFrames into a single DataFrame.

PARAMETER DESCRIPTION
best_unique_models

Dictionary where values may be DataFrames of model runs.

TYPE: dict

RETURNS DESCRIPTION
DataFrame or None

Combined DataFrame of all models, or None if empty.

Source code in src/ensemble/ensemble_utils.py
def combine_ensembles_into_one_df(best_unique_models: dict) -> Optional[pd.DataFrame]:
    """
    Combine multiple ensemble DataFrames into a single DataFrame.

    Non-DataFrame values are silently skipped.

    Parameters
    ----------
    best_unique_models : dict
        Dictionary where values may be DataFrames of model runs.

    Returns
    -------
    pd.DataFrame or None
        Combined DataFrame of all models, or None if empty.
    """
    combined = pd.DataFrame()
    for runs in best_unique_models.values():
        if isinstance(runs, pd.DataFrame):
            combined = pd.concat([combined, runs], axis=0)
    return None if combined.empty else combined

aggregate_codes

aggregate_codes(df: DataFrame) -> dict[str, DataFrame]

Aggregate subject codes from multiple MLflow runs.

Extracts train/test subject codes from each run to verify all models used same data splits.

PARAMETER DESCRIPTION
df

DataFrame of MLflow runs with 'params.codes_train' and 'params.codes_test'.

TYPE: DataFrame

RETURNS DESCRIPTION
dict

Dictionary with 'train' and 'test' DataFrames of subject codes.

Source code in src/ensemble/ensemble_utils.py
def aggregate_codes(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """
    Aggregate subject codes from multiple MLflow runs.

    Extracts train/test subject codes from each run to verify
    all models used same data splits.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame of MLflow runs with 'params.codes_train' and 'params.codes_test'.

    Returns
    -------
    dict
        Dictionary with 'train' and 'test' DataFrames of subject codes
        (one column per run).
    """
    codes = {"train": None, "test": None}
    cols = []

    for _, run_row in df.iterrows():
        if run_row["params.codes_train"] is None:
            # Run was logged without its subject codes; cannot include it
            logger.warning(
                f'run_name: "{run_row["tags.mlflow.runName"]}" does not have codes saved'
            )
            continue

        cols.append(run_row["tags.mlflow.runName"])
        # Codes are stored as a single space-delimited string per run
        train_col = np.array(run_row["params.codes_train"].split(" "))[:, np.newaxis]
        test_col = np.array(run_row["params.codes_test"].split(" "))[:, np.newaxis]
        if codes["test"] is None:
            # First run with codes: initialize both accumulators
            codes["train"], codes["test"] = train_col, test_col
        else:
            # Append this run's codes as a new column
            codes["train"] = np.concatenate((codes["train"], train_col), axis=1)
            codes["test"] = np.concatenate((codes["test"], test_col), axis=1)

    return {split: pd.DataFrame(arr, columns=cols) for split, arr in codes.items()}

are_codes_the_same

are_codes_the_same(df: DataFrame) -> bool

Check if all columns in DataFrame have identical values.

Used to verify all submodels were trained on same subjects.

PARAMETER DESCRIPTION
df

DataFrame where each column represents codes from a model.

TYPE: DataFrame

RETURNS DESCRIPTION
bool

True if all columns have identical values, False otherwise.

Source code in src/ensemble/ensemble_utils.py
def are_codes_the_same(df: pd.DataFrame) -> bool:
    """
    Check if all columns in DataFrame have identical values.

    Used to verify all submodels were trained on same subjects.
    Non-matching run names are logged as errors.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame where each column represents codes from a model.

    Returns
    -------
    bool
        True if all columns have identical values, False otherwise.
    """
    # Element-wise comparison of every column against the first column
    matches_first = df.eq(df.iloc[:, 0], axis=0)
    all_submodels_have_same_codes = np.all(matches_first)
    if not all_submodels_have_same_codes:
        logger.error("All the submodels do not have the same codes")
        # A column fully matches only if every one of its rows matched
        column_fully_matches = matches_first.sum(axis=0) == matches_first.shape[0]
        for run_name, fully_matches in zip(matches_first.columns, column_fully_matches):
            if not fully_matches:
                logger.error(" run_name = {}".format(run_name))

    return all_submodels_have_same_codes

check_codes_used

check_codes_used(
    best_unique_models: dict,
) -> Optional[dict]

Verify all ensemble submodels were trained on the same subjects.

PARAMETER DESCRIPTION
best_unique_models

Dictionary of submodels to check.

TYPE: dict

RETURNS DESCRIPTION
dict or None

Input dictionary if checks pass, None if no valid data.

Source code in src/ensemble/ensemble_utils.py
def check_codes_used(best_unique_models: dict) -> Optional[dict]:
    """
    Verify all ensemble submodels were trained on the same subjects.

    Parameters
    ----------
    best_unique_models : dict
        Dictionary of submodels to check.

    Returns
    -------
    dict or None
        Input dictionary if checks pass, None if no valid data.
    """
    df_mlflow = combine_ensembles_into_one_df(best_unique_models)
    if df_mlflow is None:
        # No DataFrame-valued entries to verify
        return None

    codes = aggregate_codes(df_mlflow)
    # Check train and test splits separately (mismatches are logged inside)
    for split_codes in codes.values():
        are_codes_the_same(df=split_codes)
    return best_unique_models

get_grouped_classification_runs

get_grouped_classification_runs(
    best_unique_models: dict,
    experiment_name: str,
    cfg: DictConfig,
    task: str,
) -> dict

Group classification runs by ground truth usage pattern.

Creates groups for:

- pupil_gt: Both anomaly detection and imputation use ground truth
- anomaly_gt: Only anomaly detection uses ground truth
- ensembled_input: Both use ensemble outputs

PARAMETER DESCRIPTION
best_unique_models

Dictionary to populate with grouped runs.

TYPE: dict

experiment_name

MLflow experiment name.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type (should be 'classification').

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary with runs grouped by GT usage pattern.

Source code in src/ensemble/ensemble_utils.py
def get_grouped_classification_runs(
    best_unique_models: dict, experiment_name: str, cfg: DictConfig, task: str
) -> dict:
    """
    Group classification runs by ground truth usage pattern.

    Creates groups for:
    - pupil_gt: Both anomaly and imputation use GT
    - anomaly_gt: Only anomaly detection uses GT
    - ensembled_input: Both use ensemble outputs

    Parameters
    ----------
    best_unique_models : dict
        Dictionary to populate with grouped runs.
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type (should be 'classification').

    Returns
    -------
    dict
        Dictionary with runs grouped by GT usage pattern, or None if the
        subject-code check finds no valid data.
    """
    # Group name -> selection kwargs for get_used_models_from_mlflow.
    # (An "exclude_gt" group, where neither stage uses GT, was considered
    # but is of little practical use.)
    group_kwargs = {
        # Both anomaly detection and imputation come from ground truth
        "pupil_gt": {"return_best_gt": True, "gt_on": None},
        # Only anomaly detection is ground truth
        "anomaly_gt": {"return_best_gt": True, "gt_on": "anomaly"},
        # "Full chain" of ensembled models: ensembled anomaly detection fed
        # the imputation ensembles, whose outputs now feed these classifiers
        "ensembled_input": {
            "return_best_gt": False,
            "return_only_ensembled_inputs": True,
        },
    }

    for group_name, kwargs in group_kwargs.items():
        best_unique_models[group_name] = get_used_models_from_mlflow(
            experiment_name, cfg, task, **kwargs
        )

    # Verify all groups' submodels were trained on the same subjects
    return check_codes_used(best_unique_models)

get_results_from_mlflow_for_ensembling

get_results_from_mlflow_for_ensembling(
    experiment_name: str,
    cfg: DictConfig,
    task: str,
    recompute_metrics: bool = False,
) -> Optional[dict]

Get MLflow results organized for ensemble creation.

Main entry point for retrieving submodels for ensembling across all tasks. Handles task-specific logic for anomaly detection, imputation, and classification.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type: 'anomaly_detection', 'imputation', or 'classification'.

TYPE: str

recompute_metrics

If True, only retrieve runs for metric recomputation.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict or None

Dictionary of grouped submodel runs, or None if no valid runs found.

Source code in src/ensemble/ensemble_utils.py
def get_results_from_mlflow_for_ensembling(
    experiment_name: str, cfg: DictConfig, task: str, recompute_metrics: bool = False
) -> Optional[dict]:
    """
    Get MLflow results organized for ensemble creation.

    Main entry point for retrieving submodels for ensembling across all tasks.
    Handles task-specific logic for anomaly detection, imputation, and classification.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type: 'anomaly_detection', 'imputation', or 'classification'.
    recompute_metrics : bool, default False
        If True, only retrieve runs for metric recomputation.

    Returns
    -------
    dict or None
        Dictionary of grouped submodel runs, or None if no valid runs found.

    Raises
    ------
    NotImplementedError
        If `task` is not one of the supported task types.
    """
    best_unique_models = {}
    if task == "anomaly_detection":
        # Hacky way to get the granular metrics computed. Note also that this
        # does not compute the granular metrics for all the models (non-best
        # MOMENTs, and the worst model if there is an even number of models).
        # Disabling the quality threshold keeps all candidate models in scope.
        cfg_tmp = copy.deepcopy(cfg)
        with open_dict(cfg_tmp):
            cfg_tmp["OUTLIER_DETECTION"]["best_metric"][
                "ensemble_quality_threshold"
            ] = None

        # Recompute all the metrics (and get the granular metrics that were
        # not included in the "first pass" of evaluation during the training)
        if recompute_metrics:
            best_unique_models["pupil_gt_all_variants"] = get_used_models_from_mlflow(
                experiment_name, cfg_tmp, task, include_all_variants=True
            )

        else:
            # We only have an outlier mask (0 or 1), so request an odd number
            # of models to guarantee a majority "winner vote" per timepoint
            best_unique_models["pupil_gt_thresholded"] = get_used_models_from_mlflow(
                experiment_name, cfg, task, return_odd_number_of_models=True
            )

            # Same selection but with the quality threshold disabled (cfg_tmp)
            best_unique_models["pupil_gt"] = get_used_models_from_mlflow(
                experiment_name, cfg_tmp, task, return_odd_number_of_models=True
            )

    elif task == "imputation":
        if recompute_metrics:
            # Recompute the metrics with the quality threshold disabled
            cfg_tmp = copy.deepcopy(cfg)
            with open_dict(cfg_tmp):
                cfg_tmp["IMPUTATION_METRICS"]["best_metric"][
                    "ensemble_quality_threshold"
                ] = None
            best_unique_models["all_runs"] = get_used_models_from_mlflow(
                experiment_name, cfg_tmp, task, return_all_runs=True
            )

        else:
            # Best models whose inputs come from ground truth
            best_unique_models["pupil_gt"] = get_used_models_from_mlflow(
                experiment_name, cfg, task, return_best_gt=True
            )
            # Models whose anomaly-detection input came from an ensemble
            best_unique_models["anomaly_ensemble"] = get_used_models_from_mlflow(
                experiment_name,
                cfg,
                task,
                return_best_gt=False,
                return_anomaly_ensembles=True,
            )
            # Scale the quality threshold by the configured multiplier before
            # selecting models that exclude ground truth
            cfg_tmp = copy.deepcopy(cfg)
            with open_dict(cfg_tmp):
                cfg_tmp["IMPUTATION_METRICS"]["best_metric"][
                    "ensemble_quality_threshold"
                ] *= cfg_tmp["IMPUTATION_METRICS"]["best_metric"][
                    "gt_exclude_multiplier"
                ]
            best_unique_models["exclude_gt"] = get_used_models_from_mlflow(
                experiment_name,
                cfg_tmp,
                task,
                return_best_gt=False,
                return_anomaly_ensembles=False,
            )

    elif task == "classification":
        best_unique_models = get_grouped_classification_runs(
            best_unique_models, experiment_name, cfg, task
        )
    else:
        logger.error(f"Task {task} not implemented yet")
        raise NotImplementedError(f"Task {task} not implemented yet")

    # Display the submodel runs of the ensemble.
    # NOTE: best_unique_models can be None only via the classification branch
    # (check_codes_used inside get_grouped_classification_runs may return None)
    if best_unique_models is not None:
        ensemble_names = list(best_unique_models.keys())
        for ensemble_name in ensemble_names:
            if best_unique_models[ensemble_name] is None:
                logger.warning(
                    f"No models found for ensemble {ensemble_name}, skipping the ensemble creation (pop out)"
                )
                best_unique_models.pop(ensemble_name)

            else:
                logger.info(
                    f"Ensemble name {ensemble_name} has {len(best_unique_models[ensemble_name])} submodels"
                )

                if isinstance(best_unique_models[ensemble_name], pd.Series):
                    for key, runs in best_unique_models[ensemble_name].items():
                        logger.info(f" {key}: {len(runs)}")

                elif isinstance(best_unique_models[ensemble_name], pd.DataFrame):
                    for idx, row in best_unique_models[ensemble_name].iterrows():
                        logger.info(f"run_name: {row['tags.mlflow.runName']}")

                logger.info("")
    else:
        # Happens when you run your demo data for example
        logger.warning("None of the ensembles had any submodules found!")
        logger.warning("OK for the demo data running case")

    return best_unique_models

ensemble_logging

Ensemble MLflow logging module.

Provides utilities for logging ensemble results to MLflow, including metrics, artifacts, and run naming.

get_ensemble_pickle_name

get_ensemble_pickle_name(ensemble_name: str) -> str

Generate pickle filename for ensemble results.

PARAMETER DESCRIPTION
ensemble_name

Name of the ensemble.

TYPE: str

RETURNS DESCRIPTION
str

Filename in format 'ensemble_{name}_results.pickle'.

Source code in src/ensemble/ensemble_logging.py
def get_ensemble_pickle_name(ensemble_name: str) -> str:
    """
    Build the pickle filename used to store an ensemble's results.

    Parameters
    ----------
    ensemble_name : str
        Name of the ensemble.

    Returns
    -------
    str
        Filename in format 'ensemble_{name}_results.pickle'.
    """
    return "ensemble_{}_results.pickle".format(ensemble_name)

get_source_runs

get_source_runs(
    ensemble_mlflow_runs_per_name: Union[
        Dict[str, Dict[str, Any]], DataFrame
    ],
) -> List[str]

Extract run IDs from ensemble submodel data.

PARAMETER DESCRIPTION
ensemble_mlflow_runs_per_name

Submodel data (dict of dicts or DataFrame).

TYPE: dict or DataFrame

RETURNS DESCRIPTION
list

List of MLflow run IDs.

Source code in src/ensemble/ensemble_logging.py
def get_source_runs(
    ensemble_mlflow_runs_per_name: Union[Dict[str, Dict[str, Any]], pd.DataFrame],
) -> List[str]:
    """
    Extract run IDs from ensemble submodel data.

    Parameters
    ----------
    ensemble_mlflow_runs_per_name : dict or pd.DataFrame
        Submodel data (dict of dicts or DataFrame).

    Returns
    -------
    list
        List of MLflow run IDs (empty for unrecognized input types).
    """
    if isinstance(ensemble_mlflow_runs_per_name, dict):
        return [
            model_dict["run_id"]
            for model_dict in ensemble_mlflow_runs_per_name.values()
        ]
    if isinstance(ensemble_mlflow_runs_per_name, pd.DataFrame):
        return list(ensemble_mlflow_runs_per_name["run_id"])
    return []

get_ensemble_name

get_ensemble_name(
    runs_per_name: Union[DataFrame, Dict[str, Any]],
    ensemble_name_base: str,
    ensemble_prefix_str: str,
    sort_name: str = "params.model",
) -> str

Generate ensemble run name from submodel names.

PARAMETER DESCRIPTION
runs_per_name

Submodel runs data.

TYPE: DataFrame or dict

ensemble_name_base

Base name (e.g., source like 'pupil_gt').

TYPE: str

ensemble_prefix_str

Prefix (e.g., 'ensemble' or 'ensembleThresholded').

TYPE: str

sort_name

Column/key to sort models by.

TYPE: str DEFAULT: 'params.model'

RETURNS DESCRIPTION
str

Ensemble name in format '{prefix}-{model1}-{model2}...{__{base}}'.

Source code in src/ensemble/ensemble_logging.py
def get_ensemble_name(
    runs_per_name: Union[pd.DataFrame, Dict[str, Any]],
    ensemble_name_base: str,
    ensemble_prefix_str: str,
    sort_name: str = "params.model",
) -> str:
    """
    Generate ensemble run name from submodel names.

    Parameters
    ----------
    runs_per_name : pd.DataFrame or dict
        Submodel runs data.
    ensemble_name_base : str
        Base name (e.g., source like 'pupil_gt').
    ensemble_prefix_str : str
        Prefix (e.g., 'ensemble' or 'ensembleThresholded').
    sort_name : str, default 'params.model'
        Column/key to sort models by.

    Returns
    -------
    str
        Ensemble name in format '{prefix}-{model1}-{model2}...__{base}'.

    Raises
    ------
    ValueError
        If `runs_per_name` is neither a DataFrame nor a dict.
    """
    if isinstance(runs_per_name, pd.DataFrame):
        # Sort first so the unique model names come out in a stable order
        sorted_runs = runs_per_name.sort_values(by=sort_name)
        submodel_part = "-".join(sorted_runs[sort_name].unique())
    elif isinstance(runs_per_name, dict):
        submodel_part = "-".join(sorted(runs_per_name))
    else:
        raise ValueError("runs_per_name must be a DataFrame or a dictionary")

    # following steps expect imputer_model__data_source type of logic
    return f"{ensemble_prefix_str}-{submodel_part}__{ensemble_name_base}"

log_ensemble_metrics

log_ensemble_metrics(
    metrics: Dict[str, Any], task: str
) -> None

Log ensemble metrics to MLflow.

Handles different metric structures for anomaly detection and imputation.

PARAMETER DESCRIPTION
metrics

Metrics dictionary.

TYPE: dict

task

Task type ('anomaly_detection' or 'imputation').

TYPE: str

Source code in src/ensemble/ensemble_logging.py
def log_ensemble_metrics(metrics: Dict[str, Any], task: str) -> None:
    """
    Log ensemble metrics to MLflow.

    Handles different metric structures for anomaly detection and imputation.

    Parameters
    ----------
    metrics : dict
        Metrics dictionary.
    task : str
        Task type ('anomaly_detection' or 'imputation').
    """
    if task == "anomaly_detection":
        write_granular_outlier_metrics(metrics)
    elif task == "imputation":
        for split, split_metrics in metrics.items():
            for metric, value in split_metrics["global"].items():
                if value is None:
                    continue
                try:
                    if isinstance(value, np.ndarray):
                        # Two-element array: log as a lo/hi pair
                        mlflow.log_metric(f"{split}/{metric}_lo", value[0])
                        mlflow.log_metric(f"{split}/{metric}_hi", value[1])
                    else:
                        mlflow.log_metric(f"{split}/{metric}", value)
                except Exception as e:
                    logger.error(f"Failed to log metric {metric}: {e}")
                    raise e

log_ensemble_arrays

log_ensemble_arrays(
    pred_masks: Dict[str, Any],
    task: str,
    ensemble_name: str,
) -> None

Save and log ensemble arrays as MLflow artifact.

PARAMETER DESCRIPTION
pred_masks

Prediction data to save.

TYPE: dict

task

Task type for artifact subdirectory.

TYPE: str

ensemble_name

Name for the pickle file.

TYPE: str

Source code in src/ensemble/ensemble_logging.py
def log_ensemble_arrays(
    pred_masks: Dict[str, Any], task: str, ensemble_name: str
) -> None:
    """
    Save and log ensemble arrays as MLflow artifact.

    Parameters
    ----------
    pred_masks : dict
        Prediction data to save.
    task : str
        Task type for artifact subdirectory.
    ensemble_name : str
        Name for the pickle file.

    Raises
    ------
    NotImplementedError
        For tasks other than 'anomaly_detection' and 'imputation'.
    """
    # Persist the arrays to a pickle inside the task's artifact directory
    artifact_dir = Path(get_artifacts_dir(service_name=task))
    artifact_dir.mkdir(parents=True, exist_ok=True)
    results_path = artifact_dir / get_ensemble_pickle_name(ensemble_name)
    save_results_dict(results_dict=pred_masks, results_path=str(results_path))

    if task == "imputation":
        mlflow.log_artifact(results_path, task)
    elif task == "anomaly_detection":
        # TODO! pick which to call this
        mlflow.log_artifact(results_path, "outlier_detection")
    else:
        raise NotImplementedError("Classification metrics not yet implemented")

ensemble_is_empty

ensemble_is_empty(
    ensemble_mlflow_runs: Dict[
        str, Union[Dict[str, Any], DataFrame]
    ],
    ensemble_name: str,
) -> bool

Check if ensemble has no submodels.

PARAMETER DESCRIPTION
ensemble_mlflow_runs

Dictionary of ensemble runs.

TYPE: dict

ensemble_name

Name of ensemble to check.

TYPE: str

RETURNS DESCRIPTION
bool

True if ensemble is empty.

Source code in src/ensemble/ensemble_logging.py
def ensemble_is_empty(
    ensemble_mlflow_runs: Dict[str, Union[Dict[str, Any], pd.DataFrame]],
    ensemble_name: str,
) -> bool:
    """
    Check if ensemble has no submodels.

    Parameters
    ----------
    ensemble_mlflow_runs : dict
        Dictionary of ensemble runs.
    ensemble_name : str
        Name of ensemble to check.

    Returns
    -------
    bool
        True if ensemble is empty.

    Raises
    ------
    ValueError
        If the ensemble entry is neither a dict nor a DataFrame.
    """
    runs = ensemble_mlflow_runs[ensemble_name]
    if isinstance(runs, dict):
        return len(runs) == 0
    if isinstance(runs, pd.DataFrame):
        return runs.empty

    logger.error(
        "ensemble_mlflow_runs[ensemble_name] must be a DataFrame or a dictionary"
    )
    raise ValueError(
        "ensemble_mlflow_runs[ensemble_name] must be a DataFrame or a dictionary"
    )

get_sort_name

get_sort_name(task: str) -> str

Get parameter name for sorting models by task.

PARAMETER DESCRIPTION
task

Task type.

TYPE: str

RETURNS DESCRIPTION
str

MLflow parameter column name for model name.

Source code in src/ensemble/ensemble_logging.py
def get_sort_name(task: str) -> str:
    """
    Get parameter name for sorting models by task.

    Classification runs log their model under a different MLflow
    parameter name than the other tasks.

    Parameters
    ----------
    task : str
        Task type.

    Returns
    -------
    str
        MLflow parameter column name for model name.
    """
    return "params.model_name" if task == "classification" else "params.model"

get_ensemble_quality_threshold

get_ensemble_quality_threshold(
    task: str, cfg: DictConfig
) -> Optional[float]

Get quality threshold for ensemble submodel selection.

PARAMETER DESCRIPTION
task

Task type.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
float or None

Quality threshold value, or None if not applicable.

Source code in src/ensemble/ensemble_logging.py
def get_ensemble_quality_threshold(task: str, cfg: DictConfig) -> Optional[float]:
    """
    Get quality threshold for ensemble submodel selection.

    Parameters
    ----------
    task : str
        Task type.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    float or None
        Quality threshold value, or None if not applicable.
    """
    # Each task keeps its threshold under a different config section
    task_to_cfg_section = {
        "anomaly_detection": "OUTLIER_DETECTION",
        "imputation": "IMPUTATION_METRICS",
    }
    section = task_to_cfg_section.get(task)
    if section is None:
        # e.g. classification: no quality thresholding applies
        return None
    return cfg[section]["best_metric"]["ensemble_quality_threshold"]

get_ensemble_prefix

get_ensemble_prefix(task: str, cfg: DictConfig) -> str

Get prefix string for ensemble name based on quality thresholding.

PARAMETER DESCRIPTION
task

Task type.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
str

'ensembleThresholded' if threshold set, 'ensemble' otherwise.

Source code in src/ensemble/ensemble_logging.py
def get_ensemble_prefix(task: str, cfg: DictConfig) -> str:
    """
    Get prefix string for ensemble name based on quality thresholding.

    Parameters
    ----------
    task : str
        Task type.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    str
        'ensembleThresholded' if threshold set, 'ensemble' otherwise.
    """
    if get_ensemble_quality_threshold(task, cfg) is None:
        return "ensemble"
    return "ensembleThresholded"

get_mlflow_ensemble_name

get_mlflow_ensemble_name(
    task: str,
    ensemble_mlflow_runs: Dict[
        str, Union[Dict[str, Any], DataFrame]
    ],
    ensemble_name: str,
    cfg: DictConfig,
) -> Optional[str]

Generate full MLflow run name for ensemble.

PARAMETER DESCRIPTION
task

Task type.

TYPE: str

ensemble_mlflow_runs

Dictionary of ensemble submodel runs.

TYPE: dict

ensemble_name

Source name (e.g., 'pupil_gt').

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
str or None

Full ensemble name (e.g., 'ensemble-CatBoost-XGBoost__pupil_gt'), or None if ensemble is empty.

Source code in src/ensemble/ensemble_logging.py
def get_mlflow_ensemble_name(
    task: str,
    ensemble_mlflow_runs: Dict[str, Union[Dict[str, Any], pd.DataFrame]],
    ensemble_name: str,
    cfg: DictConfig,
) -> Optional[str]:
    """
    Generate full MLflow run name for ensemble.

    Parameters
    ----------
    task : str
        Task type.
    ensemble_mlflow_runs : dict
        Dictionary of ensemble submodel runs.
    ensemble_name : str
        Source name (e.g., 'pupil_gt').
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    str or None
        Full ensemble name (e.g., 'ensemble-CatBoost-XGBoost__pupil_gt'),
        or None if ensemble is empty.
    """
    # Resolve naming inputs up front (both depend only on task/cfg).
    sort_key = get_sort_name(task)
    prefix = get_ensemble_prefix(task, cfg)

    # Empty ensembles have no meaningful run name.
    if ensemble_is_empty(ensemble_mlflow_runs, ensemble_name):
        return None

    return get_ensemble_name(
        runs_per_name=ensemble_mlflow_runs[ensemble_name],
        ensemble_name_base=ensemble_name,
        sort_name=sort_key,
        ensemble_prefix_str=prefix,
    )

get_existing_runs

get_existing_runs(
    experiment_name: str, mlflow_ensemble_name: str
) -> Tuple[DataFrame, bool]

Check for existing MLflow runs with same name.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name.

TYPE: str

mlflow_ensemble_name

Ensemble run name to search for.

TYPE: str

RETURNS DESCRIPTION
DataFrame

Matching runs.

bool

True if matching runs exist.

Source code in src/ensemble/ensemble_logging.py
def get_existing_runs(
    experiment_name: str, mlflow_ensemble_name: str
) -> Tuple[pd.DataFrame, bool]:
    """
    Check for existing MLflow runs with the same name.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name.
    mlflow_ensemble_name : str
        Ensemble run name to search for.

    Returns
    -------
    pd.DataFrame
        Matching runs (empty if the experiment has no runs at all).
    bool
        True if matching runs exist.
    """
    mlflow_runs = mlflow.search_runs(experiment_names=[experiment_name])
    if mlflow_runs.shape[0] == 0:
        # Bug fix: previously this path fell off the end of the function and
        # implicitly returned None, which broke the tuple unpacking
        # `runs, old_exists = get_existing_runs(...)` in check_for_old_run()
        # with a TypeError. Return the (empty) frame and False instead.
        return mlflow_runs, False

    runs = mlflow_runs[mlflow_runs["tags.mlflow.runName"] == mlflow_ensemble_name]
    old_exists = runs.shape[0] > 0
    return runs, old_exists

check_for_old_run

check_for_old_run(
    experiment_name: str,
    mlflow_ensemble_name: str,
    cfg: DictConfig,
    delete_old_mlflow_run: bool = True,
) -> bool

Check for and optionally delete existing ensemble runs.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name.

TYPE: str

mlflow_ensemble_name

Ensemble run name.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

delete_old_mlflow_run

If True, delete existing runs with same name.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
bool

True if logging should continue.

Source code in src/ensemble/ensemble_logging.py
def check_for_old_run(
    experiment_name: str,
    mlflow_ensemble_name: str,
    cfg: DictConfig,
    delete_old_mlflow_run: bool = True,
) -> bool:
    """
    Check for and optionally delete existing ensemble runs.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name.
    mlflow_ensemble_name : str
        Ensemble run name.
    cfg : DictConfig
        Main Hydra configuration.
    delete_old_mlflow_run : bool, default True
        If True, delete existing runs with same name.

    Returns
    -------
    bool
        True if logging should continue. Currently this is True whenever
        ``mlflow_ensemble_name`` is not None — even when an old run exists
        and was deliberately kept (see the warning emitted at the end).
    """
    # NOTE(review): get_existing_runs() falls through and returns None when
    # the experiment has no runs at all, which would make this unpacking
    # raise a TypeError — confirm the experiment always has prior runs here.
    runs, old_exists = get_existing_runs(experiment_name, mlflow_ensemble_name)
    if old_exists:
        if delete_old_mlflow_run:
            # Remove every run that shares the ensemble name so the new run
            # replaces them rather than accumulating duplicates.
            for run_id in runs["run_id"]:
                logger.warning(
                    'Delete old run = "{}", id = "{}"'.format(
                        mlflow_ensemble_name, run_id
                    )
                )
                mlflow.delete_run(run_id)
        else:
            # Kept runs are only warned about; logging still continues below,
            # so a duplicate run name may be created.
            logger.warning(
                f"Run {mlflow_ensemble_name} already exists and you chose not to delete it!"
            )

    # Continue as long as a valid ensemble name was produced; the decision is
    # independent of whether an old run existed.
    if mlflow_ensemble_name is not None:
        continue_with_logging = True
    else:
        continue_with_logging = False
    logger.warning("continue_with_logging always now True!")

    return continue_with_logging

log_ensembling_to_mlflow

log_ensembling_to_mlflow(
    experiment_name: str,
    ensemble_mlflow_runs: Dict[
        str, Union[Dict[str, Any], DataFrame]
    ],
    ensemble_name: str,
    cfg: DictConfig,
    task: str,
    metrics: Optional[Dict[str, Any]] = None,
    pred_masks: Optional[Dict[str, Any]] = None,
    output_dict: Optional[Dict[str, Any]] = None,
) -> None

Log ensemble results to MLflow.

Creates a new MLflow run for the ensemble, logs metrics, parameters, and artifacts based on task type.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name.

TYPE: str

ensemble_mlflow_runs

Dictionary of ensemble submodel runs.

TYPE: dict

ensemble_name

Source name for the ensemble.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type ('anomaly_detection', 'imputation', or 'classification').

TYPE: str

metrics

Pre-computed metrics (for anomaly detection).

TYPE: dict DEFAULT: None

pred_masks

Prediction masks (for anomaly detection).

TYPE: dict DEFAULT: None

output_dict

Full output dictionary (for imputation/classification).

TYPE: dict DEFAULT: None

Source code in src/ensemble/ensemble_logging.py
def log_ensembling_to_mlflow(
    experiment_name: str,
    ensemble_mlflow_runs: Dict[str, Union[Dict[str, Any], pd.DataFrame]],
    ensemble_name: str,
    cfg: DictConfig,
    task: str,
    metrics: Optional[Dict[str, Any]] = None,
    pred_masks: Optional[Dict[str, Any]] = None,
    output_dict: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Log ensemble results to MLflow.

    Creates a new MLflow run for the ensemble, logs metrics, parameters,
    and artifacts based on task type.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name.
    ensemble_mlflow_runs : dict
        Dictionary of ensemble submodel runs.
    ensemble_name : str
        Source name for the ensemble.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type ('anomaly_detection', 'imputation', or 'classification').
    metrics : dict, optional
        Pre-computed metrics (for anomaly detection). For imputation and
        classification this argument is ignored and overwritten from
        ``output_dict`` below.
    pred_masks : dict, optional
        Prediction masks (for anomaly detection).
    output_dict : dict, optional
        Full output dictionary (for imputation/classification).

    Raises
    ------
    ValueError
        If ``task`` is not one of the three supported task types.
    """
    # Log the ensembling results to MLflow
    # Source run ids are collected before the emptiness check so they can be
    # logged as a parameter on the ensemble run.
    run_ids = get_source_runs(
        ensemble_mlflow_runs_per_name=ensemble_mlflow_runs[ensemble_name]
    )

    if not ensemble_is_empty(ensemble_mlflow_runs, ensemble_name):
        mlflow_ensemble_name = get_mlflow_ensemble_name(
            task=task,
            ensemble_mlflow_runs=ensemble_mlflow_runs,
            ensemble_name=ensemble_name,
            cfg=cfg,
        )
        # May delete a previous run with the same name (cfg-controlled).
        continue_with_logging = check_for_old_run(
            experiment_name, mlflow_ensemble_name, cfg
        )
        if continue_with_logging:
            with mlflow.start_run(run_name=mlflow_ensemble_name):
                mlflow.log_param("ensemble_run_ids", run_ids)
                mlflow.log_param("model", "ensemble")
                # Select which arrays/metrics to log per task.
                if task == "anomaly_detection":
                    dict_of_arrays_out = pred_masks
                elif task == "imputation":
                    dict_of_arrays_out = output_dict
                    metrics = dict_of_arrays_out["metrics"]
                elif task == "classification":
                    # NOTE(review): the whole output_dict is treated as the
                    # metrics object here — confirm this matches what
                    # classifier_log_cls_evaluation_to_mlflow expects.
                    dict_of_arrays_out = output_dict
                    metrics = output_dict
                else:
                    logger.error(f"Unknown task: {task}")
                    raise ValueError(f"Unknown task: {task}")

                if task == "classification":
                    # Classification reuses the single-classifier logging
                    # helper; positional args it does not need are None.
                    classifier_log_cls_evaluation_to_mlflow(
                        None,
                        None,
                        None,
                        metrics,
                        None,
                        None,
                        run_name=mlflow_ensemble_name,
                        model_name="ensemble",
                    )
                else:
                    log_ensemble_metrics(metrics, task)
                    log_ensemble_arrays(dict_of_arrays_out, task, mlflow_ensemble_name)
                mlflow.end_run()

    else:
        logger.warning(f"No runs found for ensemble {ensemble_name}")
        return None

tasks_ensembling

Ensemble task orchestration module.

Provides high-level functions to coordinate ensemble creation and logging across anomaly detection, imputation, and classification tasks.

check_if_for_reprocess

check_if_for_reprocess(mlflow_ensemble_name, cfg)

Check whether to reprocess an existing ensemble.

For anomaly detection and imputation, ensembles are fast to compute, so reprocessing is cheap. For classification, reprocessing may be skipped for debugging purposes.

PARAMETER DESCRIPTION
mlflow_ensemble_name

Name of the ensemble.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if ensemble should be (re)processed.

Source code in src/ensemble/tasks_ensembling.py
def check_if_for_reprocess(mlflow_ensemble_name, cfg):
    """
    Check whether to reprocess an existing ensemble.

    For anomaly detection and imputation, ensembles are fast to compute.
    For classification, reprocessing may be skipped for debugging.

    Parameters
    ----------
    mlflow_ensemble_name : str
        Name of the ensemble.
    cfg : DictConfig
        Main Hydra configuration.

    Returns
    -------
    bool
        True if ensemble should be (re)processed.
    """
    # TODO! implement the actual reprocess-decision logic
    logger.warning("Reprocess always now True!")
    return True

get_ensembled_prediction

get_ensembled_prediction(
    ensemble_mlflow_runs: dict,
    experiment_name: str,
    cfg: DictConfig,
    task: str,
    sources: dict,
    recompute_metrics: bool = False,
)

Create ensemble predictions for all ensemble configurations.

PARAMETER DESCRIPTION
ensemble_mlflow_runs

Dictionary where keys are ensemble names (e.g., 'pupil_gt') and values are dicts/DataFrames of submodel runs.

TYPE: dict

experiment_name

MLflow experiment name.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type ('anomaly_detection', 'imputation', or 'classification').

TYPE: str

sources

Source data.

TYPE: dict

recompute_metrics

If True, only recompute submodel metrics.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict

Dictionary mapping ensemble names to their outputs.

Source code in src/ensemble/tasks_ensembling.py
def get_ensembled_prediction(
    ensemble_mlflow_runs: dict,
    experiment_name: str,
    cfg: DictConfig,
    task: str,
    sources: dict,
    recompute_metrics: bool = False,
):
    """
    Create ensemble predictions for all ensemble configurations.

    Parameters
    ----------
    ensemble_mlflow_runs : dict
        Dictionary where keys are ensemble names (e.g., 'pupil_gt') and
        values are dicts/DataFrames of submodel runs.
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type ('anomaly_detection', 'imputation', or 'classification').
    sources : dict
        Source data.
    recompute_metrics : bool, default False
        If True, only recompute submodel metrics.

    Returns
    -------
    dict or None
        Dictionary mapping ensemble names to their outputs. Returns None
        when there are no models to ensemble. Ensembles for which
        ``check_if_for_reprocess`` returns False are absent from the result.

    Raises
    ------
    ValueError
        If ``task`` is not one of the three supported task types.
    """
    if len(ensemble_mlflow_runs) == 0:
        logger.warning(
            "No models to ensemble, exiting the Ensembling Prefect task (task = {})".format(
                task
            )
        )
        # NOTE(review): bare return yields None; downstream
        # log_ensemble_to_mlflow() iterates the result, so callers must
        # guard against None — confirm the call sites do.
        return
    else:
        # Compute the metrics for the ensemble(s)
        logger.info("Computing metrics for the ensemble(s)")
        ensemble_output = {}
        for ensemble_name in ensemble_mlflow_runs.keys():
            mlflow_ensemble_name = get_mlflow_ensemble_name(
                task=task,
                ensemble_mlflow_runs=ensemble_mlflow_runs,
                ensemble_name=ensemble_name,
                cfg=cfg,
            )
            # check_if_for_reprocess() currently always returns True.
            reprocess = check_if_for_reprocess(mlflow_ensemble_name, cfg)
            if reprocess:
                ensemble_output[ensemble_name] = {}
                if task == "anomaly_detection":
                    # Anomaly detection returns (metrics, masks) separately.
                    metrics, pred_masks = ensemble_anomaly_detection(
                        ensemble_mlflow_runs[ensemble_name],
                        cfg,
                        experiment_name=experiment_name,
                        ensemble_name=ensemble_name,
                        sources=sources,
                    )
                    ensemble_output[ensemble_name] = {
                        "metrics": metrics,
                        "pred_masks": pred_masks,
                    }
                elif task == "imputation":
                    # Imputation/classification helpers return the full
                    # output dict themselves.
                    ensemble_output[ensemble_name] = ensemble_imputation(
                        ensemble_model_runs=ensemble_mlflow_runs[ensemble_name],
                        cfg=cfg,
                        sources=sources,
                        ensemble_name=ensemble_name,
                        recompute_metrics=recompute_metrics,
                    )

                elif task == "classification":
                    ensemble_output[ensemble_name] = ensemble_classification(
                        ensemble_model_runs=ensemble_mlflow_runs[ensemble_name],
                        cfg=cfg,
                        sources=sources,
                        ensemble_name=ensemble_name,
                    )
                else:
                    logger.error(f"Unknown task: {task}")
                    raise ValueError(f"Unknown task: {task}")

    return ensemble_output

log_ensemble_to_mlflow

log_ensemble_to_mlflow(
    ensemble_mlflow_runs: dict,
    experiment_name: str,
    cfg: DictConfig,
    ensemble_output: dict,
    task: str,
    recompute_metrics: bool = False,
)

Log all ensemble outputs to MLflow.

PARAMETER DESCRIPTION
ensemble_mlflow_runs

Dictionary of ensemble submodel runs.

TYPE: dict

experiment_name

MLflow experiment name.

TYPE: str

cfg

Main Hydra configuration.

TYPE: DictConfig

ensemble_output

Dictionary mapping ensemble names to their outputs.

TYPE: dict

task

Task type.

TYPE: str

recompute_metrics

If True, metrics were recomputed (affects logging).

TYPE: bool DEFAULT: False

Source code in src/ensemble/tasks_ensembling.py
def log_ensemble_to_mlflow(
    ensemble_mlflow_runs: dict,
    experiment_name: str,
    cfg: DictConfig,
    ensemble_output: dict,
    task: str,
    recompute_metrics: bool = False,
):
    """
    Log all ensemble outputs to MLflow.

    Parameters
    ----------
    ensemble_mlflow_runs : dict
        Dictionary of ensemble submodel runs.
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Main Hydra configuration.
    ensemble_output : dict
        Dictionary mapping ensemble names to their outputs.
    task : str
        Task type.
    recompute_metrics : bool, default False
        If True, metrics were recomputed (affects logging).

    Raises
    ------
    ValueError
        If ``task`` is not one of the three supported task types.
    """
    init_mlflow_experiment(mlflow_cfg=cfg["MLFLOW"], experiment_name=experiment_name)

    for ensemble_name, output_dict in ensemble_output.items():
        if task == "anomaly_detection":
            # 'all_variants' aggregates are not logged as their own runs.
            if "all_variants" not in ensemble_name:
                log_ensembling_to_mlflow(
                    experiment_name=experiment_name,
                    ensemble_mlflow_runs=ensemble_mlflow_runs,
                    ensemble_name=ensemble_name,
                    metrics=output_dict["metrics"],
                    pred_masks=output_dict["pred_masks"],
                    cfg=cfg,
                    task=task,
                )
        elif task in ("imputation", "classification"):
            # Both tasks pass the whole output dict through unchanged.
            log_ensembling_to_mlflow(
                experiment_name=experiment_name,
                ensemble_mlflow_runs=ensemble_mlflow_runs,
                ensemble_name=ensemble_name,
                output_dict=output_dict,
                cfg=cfg,
                task=task,
            )
        else:
            logger.error(f"Unknown task: {task}")
            raise ValueError(f"Unknown task: {task}")

task_ensemble

task_ensemble(
    cfg: DictConfig,
    task: str,
    sources: dict,
    recompute_metrics: bool = False,
)

Main ensemble task: create and log ensembles for a given task.

Orchestrates the full ensembling pipeline: (1) retrieve submodel runs from MLflow, (2) create ensemble predictions, and (3) log the results back to MLflow.

PARAMETER DESCRIPTION
cfg

Main Hydra configuration.

TYPE: DictConfig

task

Task type ('anomaly_detection', 'imputation', or 'classification').

TYPE: str

sources

Source data.

TYPE: dict

recompute_metrics

If True, only recompute submodel metrics without creating ensembles.

TYPE: bool DEFAULT: False

Source code in src/ensemble/tasks_ensembling.py
def task_ensemble(
    cfg: DictConfig, task: str, sources: dict, recompute_metrics: bool = False
):
    """
    Main ensemble task: create and log ensembles for a given task.

    Orchestrates the full ensembling pipeline:
    1. Retrieve submodel runs from MLflow
    2. Create ensemble predictions
    3. Log results to MLflow

    Parameters
    ----------
    cfg : DictConfig
        Main Hydra configuration.
    task : str
        Task type ('anomaly_detection', 'imputation', or 'classification').
    sources : dict
        Source data.
    recompute_metrics : bool, default False
        If True, only recompute submodel metrics without creating ensembles.

    Raises
    ------
    ValueError
        If ``task`` is not one of the three supported task types.
    """
    # Computationally independent so could be run as a flow, but seems more coherent on flow diagram
    # to be run inside the "imputation" flow
    # TODO! harmonize the naming, they could be the same without all this if/elif/else
    task_key_by_task = {
        "anomaly_detection": "OUTLIER_DETECTION",
        "imputation": "IMPUTATION",
        "classification": "CLASSIFICATION",
    }
    task_key = task_key_by_task.get(task)
    if task_key is None:
        logger.error(f"Unknown task: {task}")
        raise ValueError(f"Unknown task: {task}")

    prev_experiment_name = experiment_name_wrapper(
        experiment_name=cfg["PREFECT"]["FLOW_NAMES"][task_key], cfg=cfg
    )

    logger.info(
        "TASK (flow-like) | Name: {}".format(cfg["PREFECT"]["FLOW_NAMES"][task_key])
    )
    logger.info("=====================")

    # Ensemble from the imputation models
    ensemble_mlflow_runs = get_results_from_mlflow_for_ensembling(
        experiment_name=prev_experiment_name,
        cfg=cfg,
        task=task,
        recompute_metrics=recompute_metrics,
    )

    # Get ensemble predictions
    # 1) read artifacts from MLflow
    # 2) ensemble the predictions
    # 3) compute the metrics
    if ensemble_mlflow_runs is not None:
        ensemble_output = get_ensembled_prediction(
            ensemble_mlflow_runs=ensemble_mlflow_runs,
            experiment_name=prev_experiment_name,
            cfg=cfg,
            task=task,
            sources=sources,
            recompute_metrics=recompute_metrics,
        )

        if not recompute_metrics:
            # Finally log to MLflow
            log_ensemble_to_mlflow(
                ensemble_mlflow_runs,
                experiment_name=prev_experiment_name,
                cfg=cfg,
                ensemble_output=ensemble_output,
                task=task,
                recompute_metrics=recompute_metrics,
            )
    else:
        logger.warning(
            "Skipping ensembled prediction as no submodel sources were found"
        )