anomaly_detection

Outlier detection methods for PLR signals.

Overview

This module provides 15 different outlier detection methods:

  • Ground Truth: Human-annotated masks
  • Foundation Models: MOMENT, UniTS, TimesNet
  • Traditional: LOF, OneClassSVM, PROPHET
  • Ensembles: Voting combinations
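A majority-vote ensemble over binary outlier masks can be sketched as follows (a minimal NumPy illustration only; the actual ensembling in this repo goes through task_ensemble and MLflow, and the mask shapes here are made up):

```python
import numpy as np

# Hypothetical binary outlier masks (1 = outlier) from three detectors,
# each of shape (n_subjects, n_timepoints)
rng = np.random.default_rng(0)
masks = [rng.integers(0, 2, size=(4, 10)) for _ in range(3)]

# Majority vote: flag a timepoint as an outlier when at least two of the
# three detectors agree
votes = np.sum(masks, axis=0)
ensemble_mask = (votes >= 2).astype(int)
```

Other voting schemes (any-vote, unanimous) are just different cutoffs on `votes`.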

Main Entry Point

flow_anomaly_detection

flow_anomaly_detection

flow_anomaly_detection(
    cfg: DictConfig, df: DataFrame
) -> DataFrame

Run the complete anomaly detection flow for PLR data.

Orchestrates outlier detection across multiple hyperparameter configurations and creates ensembles of the individual models.

PARAMETER DESCRIPTION
cfg

Hydra configuration containing model and experiment parameters.

TYPE: DictConfig

df

Input PLR data with the following columns:

  • 'pupil_orig': Original recording from the pupillometer. The software has rejected some clear artifacts (null), but blink artifacts remain.
  • 'pupil_raw': Output from anomaly detection (ground truth for modeling). All outliers are set to null for subsequent imputation.
  • 'gt': Ground truth for imputation containing manually-supervised imputation (manually placed points + MissForest), denoised with CEEMD. This signal lacks the high-frequency noise present in raw signal.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

The input DataFrame (currently unchanged; results logged to MLflow).

Notes

This function:

1. Runs outlier detection for each hyperparameter configuration
2. Logs results to MLflow
3. Creates ensemble models from individual detectors

Source code in src/anomaly_detection/flow_anomaly_detection.py
def flow_anomaly_detection(cfg: DictConfig, df: pl.DataFrame) -> pl.DataFrame:
    """
    Run the complete anomaly detection flow for PLR data.

    Orchestrates outlier detection across multiple hyperparameter configurations
    and creates ensembles of the individual models.

    Parameters
    ----------
    cfg : DictConfig
        Hydra configuration containing model and experiment parameters.
    df : pl.DataFrame
        Input PLR data with the following columns:
        - 'pupil_orig': Original recording from the pupillometer. The software
          has rejected some clear artifacts (null), but blink artifacts remain.
        - 'pupil_raw': Output from anomaly detection (ground truth for modeling).
          All outliers are set to null for subsequent imputation.
        - 'gt': Ground truth for imputation containing manually-supervised
          imputation (manually placed points + MissForest), denoised with CEEMD.
          This signal lacks the high-frequency noise present in raw signal.

    Returns
    -------
    pl.DataFrame
        The input DataFrame (currently unchanged; results logged to MLflow).

    Notes
    -----
    This function:
    1. Runs outlier detection for each hyperparameter configuration
    2. Logs results to MLflow
    3. Creates ensemble models from individual detectors
    """
    experiment_name = get_outlier_detection_experiment_name(cfg)
    logger.info("FLOW | Name: {}".format(experiment_name))
    logger.info("=====================")

    hyperparams_group = define_hyperparam_group(cfg, task="outlier_detection")
    for cfg_key, cfg_hyperparam in hyperparams_group.items():
        outlier_detection_PLR_workflow(
            df=df,
            cfg=cfg_hyperparam,
            experiment_name=experiment_name,
            run_name=cfg_key,
        )

    # The ensembling part will fetch the trained anomaly detection models from MLflow
    data_dict = convert_df_to_dict(data_df=df, cfg=cfg)
    task_ensemble(
        cfg=cfg, task="anomaly_detection", sources=data_dict, recompute_metrics=True
    )
    task_ensemble(
        cfg=cfg, task="anomaly_detection", sources=data_dict, recompute_metrics=False
    )

    # Return the input df unchanged (as documented); results are logged to MLflow
    return df

Core Functions

anomaly_detection

outlier_detection_selector

outlier_detection_selector(
    df: DataFrame,
    cfg: DictConfig,
    experiment_name: str,
    run_name: str,
    model_name: str,
)

Select and execute outlier detection method.

Dispatches to the appropriate outlier detection implementation based on model_name. Supports foundation models (MOMENT, UniTS, TimesNet), traditional methods (LOF, OneClassSVM, SubPCA), and Prophet.

PARAMETER DESCRIPTION
df

Input PLR data with columns: pupil_raw, pupil_gt, etc.

TYPE: DataFrame

cfg

Full Hydra configuration.

TYPE: DictConfig

experiment_name

MLflow experiment name for tracking.

TYPE: str

run_name

MLflow run name.

TYPE: str

model_name

Name of outlier detection method. One of: 'MOMENT', 'TimesNet', 'UniTS', 'LOF', 'OneClassSVM', 'PROPHET', 'SigLLM'.

TYPE: str

Notes

SubPCA/EIF (TSB_AD) was archived - see archived/TSB_AD/ (not used in final paper).

RETURNS DESCRIPTION
tuple

(outlier_artifacts, model) where:

  • outlier_artifacts: dict with detection results and metrics
  • model: trained outlier detection model

RAISES DESCRIPTION
NotImplementedError

If model_name is not supported.

Source code in src/anomaly_detection/anomaly_detection.py
def outlier_detection_selector(
    df: pl.DataFrame,
    cfg: DictConfig,
    experiment_name: str,
    run_name: str,
    model_name: str,
):
    """
    Select and execute outlier detection method.

    Dispatches to the appropriate outlier detection implementation based on
    model_name. Supports foundation models (MOMENT, UniTS, TimesNet),
    traditional methods (LOF, OneClassSVM, SubPCA), and Prophet.

    Parameters
    ----------
    df : pl.DataFrame
        Input PLR data with columns: pupil_raw, pupil_gt, etc.
    cfg : DictConfig
        Full Hydra configuration.
    experiment_name : str
        MLflow experiment name for tracking.
    run_name : str
        MLflow run name.
    model_name : str
        Name of outlier detection method. One of:
        'MOMENT', 'TimesNet', 'UniTS', 'LOF', 'OneClassSVM',
        'PROPHET', 'SigLLM'.

    Notes
    -----
    SubPCA/EIF (TSB_AD) was archived - see archived/TSB_AD/ (not used in final paper).

    Returns
    -------
    tuple
        (outlier_artifacts, model) where:
        - outlier_artifacts: dict with detection results and metrics
        - model: trained outlier detection model

    Raises
    ------
    NotImplementedError
        If model_name is not supported.
    """
    # Init the MLflow experiment
    init_mlflow_experiment(experiment_name=experiment_name)

    # Quick fix; TODO: move this somewhere earlier in the pipeline
    if run_name == "TimesNet":
        if cfg["OUTLIER_MODELS"][run_name]["MODEL"]["train_on"] == "pupil_orig_imputed":
            run_name += "-orig"

    # Init MLflow run
    init_mlflow_run(
        mlflow_cfg=cfg["MLFLOW"],
        run_name=run_name,
        cfg=cfg,
        experiment_name=experiment_name,
    )

    # Log MLflow parameters
    log_mlflow_params(
        mlflow_params=cfg["OUTLIER_MODELS"][model_name]["MODEL"],
        model_name=model_name,
        run_name=run_name,
    )

    logger.info("Outlier Detection with model {}".format(model_name))
    if model_name == "MOMENT":
        outlier_artifacts, model = momentfm_outlier_main(
            df=df,
            cfg=cfg,
            outlier_model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
        )
    elif model_name == "TimesNet":
        outlier_artifacts, model = timesnet_outlier_wrapper(
            df=df,
            cfg=cfg,
            outlier_model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
        )
    elif model_name == "UniTS":
        outlier_artifacts, model = units_outlier_wrapper(
            df=df,
            cfg=cfg,
            model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
        )
    elif model_name == "LOF":
        outlier_artifacts, model = outlier_sklearn_wrapper(
            df=df,
            cfg=cfg,
            model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
            model_name=model_name,
        )
    elif model_name == "OneClassSVM":
        outlier_artifacts, model = outlier_sklearn_wrapper(
            df=df,
            cfg=cfg,
            model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
            model_name=model_name,
        )
    elif model_name == "PROPHET":
        outlier_artifacts, model = outlier_prophet_wrapper(
            df=df,
            cfg=cfg,
            model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
        )
    elif model_name == "SigLLM":
        outlier_artifacts, model = outlier_sigllm_wrapper(
            df=df,
            cfg=cfg,
            model_cfg=cfg["OUTLIER_MODELS"][model_name],
            experiment_name=experiment_name,
            run_name=run_name,
        )
    # NOTE: TSB_AD archived - see archived/TSB_AD/ (not used in final paper)
    # elif model_name == "SubPCA" or model_name == "EIF":
    #     outlier_artifacts, model = outlier_tsb_ad_wrapper(
    #         df=df,
    #         cfg=cfg,
    #         model_cfg=cfg["OUTLIER_MODELS"][model_name],
    #         experiment_name=experiment_name,
    #         run_name=run_name,
    #         model_name=model_name,
    #     )
    else:
        logger.error(f"{model_name} Model not implemented yet")
        raise NotImplementedError(f"{model_name} Model not implemented yet")

    return outlier_artifacts, model
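As a standalone illustration of one of the dispatched methods, a minimal LOF run on a synthetic one-dimensional signal could look like the sketch below. This is not the repo's outlier_sklearn_wrapper (which additionally handles the Hydra config, PLR data preparation, and MLflow logging); the signal, sizes, and feature encoding are invented for the example.

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

# Synthetic "pupil" signal: a flat baseline with a blink-like dropout
rng = np.random.default_rng(42)
signal = 5.0 + 0.05 * rng.standard_normal(500)
signal[100:105] -= 2.0  # artificial blink artifact

# LOF expects feature vectors; use the raw value per timepoint as a toy feature
X = signal.reshape(-1, 1)
pred = LocalOutlierFactor(n_neighbors=20).fit_predict(X)  # -1 = outlier, 1 = inlier
outlier_mask = pred == -1
```

The dropout samples around index 100 get flagged because their local density is far below that of the baseline cluster.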

outlier_detection_PLR_workflow

outlier_detection_PLR_workflow(
    df: DataFrame,
    cfg: DictConfig,
    experiment_name: str,
    run_name: str,
) -> dict

Run the complete outlier detection workflow for PLR data.

Orchestrates the outlier detection pipeline:

1. Check if recomputation is needed (vs loading existing results)
2. Run outlier detection if needed
3. Log results to MLflow

PARAMETER DESCRIPTION
df

Input PLR data.

TYPE: DataFrame

cfg

Full Hydra configuration.

TYPE: DictConfig

experiment_name

MLflow experiment name.

TYPE: str

run_name

MLflow run name.

TYPE: str

RETURNS DESCRIPTION
dict

Outlier detection artifacts including masks and metrics.

Source code in src/anomaly_detection/anomaly_detection.py
def outlier_detection_PLR_workflow(
    df: pl.DataFrame, cfg: DictConfig, experiment_name: str, run_name: str
) -> dict:
    """
    Run the complete outlier detection workflow for PLR data.

    Orchestrates the outlier detection pipeline:
    1. Check if recomputation is needed (vs loading existing results)
    2. Run outlier detection if needed
    3. Log results to MLflow

    Parameters
    ----------
    df : pl.DataFrame
        Input PLR data.
    cfg : DictConfig
        Full Hydra configuration.
    experiment_name : str
        MLflow experiment name.
    run_name : str
        MLflow run name.

    Returns
    -------
    dict
        Outlier detection artifacts including masks and metrics.
    """
    # Set-up the workflow
    model_name = list(cfg["OUTLIER_MODELS"].keys())[0]

    # Debug: set the epochs to 1
    if cfg["EXPERIMENT"]["debug"]:
        logger.warning("Debug mode is on, training (finetuning) only for one epoch")
        cfg = debug_train_only_for_one_epoch(cfg)

    # If you wish to skip the recomputation, but no previous runs are found
    recompute_anomaly_detection = if_remote_anomaly_detection(
        try_to_recompute=cfg["OUTLIER_DETECTION"]["re_compute"],
        _anomaly_cfg=cfg["OUTLIER_DETECTION"],
        experiment_name=experiment_name,
        cfg=cfg,
    )

    if recompute_anomaly_detection:
        outlier_artifacts, _ = outlier_detection_selector(
            df=df,
            cfg=cfg,
            experiment_name=experiment_name,
            run_name=run_name,
            model_name=model_name,
        )
    else:
        logger.info(f"{run_name} was found trained already")
        # Nothing was recomputed; return an empty artifacts dict by default
        outlier_artifacts = {}
        # Nothing happens after this at the moment, but once visualizations
        # (or similar) exist, these results could be read back
        read_from_mlflow = False
        if read_from_mlflow:
            logger.info("Reading Anomaly Detection results from MLflow")
            run_name = update_outlier_detection_run_name(cfg)
            outlier_artifacts, _ = get_anomaly_detection_results_from_mlflow(
                experiment_name=experiment_name,
                cfg=cfg,
                run_name=run_name,
                model_name=model_name,
            )

    return outlier_artifacts

anomaly_utils

pick_just_one_light_vector

pick_just_one_light_vector(
    light: Series,
) -> Dict[str, ndarray]

Extract a single light vector from the dataset.

Since all subjects share the same light stimulus timing, we only need one representative light vector for analysis.

PARAMETER DESCRIPTION
light

Series containing light stimulus arrays for each color channel. Each value is a 2D array where the first dimension is subjects.

TYPE: Series

RETURNS DESCRIPTION
dict

Dictionary with the same keys as input, but with 1D arrays (single subject's light vector for each channel).

Source code in src/anomaly_detection/anomaly_utils.py
def pick_just_one_light_vector(light: pd.Series) -> Dict[str, np.ndarray]:
    """
    Extract a single light vector from the dataset.

    Since all subjects share the same light stimulus timing, we only need
    one representative light vector for analysis.

    Parameters
    ----------
    light : pd.Series
        Series containing light stimulus arrays for each color channel.
        Each value is a 2D array where the first dimension is subjects.

    Returns
    -------
    dict
        Dictionary with the same keys as input, but with 1D arrays
        (single subject's light vector for each channel).
    """
    light_out: Dict[str, np.ndarray] = {}
    for key, array in light.items():
        light_out[key] = array[0, :]

    return light_out
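A minimal usage sketch with made-up channel names and shapes:

```python
import numpy as np
import pandas as pd

# Hypothetical light stimulus arrays: one 2D array per color channel,
# with shape (n_subjects, n_timepoints); here 3 subjects, 3 timepoints
light = pd.Series({
    "blue": np.tile(np.array([0.0, 1.0, 0.0]), (3, 1)),
    "red": np.tile(np.array([0.0, 0.0, 1.0]), (3, 1)),
})

# Keep only the first subject's row per channel (same logic as the function)
light_out = {key: array[0, :] for key, array in light.items()}
```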

get_data_for_sklearn_anomaly_models

get_data_for_sklearn_anomaly_models(
    df: DataFrame, cfg: DictConfig, train_on: str
) -> Tuple[
    ndarray, ndarray, ndarray, ndarray, Dict[str, ndarray]
]

Prepare data for sklearn-based anomaly detection models.

Extracts and formats training and test data from a Polars DataFrame for use with traditional machine learning outlier detection methods.

PARAMETER DESCRIPTION
df

Input PLR data containing pupil signals and labels.

TYPE: DataFrame

cfg

Hydra configuration containing data processing parameters.

TYPE: DictConfig

train_on

Column name specifying which pupil signal to use for training (e.g., 'pupil_orig', 'pupil_raw').

TYPE: str

RETURNS DESCRIPTION
tuple

A tuple containing:

  • X (np.ndarray): Training data array of shape (n_subjects, n_timepoints).
  • y (np.ndarray): Training labels (outlier mask) of same shape as X.
  • X_test (np.ndarray): Test data array.
  • y_test (np.ndarray): Test labels (outlier mask).
  • light (dict): Light stimulus timing vectors for each color channel.

Source code in src/anomaly_detection/anomaly_utils.py
def get_data_for_sklearn_anomaly_models(
    df: pl.DataFrame, cfg: DictConfig, train_on: str
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict[str, np.ndarray]]:
    """
    Prepare data for sklearn-based anomaly detection models.

    Extracts and formats training and test data from a Polars DataFrame
    for use with traditional machine learning outlier detection methods.

    Parameters
    ----------
    df : pl.DataFrame
        Input PLR data containing pupil signals and labels.
    cfg : DictConfig
        Hydra configuration containing data processing parameters.
    train_on : str
        Column name specifying which pupil signal to use for training
        (e.g., 'pupil_orig', 'pupil_raw').

    Returns
    -------
    tuple
        A tuple containing:
        - X : np.ndarray
            Training data array of shape (n_subjects, n_timepoints).
        - y : np.ndarray
            Training labels (outlier mask) of same shape as X.
        - X_test : np.ndarray
            Test data array.
        - y_test : np.ndarray
            Test labels (outlier mask).
        - light : dict
            Light stimulus timing vectors for each color channel.
    """
    data_dict = convert_df_to_dict(data_df=df, cfg=cfg)
    X = data_dict["df"]["train"]["data"][train_on]
    y = data_dict["df"]["train"]["labels"]["outlier_mask"]
    assert X.shape == y.shape, f"X.shape: {X.shape}, y.shape: {y.shape}"

    X_test = data_dict["df"]["test"]["data"][train_on]
    y_test = data_dict["df"]["test"]["labels"]["outlier_mask"]
    assert X_test.shape == y_test.shape, (
        f"X_test.shape: {X_test.shape}, y_test.shape: {y_test.shape}"
    )

    # the timing should be the same for both train and test, and for all the subjects
    light = data_dict["df"]["train"]["light"]
    light = pick_just_one_light_vector(light)

    return X, y, X_test, y_test, light
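The shape contract asserted above can be illustrated with synthetic arrays (all names and sizes here are hypothetical):

```python
import numpy as np

n_subjects, n_timepoints = 8, 1000
X = np.random.default_rng(1).standard_normal((n_subjects, n_timepoints))

# The outlier mask must match the data array element-for-element
y = np.zeros_like(X, dtype=bool)
y[:, 100:110] = True  # e.g. a shared artifact window marked as outliers

assert X.shape == y.shape, f"X.shape: {X.shape}, y.shape: {y.shape}"
```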

sort_anomaly_detection_runs_ensemble

sort_anomaly_detection_runs_ensemble(
    mlflow_runs: DataFrame,
    best_metric_cfg: DictConfig,
    sort_by: str,
    task: str,
) -> Series

Sort MLflow runs for ensemble anomaly detection by specified metric.

PARAMETER DESCRIPTION
mlflow_runs

DataFrame containing MLflow run information.

TYPE: DataFrame

best_metric_cfg

Configuration specifying which metric to use and sort direction.

TYPE: DictConfig

sort_by

Sorting strategy. Currently only 'best_metric' is supported.

TYPE: str

task

Task name for metric column lookup (e.g., 'outlier_detection').

TYPE: str

RETURNS DESCRIPTION
Series

The best run according to the specified sorting criteria.

RAISES DESCRIPTION
ValueError

If sort_by is not 'best_metric' or direction is unknown.

Source code in src/anomaly_detection/anomaly_utils.py
def sort_anomaly_detection_runs_ensemble(
    mlflow_runs: pd.DataFrame, best_metric_cfg: DictConfig, sort_by: str, task: str
) -> pd.Series:
    """
    Sort MLflow runs for ensemble anomaly detection by specified metric.

    Parameters
    ----------
    mlflow_runs : pd.DataFrame
        DataFrame containing MLflow run information.
    best_metric_cfg : DictConfig
        Configuration specifying which metric to use and sort direction.
    sort_by : str
        Sorting strategy. Currently only 'best_metric' is supported.
    task : str
        Task name for metric column lookup (e.g., 'outlier_detection').

    Returns
    -------
    pd.Series
        The best run according to the specified sorting criteria.

    Raises
    ------
    ValueError
        If sort_by is not 'best_metric' or direction is unknown.
    """
    if sort_by == "best_metric":
        col_name = get_col_for_for_best_anomaly_detection_metric(best_metric_cfg, task)
        if best_metric_cfg["direction"] == "DESC":
            # DESC: a higher metric is better, so sort descending (best first)
            mlflow_runs = mlflow_runs.sort_values(by=col_name, ascending=False)
        elif best_metric_cfg["direction"] == "ASC":
            mlflow_runs = mlflow_runs.sort_values(by=col_name, ascending=True)
        else:
            logger.error(f"Unknown direction: {best_metric_cfg['direction']}")
            raise ValueError(f"Unknown direction: {best_metric_cfg['direction']}")
    else:
        logger.error(f"Unknown sort_by: {sort_by}")
        raise ValueError(f"Unknown sort_by: {sort_by}")

    # get the first run, as in the latest/best run
    return mlflow_runs.iloc[0]
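The intended direction handling can be checked on a toy runs table (sketch only; the metric column name is hypothetical):

```python
import pandas as pd

runs = pd.DataFrame({
    "run_id": ["a", "b", "c"],
    "metrics.outlier_detection_f1": [0.71, 0.88, 0.64],
})

# DESC means a higher metric is better, so the best run must sort first
direction = "DESC"
ascending = direction == "ASC"
best = runs.sort_values(
    by="metrics.outlier_detection_f1", ascending=ascending
).iloc[0]
```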

sort_anomaly_detection_runs

sort_anomaly_detection_runs(
    mlflow_runs: DataFrame, best_string: str, sort_by: str
) -> Series

Sort MLflow anomaly detection runs by time or loss metric.

PARAMETER DESCRIPTION
mlflow_runs

DataFrame containing MLflow run information.

TYPE: DataFrame

best_string

Column name for the loss metric when sorting by 'best_loss'.

TYPE: str

sort_by

Sorting strategy: 'start_time' for most recent, 'best_loss' for lowest loss.

TYPE: str

RETURNS DESCRIPTION
Series

The best run according to the specified sorting criteria.

RAISES DESCRIPTION
ValueError

If sort_by is not 'start_time' or 'best_loss'.

Notes

To be combined eventually with newer sort_anomaly_detection_runs_ensemble().

Source code in src/anomaly_detection/anomaly_utils.py
def sort_anomaly_detection_runs(
    mlflow_runs: pd.DataFrame, best_string: str, sort_by: str
) -> pd.Series:
    """
    Sort MLflow anomaly detection runs by time or loss metric.

    Parameters
    ----------
    mlflow_runs : pd.DataFrame
        DataFrame containing MLflow run information.
    best_string : str
        Column name for the loss metric when sorting by 'best_loss'.
    sort_by : str
        Sorting strategy: 'start_time' for most recent, 'best_loss' for lowest loss.

    Returns
    -------
    pd.Series
        The best run according to the specified sorting criteria.

    Raises
    ------
    ValueError
        If sort_by is not 'start_time' or 'best_loss'.

    Notes
    -----
    To be combined eventually with newer sort_anomaly_detection_runs_ensemble().
    """
    # sort based on the start time
    if sort_by == "start_time":
        mlflow_runs = mlflow_runs.sort_values(by="start_time", ascending=False)
    elif sort_by == "best_loss":
        mlflow_runs = mlflow_runs.sort_values(by=best_string, ascending=True)
    else:
        logger.error(f"Unknown sort_by: {sort_by}")
        raise ValueError(f"Unknown sort_by: {sort_by}")

    # get the first run, as in the latest/best run
    return mlflow_runs.iloc[0]

get_anomaly_detection_run

get_anomaly_detection_run(
    experiment_name: str,
    cfg: DictConfig,
    sort_by: str = "start_time",
    best_string: str = "best_loss",
    best_metric_cfg: Optional[DictConfig] = None,
) -> Optional[Series]

Retrieve a previous anomaly detection run from MLflow.

Searches for existing runs matching the current configuration and returns the best one according to the specified sorting criteria.

PARAMETER DESCRIPTION
experiment_name

Name of the MLflow experiment to search.

TYPE: str

cfg

Hydra configuration for determining run name.

TYPE: DictConfig

sort_by

Sorting strategy: 'start_time' or 'best_loss'. Default is 'start_time'.

TYPE: str DEFAULT: 'start_time'

best_string

Column name for loss metric. Default is 'best_loss'.

TYPE: str DEFAULT: 'best_loss'

best_metric_cfg

Configuration for ensemble metric sorting. Default is None.

TYPE: DictConfig DEFAULT: None

RETURNS DESCRIPTION
Series or None

The matching MLflow run as a Series, or None if no matching run found.

Source code in src/anomaly_detection/anomaly_utils.py
def get_anomaly_detection_run(
    experiment_name: str,
    cfg: DictConfig,
    sort_by: str = "start_time",
    best_string: str = "best_loss",
    best_metric_cfg: Optional[DictConfig] = None,
) -> Optional[pd.Series]:
    """
    Retrieve a previous anomaly detection run from MLflow.

    Searches for existing runs matching the current configuration and returns
    the best one according to the specified sorting criteria.

    Parameters
    ----------
    experiment_name : str
        Name of the MLflow experiment to search.
    cfg : DictConfig
        Hydra configuration for determining run name.
    sort_by : str, optional
        Sorting strategy: 'start_time' or 'best_loss'. Default is 'start_time'.
    best_string : str, optional
        Column name for loss metric. Default is 'best_loss'.
    best_metric_cfg : DictConfig, optional
        Configuration for ensemble metric sorting. Default is None.

    Returns
    -------
    pd.Series or None
        The matching MLflow run as a Series, or None if no matching run found.
    """
    mlflow_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    run_name = update_outlier_detection_run_name(cfg)
    if len(mlflow_runs) > 0:
        mlflow_runs_model = mlflow_runs[mlflow_runs["tags.mlflow.runName"] == run_name]
        if len(mlflow_runs_model) > 0:
            logger.info(
                "You wanted to skip anomaly detection, and previous run was found -> skipping"
            )
            if best_metric_cfg is not None:
                mlflow_run = sort_anomaly_detection_runs_ensemble(
                    mlflow_runs_model,
                    sort_by=sort_by,
                    best_metric_cfg=best_metric_cfg,
                    task="outlier_detection",
                )
            else:
                mlflow_run = sort_anomaly_detection_runs(
                    mlflow_runs_model, sort_by=sort_by, best_string=best_string
                )
            logger.debug(f"Previous run: {mlflow_run}")
            return mlflow_run
        else:
            return None
    else:
        return None

if_remote_anomaly_detection

if_remote_anomaly_detection(
    try_to_recompute: bool,
    _anomaly_cfg: DictConfig,
    experiment_name: str,
    cfg: DictConfig,
) -> bool

Determine whether to recompute anomaly detection or use cached results.

PARAMETER DESCRIPTION
try_to_recompute

If True, always recompute regardless of cached results.

TYPE: bool

_anomaly_cfg

Anomaly detection configuration (currently unused).

TYPE: DictConfig

experiment_name

MLflow experiment name to check for existing runs.

TYPE: str

cfg

Full Hydra configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if anomaly detection should be (re)computed, False if cached results should be used.

Source code in src/anomaly_detection/anomaly_utils.py
def if_remote_anomaly_detection(
    try_to_recompute: bool,
    _anomaly_cfg: DictConfig,
    experiment_name: str,
    cfg: DictConfig,
) -> bool:
    """
    Determine whether to recompute anomaly detection or use cached results.

    Parameters
    ----------
    try_to_recompute : bool
        If True, always recompute regardless of cached results.
    _anomaly_cfg : DictConfig
        Anomaly detection configuration (currently unused).
    experiment_name : str
        MLflow experiment name to check for existing runs.
    cfg : DictConfig
        Full Hydra configuration.

    Returns
    -------
    bool
        True if anomaly detection should be (re)computed, False if cached
        results should be used.
    """
    if try_to_recompute:
        logger.info("Recomputing the anomaly detection (as you explicitly want it)")
        return True
    else:
        mlflow_run = get_anomaly_detection_run(experiment_name, cfg)
        if mlflow_run is not None:
            return False
        else:
            logger.info(
                "You wanted to skip anomaly detection, but no previous run was found -> computing"
            )
            return True
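The decision logic reduces to a small truth table, sketched here as a standalone function (a hypothetical helper, not part of the module):

```python
def should_recompute(try_to_recompute: bool, previous_run_found: bool) -> bool:
    """Mirror of the recompute decision: an explicit recompute request always
    wins; otherwise compute only when no cached run exists."""
    if try_to_recompute:
        return True
    return not previous_run_found

# try_to_recompute=True -> always recompute
# cached run found      -> reuse it
# no cached run         -> compute anyway
assert should_recompute(True, True) is True
assert should_recompute(False, True) is False
assert should_recompute(False, False) is True
```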

save_outlier_detection_dataframe_to_mlflow

save_outlier_detection_dataframe_to_mlflow(
    df: DataFrame,
    experiment_name: str,
    _previous_experiment_name: str,
    cfg: DictConfig,
    copy_orig_db: bool = False,
) -> None

Save outlier detection results as a DuckDB database to MLflow.

Exports the dataframe to DuckDB format and logs it as an MLflow artifact for later retrieval and analysis.

PARAMETER DESCRIPTION
df

Polars DataFrame containing outlier detection results.

TYPE: DataFrame

experiment_name

MLflow experiment name for logging.

TYPE: str

_previous_experiment_name

Name of the previous experiment (currently unused, for future reference).

TYPE: str

cfg

Hydra configuration.

TYPE: DictConfig

copy_orig_db

Whether to copy the original database. Default is False.

TYPE: bool DEFAULT: False

Notes

TODO: Not needed for outlier detection as is, but could be useful for saving results as DuckDB for easy inspection without re-running.

Source code in src/anomaly_detection/anomaly_utils.py
def save_outlier_detection_dataframe_to_mlflow(
    df: pl.DataFrame,
    experiment_name: str,
    _previous_experiment_name: str,
    cfg: DictConfig,
    copy_orig_db: bool = False,
) -> None:
    """
    Save outlier detection results as a DuckDB database to MLflow.

    Exports the dataframe to DuckDB format and logs it as an MLflow artifact
    for later retrieval and analysis.

    Parameters
    ----------
    df : pl.DataFrame
        Polars DataFrame containing outlier detection results.
    experiment_name : str
        MLflow experiment name for logging.
    _previous_experiment_name : str
        Name of the previous experiment (currently unused, for future reference).
    cfg : DictConfig
        Hydra configuration.
    copy_orig_db : bool, optional
        Whether to copy the original database. Default is False.

    Notes
    -----
    TODO: Not needed for outlier detection as is, but could be useful for
    saving results as DuckDB for easy inspection without re-running.
    """
    db_name = f"{experiment_name}_modelDummy.db"
    db_path = export_dataframe_to_duckdb(
        df=df, db_name=db_name, cfg=cfg, name="anomaly", copy_orig_db=copy_orig_db
    )
    init_mlflow_experiment(experiment_name=experiment_name)
    run_name = "Original Data"
    if not check_if_run_exists(experiment_name, run_name):
            # A bit of a temp solution; this run does not necessarily need to exist here at all
        with mlflow.start_run(run_name=run_name):
            try:
                mlflow.log_artifact(db_path, "data")
            except Exception as e:
                logger.error(f"Could not log the artifact: {e}")
                raise e

get_best_run

get_best_run(experiment_name: str) -> Series

Get the best (first) run from an MLflow experiment.

PARAMETER DESCRIPTION
experiment_name

Name of the MLflow experiment to search.

TYPE: str

RETURNS DESCRIPTION
Series

The first run found in the experiment.

RAISES DESCRIPTION
ValueError

If no runs are found in the experiment.

Notes

Currently picks the first run found (often the only one). Add filters if you have multiple dataset versions or different filter requirements.

Source code in src/anomaly_detection/anomaly_utils.py
def get_best_run(experiment_name: str) -> pd.Series:
    """
    Get the best (first) run from an MLflow experiment.

    Parameters
    ----------
    experiment_name : str
        Name of the MLflow experiment to search.

    Returns
    -------
    pd.Series
        The first run found in the experiment.

    Raises
    ------
    ValueError
        If no runs are found in the experiment.

    Notes
    -----
    Currently picks the first run found (often the only one). Add filters
    if you have multiple dataset versions or different filter requirements.
    """
    # Pick the first run found (often the only). Add some filters if you start
    # actually doing outlier detection, and if you have multiple dataset versions,
    # different filters to retrieve data, etc.
    best_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    if len(best_runs) == 0:
        logger.error(
            f"No (outlier detection / data import) runs found for experiment: {experiment_name}"
        )
        raise ValueError(f"No runs found for experiment: {experiment_name}")
    else:
        logger.info(f"Found {len(best_runs)} runs for experiment: {experiment_name}")
        logger.info(f"Best (data/outlier detection) run: {best_runs.loc[0, :]}")
        return best_runs.loc[0, :]

log_anomaly_model_as_mlflow_artifact

log_anomaly_model_as_mlflow_artifact(
    checkpoint_file: str, run_name: str
) -> None

Log a trained anomaly detection model to MLflow as an artifact.

PARAMETER DESCRIPTION
checkpoint_file

Path to the model checkpoint file.

TYPE: str

run_name

Name of the MLflow run (for logging purposes).

TYPE: str

RAISES DESCRIPTION
Exception

If the artifact cannot be logged to MLflow.

Notes

This can be slow for large models (e.g., 1.3GB).

Source code in src/anomaly_detection/anomaly_utils.py
def log_anomaly_model_as_mlflow_artifact(checkpoint_file: str, run_name: str) -> None:
    """
    Log a trained anomaly detection model to MLflow as an artifact.

    Parameters
    ----------
    checkpoint_file : str
        Path to the model checkpoint file.
    run_name : str
        Name of the MLflow run (for logging purposes).

    Raises
    ------
    Exception
        If the artifact cannot be logged to MLflow.

    Notes
    -----
    This can be slow for large models (e.g., 1.3GB).
    """
    logger.info("Logging Anomaly Detection model as an artifact to MLflow")
    # Note: this can be slow if you need to upload ~1.3 GB models
    try:
        mlflow.log_artifact(local_path=checkpoint_file, artifact_path="model")
    except Exception as e:
        logger.error(f"Could not log the artifact: {e}")
        raise e

print_available_artifacts

print_available_artifacts(path: str) -> None

Print available artifacts at the given path.

PARAMETER DESCRIPTION
path

Full path to an artifact file.

TYPE: str

Notes

Currently a stub function that extracts directory and filename.

Source code in src/anomaly_detection/anomaly_utils.py
def print_available_artifacts(path: str) -> None:
    """
    Print available artifacts at the given path.

    Parameters
    ----------
    path : str
        Full path to an artifact file.

    Notes
    -----
    Currently a stub function that extracts directory and filename.
    """
    dirname, fname = os.path.split(path)  # TODO: actually list/print the artifacts

get_artifact

get_artifact(
    run_id: str,
    run_name: str,
    model_name: str,
    subdir: str = "outlier_detection",
) -> Optional[str]

Download an artifact from MLflow by run ID and subdirectory.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

run_name

Name of the MLflow run.

TYPE: str

model_name

Name of the model (used for filename generation).

TYPE: str

subdir

Artifact subdirectory: 'outlier_detection', 'model', 'imputation', 'baseline_model', or 'metrics'. Default is 'outlier_detection'.

TYPE: str DEFAULT: 'outlier_detection'

RETURNS DESCRIPTION
str or None

Local path to the downloaded artifact, or None for baseline_model with ensembled input.

RAISES DESCRIPTION
ValueError

If subdir is unknown.

Exception

If artifact cannot be downloaded.

Source code in src/anomaly_detection/anomaly_utils.py
def get_artifact(
    run_id: str, run_name: str, model_name: str, subdir: str = "outlier_detection"
) -> Optional[str]:
    """
    Download an artifact from MLflow by run ID and subdirectory.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    run_name : str
        Name of the MLflow run.
    model_name : str
        Name of the model (used for filename generation).
    subdir : str, optional
        Artifact subdirectory: 'outlier_detection', 'model', 'imputation',
        'baseline_model', or 'metrics'. Default is 'outlier_detection'.

    Returns
    -------
    str or None
        Local path to the downloaded artifact, or None for baseline_model
        with ensembled input.

    Raises
    ------
    ValueError
        If subdir is unknown.
    Exception
        If artifact cannot be downloaded.
    """
    path = None  # defined up front so the outer error handler can reference it safely
    try:
        if subdir == "outlier_detection":
            fname = get_outlier_pickle_name(model_name)
            # HACK: fix for this; the pickle has been named after run_name and not model_name
            if fname == "outlierDetection_UniTS.pickle":
                logger.warning('Hacky fix for "outlierDetection_UniTS.pickle"')
                fname = get_outlier_pickle_name(run_name)
            if "ensemble" in model_name:
                fname = get_ensemble_pickle_name(ensemble_name=run_name)
        elif subdir == "model":
            fname = get_torch_model_name(run_name)
        elif subdir == "imputation":
            fname = get_imputation_pickle_name(model_name)
            if "ensemble" in model_name:
                # e.g. ensemble_ensemble-CSDI-SAITS-TimesNet__exclude_gt_results.pickle
                fname = get_ensemble_pickle_name(ensemble_name=run_name)
        elif subdir == "baseline_model":
            # if you want the Classifier object
            # fname = get_model_fname(run_name, prefix="baseline")
            fname = get_cls_metrics_fname(run_name, prefix="baseline")
        elif subdir == "metrics":
            # if you want the Classifier object:
            # fname = get_model_fname(run_name)
            fname = get_cls_metrics_fname(run_name)
        else:
            logger.error(f"Unknown subdir: {subdir}")
            raise ValueError(f"Unknown subdir: {subdir}")

        path = f"runs:/{run_id}/{subdir}/{fname}"

        if subdir == "baseline_model" and "ensembled_input" in run_name:
            logger.warning(
                "No baseline available for the ensembled diverse classifiers"
            )
            return None
        else:
            try:
                artifact = mlflow.artifacts.download_artifacts(artifact_uri=path)
            except Exception as e:
                logger.error(f"Could not download the artifact: {e}")
                logger.error(
                    f"run_id: {run_id}, model_name: {model_name}, path: {path}"
                )
                raise e
            return artifact
    except Exception as e:
        logger.error(f"Problem getting the artifact: {e}")
        logger.error(f"run_id: {run_id}, model_name: {model_name}, path: {path}")
        raise e
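The `subdir` branching above maps each artifact type to a filename before building a `runs:/` URI. A sketch of the same dispatch as a lookup table; the lambda bodies stand in for the project's real `get_*_name` helpers and are illustrative assumptions:

```python
# Hypothetical filename builders keyed by artifact subdirectory; the
# naming patterns are assumptions standing in for the real helpers.
FNAME_BUILDERS = {
    "outlier_detection": lambda model, run: f"outlierDetection_{model}.pickle",
    "model": lambda model, run: f"{run}.pt",
    "imputation": lambda model, run: f"imputation_{model}.pickle",
    "metrics": lambda model, run: f"cls_metrics_{run}.pickle",
}

def build_artifact_uri(run_id: str, subdir: str, model_name: str, run_name: str) -> str:
    """Build a runs:/ artifact URI, raising ValueError on an unknown subdir."""
    try:
        fname = FNAME_BUILDERS[subdir](model_name, run_name)
    except KeyError:
        raise ValueError(f"Unknown subdir: {subdir}") from None
    return f"runs:/{run_id}/{subdir}/{fname}"
```

A table keeps the valid subdirectories in one place, which is easier to extend than a growing if/elif chain.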

check_outlier_detection_artifact

check_outlier_detection_artifact(
    outlier_artifacts: Dict[str, Any],
) -> None

Validate the structure of outlier detection artifacts.

PARAMETER DESCRIPTION
outlier_artifacts

Dictionary containing outlier detection results with 'outlier_results' key.

TYPE: dict

RAISES DESCRIPTION
AssertionError

If the artifact structure is invalid.

Source code in src/anomaly_detection/anomaly_utils.py
def check_outlier_detection_artifact(outlier_artifacts: Dict[str, Any]) -> None:
    """
    Validate the structure of outlier detection artifacts.

    Parameters
    ----------
    outlier_artifacts : dict
        Dictionary containing outlier detection results with 'outlier_results' key.

    Raises
    ------
    AssertionError
        If the artifact structure is invalid.
    """
    first_epoch_key = list(outlier_artifacts["outlier_results"].keys())[0]
    outlier_results = outlier_artifacts["outlier_results"][first_epoch_key]
    check_outlier_results(outlier_results=outlier_results)

check_outlier_results

check_outlier_results(
    outlier_results: Dict[str, Any],
) -> None

Validate the structure of outlier results dictionary.

PARAMETER DESCRIPTION
outlier_results

Dictionary containing per-split outlier detection results.

TYPE: dict

RAISES DESCRIPTION
AssertionError

If the results structure is invalid.

Source code in src/anomaly_detection/anomaly_utils.py
def check_outlier_results(outlier_results: Dict[str, Any]) -> None:
    """
    Validate the structure of outlier results dictionary.

    Parameters
    ----------
    outlier_results : dict
        Dictionary containing per-split outlier detection results.

    Raises
    ------
    AssertionError
        If the results structure is invalid.
    """
    first_split = list(outlier_results.keys())[0]
    split_results = outlier_results[first_split]["results_dict"]["split_results"]
    check_split_results(split_results)

check_split_results

check_split_results(split_results: Dict[str, Any]) -> None

Validate consistency between flat and array representations.

Ensures that the number of samples in flattened arrays matches the total size of the original arrays.

PARAMETER DESCRIPTION
split_results

Dictionary containing 'arrays_flat' and 'arrays' with prediction results.

TYPE: dict

RAISES DESCRIPTION
AssertionError

If sample counts do not match between representations.

Source code in src/anomaly_detection/anomaly_utils.py
def check_split_results(split_results: Dict[str, Any]) -> None:
    """
    Validate consistency between flat and array representations.

    Ensures that the number of samples in flattened arrays matches the
    total size of the original arrays.

    Parameters
    ----------
    split_results : dict
        Dictionary containing 'arrays_flat' and 'arrays' with prediction results.

    Raises
    ------
    AssertionError
        If sample counts do not match between representations.
    """
    no_samples_in_flat = split_results["arrays_flat"]["trues_valid"].shape[0]
    no_samples_in_array = split_results["arrays"]["trues"].size
    # Guards against a bizarre issue where samples were seemingly dropped somewhere upstream
    assert no_samples_in_flat == no_samples_in_array, (
        f"no_samples_in_flat: {no_samples_in_flat}, no_samples_in_array: {no_samples_in_array}, should be equal"
    )
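The check compares the number of flattened valid samples against the total element count of the original 2-D arrays. A self-contained sketch of that invariant with synthetic arrays:

```python
import numpy as np

# Synthetic split results: 4 subjects x 10 timepoints.
trues = np.zeros((4, 10))
split_results = {
    "arrays": {"trues": trues},
    "arrays_flat": {"trues_valid": trues.reshape(-1)},  # flattened copy
}

n_flat = split_results["arrays_flat"]["trues_valid"].shape[0]
n_array = split_results["arrays"]["trues"].size
assert n_flat == n_array  # 40 == 40: no samples dropped in flattening
```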

get_no_subjects_in_outlier_artifacts

get_no_subjects_in_outlier_artifacts(
    outlier_artifacts: Dict[str, Any],
) -> int

Get the number of subjects from outlier detection artifacts.

PARAMETER DESCRIPTION
outlier_artifacts

Dictionary containing outlier detection results.

TYPE: dict

RETURNS DESCRIPTION
int

Number of subjects in the training split.

Source code in src/anomaly_detection/anomaly_utils.py
def get_no_subjects_in_outlier_artifacts(outlier_artifacts: Dict[str, Any]) -> int:
    """
    Get the number of subjects from outlier detection artifacts.

    Parameters
    ----------
    outlier_artifacts : dict
        Dictionary containing outlier detection results.

    Returns
    -------
    int
        Number of subjects in the training split.
    """
    first_epoch_key = list(outlier_artifacts["outlier_results"].keys())[0]
    split_results = outlier_artifacts["outlier_results"][first_epoch_key]["train"][
        "results_dict"
    ]["split_results"]
    no_subjects = split_results["arrays"]["trues"].shape[0]
    return no_subjects

outlier_detection_artifacts_dict

outlier_detection_artifacts_dict(
    mlflow_run: Series, model_name: str, task: str
) -> Dict[str, Any]

Load outlier detection artifacts from an MLflow run.

PARAMETER DESCRIPTION
mlflow_run

MLflow run information containing run_id and run name.

TYPE: Series

model_name

Name of the outlier detection model.

TYPE: str

task

Task subdirectory for artifacts.

TYPE: str

RETURNS DESCRIPTION
dict

Loaded outlier detection artifacts dictionary.

Warnings

Logs a warning if artifact file size exceeds 2GB.

Source code in src/anomaly_detection/anomaly_utils.py
def outlier_detection_artifacts_dict(
    mlflow_run: pd.Series, model_name: str, task: str
) -> Dict[str, Any]:
    """
    Load outlier detection artifacts from an MLflow run.

    Parameters
    ----------
    mlflow_run : pd.Series
        MLflow run information containing run_id and run name.
    model_name : str
        Name of the outlier detection model.
    task : str
        Task subdirectory for artifacts.

    Returns
    -------
    dict
        Loaded outlier detection artifacts dictionary.

    Warnings
    --------
    Logs a warning if artifact file size exceeds 2GB.
    """
    run_id = mlflow_run["run_id"]
    run_name = mlflow_run["tags.mlflow.runName"]

    if model_name == "TimesNet-orig":
        model_name = "TimesNet"

    outlier_artifacts_path = get_artifact(
        run_id=run_id,
        run_name=run_name,
        model_name=model_name,
        subdir=task,
    )
    file_size_MB = os.path.getsize(outlier_artifacts_path) / (1024 * 1024)
    logger.debug("Artifact file size = {:.2f} MB".format(file_size_MB))
    if file_size_MB > 2048:
        # Obviously tune this threshold if you start using massive models
        logger.warning(
            f"File size is over 2 GB ({file_size_MB / 1024:.2f} GB). Is this correct, "
            f"or did something go wrong in the previous step?"
        )
        logger.warning(f"Artifact path: {outlier_artifacts_path}")
    outlier_artifacts = load_results_dict(outlier_artifacts_path)

    return outlier_artifacts

get_moment_model_from_mlflow_artifacts

get_moment_model_from_mlflow_artifacts(
    run_id: str,
    run_name: str,
    model: Module,
    device: str,
    cfg: DictConfig,
    task: str,
    model_name: str = "MOMENT",
) -> Module

Load a MOMENT model from MLflow artifacts.

Downloads the model checkpoint and loads it into the provided model object, verifying that the weights have changed from the initial state.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

run_name

Name of the MLflow run.

TYPE: str

model

Model object to load weights into.

TYPE: Module

device

Device to load the model onto ('cpu' or 'cuda').

TYPE: str

cfg

Hydra configuration.

TYPE: DictConfig

task

Task name (e.g., 'outlier_detection', 'imputation').

TYPE: str

model_name

Name of the model. Default is 'MOMENT'.

TYPE: str DEFAULT: 'MOMENT'

RETURNS DESCRIPTION
Module

Model with loaded weights.

RAISES DESCRIPTION
AssertionError

If loaded weights are identical to pretrained weights.

Source code in src/anomaly_detection/anomaly_utils.py
def get_moment_model_from_mlflow_artifacts(
    run_id: str,
    run_name: str,
    model: torch.nn.Module,
    device: str,
    cfg: DictConfig,
    task: str,
    model_name: str = "MOMENT",
) -> torch.nn.Module:
    """
    Load a MOMENT model from MLflow artifacts.

    Downloads the model checkpoint and loads it into the provided model object,
    verifying that the weights have changed from the initial state.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    run_name : str
        Name of the MLflow run.
    model : torch.nn.Module
        Model object to load weights into.
    device : str
        Device to load the model onto ('cpu' or 'cuda').
    cfg : DictConfig
        Hydra configuration.
    task : str
        Task name (e.g., 'outlier_detection', 'imputation').
    model_name : str, optional
        Name of the model. Default is 'MOMENT'.

    Returns
    -------
    torch.nn.Module
        Model with loaded weights.

    Raises
    ------
    AssertionError
        If loaded weights are identical to pretrained weights.
    """
    model_path = get_artifact(run_id, run_name, model_name, subdir="model")
    file_stats = os.stat(model_path)
    logger.info(
        f"Model size: {file_stats.st_size / (1024 * 1024):.0f} MB | {model_path}"
    )
    state_dict_in = model.state_dict().__str__()
    model = load_model_from_disk(model, model_path, device, cfg, task)
    state_dict_out = model.state_dict().__str__()
    compare_state_dicts(state_dict_out, state_dict_in, same_ok=False)
    return model

get_anomaly_detection_results_from_mlflow

get_anomaly_detection_results_from_mlflow(
    experiment_name: str,
    cfg: DictConfig,
    run_name: str,
    model_name: str,
    get_model: bool = False,
) -> Tuple[Dict[str, Any], Optional[Module]]

Retrieve anomaly detection results and optionally the model from MLflow.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name.

TYPE: str

cfg

Hydra configuration.

TYPE: DictConfig

run_name

Name of the MLflow run.

TYPE: str

model_name

Name of the outlier detection model.

TYPE: str

get_model

Whether to also load the trained model. Default is False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple

A tuple containing:

- outlier_artifacts : dict
    Loaded outlier detection artifacts.
- model : torch.nn.Module or None
    The trained model if get_model=True, otherwise None.

RAISES DESCRIPTION
ValueError

If no matching run is found.

NotImplementedError

If get_model=True for finetuned models (not yet implemented).

Source code in src/anomaly_detection/anomaly_utils.py
def get_anomaly_detection_results_from_mlflow(
    experiment_name: str,
    cfg: DictConfig,
    run_name: str,
    model_name: str,
    get_model: bool = False,
) -> Tuple[Dict[str, Any], Optional[torch.nn.Module]]:
    """
    Retrieve anomaly detection results and optionally the model from MLflow.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Hydra configuration.
    run_name : str
        Name of the MLflow run.
    model_name : str
        Name of the outlier detection model.
    get_model : bool, optional
        Whether to also load the trained model. Default is False.

    Returns
    -------
    tuple
        A tuple containing:
        - outlier_artifacts : dict
            Loaded outlier detection artifacts.
        - model : torch.nn.Module or None
            The trained model if get_model=True, otherwise None.

    Raises
    ------
    ValueError
        If no matching run is found.
    NotImplementedError
        If get_model=True for finetuned models (not yet implemented).
    """
    sort_by = "best_loss"
    mlflow_run = get_anomaly_detection_run(
        experiment_name,
        cfg,
        sort_by=sort_by,
        best_string=cfg["OUTLIER_DETECTION"][sort_by],
    )

    if len(mlflow_run) == 0:
        logger.error(f"No run found for experiment: {experiment_name}")
        raise ValueError(f"No run found for experiment: {experiment_name}")

    run_id = mlflow_run["run_id"]
    run_name = mlflow_run["tags.mlflow.runName"]
    outlier_artifacts_path = get_artifact(
        run_id, run_name, model_name, subdir="outlier_detection"
    )
    outlier_artifacts = load_results_dict(outlier_artifacts_path)

    if "finetune" in run_name:
        # Each model may (possibly) have its own way of being loaded
        if get_model:
            # You don't need the model until you are doing imputation (reconstruction) with these models?
            # model_path = get_artifact(run_id, model_name, subdir="model")
            # load_model_from_disk(model, model_path, device, cfg)
            raise NotImplementedError("Loading the model is not implemented yet")
        else:
            model = None
    else:
        # e.g. when you did zero-shot, no model was saved
        model = None

    return outlier_artifacts, model

get_source_dataframe_from_mlflow

get_source_dataframe_from_mlflow(
    experiment_name: str, cfg: DictConfig
) -> DataFrame

Import anomaly detection results from MLflow as a Polars DataFrame.

Downloads the DuckDB artifact from MLflow and loads it as a DataFrame.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name to retrieve data from.

TYPE: str

cfg

Hydra configuration for data loading.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Polars DataFrame containing the outlier-detected data with train and val splits.

RAISES DESCRIPTION
AssertionError

If the DataFrame does not contain exactly 2 splits (train and val).

Notes

Placeholder task for importing anomaly detection results from MLflow.

Source code in src/anomaly_detection/anomaly_utils.py
def get_source_dataframe_from_mlflow(
    experiment_name: str, cfg: DictConfig
) -> pl.DataFrame:
    """
    Import anomaly detection results from MLflow as a Polars DataFrame.

    Downloads the DuckDB artifact from MLflow and loads it as a DataFrame.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name to retrieve data from.
    cfg : DictConfig
        Hydra configuration for data loading.

    Returns
    -------
    pl.DataFrame
        Polars DataFrame containing the outlier-detected data with
        train and val splits.

    Raises
    ------
    AssertionError
        If the DataFrame does not contain exactly 2 splits (train and val).

    Notes
    -----
    Placeholder task for importing anomaly detection results from MLflow.
    """
    # Get the best run
    best_run = get_best_run(experiment_name=experiment_name)

    # get the artifact path
    artifact_uri: str = mlflow.get_run(best_run["run_id"]).info.artifact_uri

    # Get the db path (refers to downloaded path)
    db_path = get_duckdb_from_mlflow(artifact_uri=artifact_uri)

    # Load the Polars dataframe from the DuckDB
    df = load_from_duckdb_as_dataframe(db_path=db_path, cfg=cfg)
    # df = load_both_splits_from_duckdb(db_path, cfg=cfg)
    logger.info(
        "Download Outlier Detected dataframe from MLflow, pl.DataFrame shape = {}".format(
            df.shape
        )
    )
    logger.info(f"db_path = {db_path}")
    splits = list(get_unique_polars_rows(df, unique_col="split")["split"])
    assert len(splits) == 2, (
        "Expected exactly two splits (train and val), but got {}".format(splits)
    )

    return df

Traditional Methods

outlier_sklearn

subjectwise_LOF

subjectwise_LOF(
    X_subj: ndarray, clf: LocalOutlierFactor
) -> ndarray

Apply Local Outlier Factor to a single subject's data.

PARAMETER DESCRIPTION
X_subj

Time series data for one subject, shape (n_timepoints, 1).

TYPE: ndarray

clf

Configured LOF classifier instance.

TYPE: LocalOutlierFactor

RETURNS DESCRIPTION
ndarray

Binary outlier predictions (1 = outlier, 0 = inlier).

Source code in src/anomaly_detection/outlier_sklearn.py
def subjectwise_LOF(X_subj: np.ndarray, clf: LocalOutlierFactor) -> np.ndarray:
    """
    Apply Local Outlier Factor to a single subject's data.

    Parameters
    ----------
    X_subj : np.ndarray
        Time series data for one subject, shape (n_timepoints, 1).
    clf : LocalOutlierFactor
        Configured LOF classifier instance.

    Returns
    -------
    np.ndarray
        Binary outlier predictions (1 = outlier, 0 = inlier).
    """
    out = clf.fit_predict(X_subj)
    pred = np.where(out == -1, 1, 0)
    # anomaly_scores = clf.negative_outlier_factor_
    return pred
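`fit_predict` follows scikit-learn's convention of -1 for outliers and +1 for inliers; `np.where` remaps that to the 1 = outlier, 0 = inlier masks used throughout this module. The remap in isolation:

```python
import numpy as np

out = np.array([1, 1, -1, 1, -1])   # sklearn convention: -1 outlier, +1 inlier
pred = np.where(out == -1, 1, 0)    # module convention: 1 outlier, 0 inlier
# pred is now [0, 0, 1, 0, 1]
```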

subjectwise_HPO

subjectwise_HPO(
    X: ndarray, clf: Any, model_name: str
) -> ndarray

Apply hyperparameter-optimized outlier detection subject by subject.

PARAMETER DESCRIPTION
X

Input data array of shape (n_subjects, n_timepoints).

TYPE: ndarray

clf

Configured outlier detection classifier.

TYPE: sklearn estimator

model_name

Name of the model ('LOF' currently supported).

TYPE: str

RETURNS DESCRIPTION
ndarray

Binary outlier predictions of shape (n_subjects, n_timepoints).

RAISES DESCRIPTION
ValueError

If model_name is not supported.

Source code in src/anomaly_detection/outlier_sklearn.py
def subjectwise_HPO(X: np.ndarray, clf: Any, model_name: str) -> np.ndarray:
    """
    Apply hyperparameter-optimized outlier detection subject by subject.

    Parameters
    ----------
    X : np.ndarray
        Input data array of shape (n_subjects, n_timepoints).
    clf : sklearn estimator
        Configured outlier detection classifier.
    model_name : str
        Name of the model ('LOF' currently supported).

    Returns
    -------
    np.ndarray
        Binary outlier predictions of shape (n_subjects, n_timepoints).

    Raises
    ------
    ValueError
        If model_name is not supported.
    """
    preds_list = []
    no_subjects = X.shape[0]
    for subj_idx in range(no_subjects):
        X_subj = X[subj_idx, :].reshape(-1, 1)
        if model_name == "LOF":
            pred_subj = subjectwise_LOF(X_subj, clf)
        else:
            raise ValueError(f"Model {model_name} not supported")
        preds_list.append(pred_subj)
    # np.vstack keeps the output 2-D (n_subjects, n_timepoints) even for a single subject
    return np.vstack(preds_list)

datasetwise_HPO

datasetwise_HPO(
    X: ndarray, clf: Any, model_name: str
) -> ndarray

Apply outlier detection on the entire dataset at once.

Flattens all subjects' data and fits a single model, then reshapes predictions back to original shape.

PARAMETER DESCRIPTION
X

Input data array of shape (n_subjects, n_timepoints).

TYPE: ndarray

clf

Configured outlier detection classifier.

TYPE: sklearn estimator

model_name

Name of the model ('LOF' currently supported).

TYPE: str

RETURNS DESCRIPTION
ndarray

Binary outlier predictions of shape (n_subjects, n_timepoints).

RAISES DESCRIPTION
ValueError

If model_name is not supported.

Source code in src/anomaly_detection/outlier_sklearn.py
def datasetwise_HPO(X: np.ndarray, clf: Any, model_name: str) -> np.ndarray:
    """
    Apply outlier detection on the entire dataset at once.

    Flattens all subjects' data and fits a single model, then reshapes
    predictions back to original shape.

    Parameters
    ----------
    X : np.ndarray
        Input data array of shape (n_subjects, n_timepoints).
    clf : sklearn estimator
        Configured outlier detection classifier.
    model_name : str
        Name of the model ('LOF' currently supported).

    Returns
    -------
    np.ndarray
        Binary outlier predictions of shape (n_subjects, n_timepoints).

    Raises
    ------
    ValueError
        If model_name is not supported.
    """
    X_flat = X.reshape(-1, 1)
    if model_name == "LOF":
        preds = clf.fit_predict(X_flat)
        preds = np.where(preds == -1, 1, 0)
        preds = preds.reshape(-1, X.shape[1])
    else:
        raise ValueError(f"Model {model_name} not supported")
    return preds
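The dataset-wise variant relies on a flatten/reshape round trip; because NumPy's `reshape` is row-major, each row of the reshaped predictions still lines up with the same subject. A sketch of just that round trip, with a thresholding stand-in for the classifier (an assumption, not the real `clf.fit_predict`):

```python
import numpy as np

X = np.arange(12).reshape(3, 4)               # 3 subjects, 4 timepoints
X_flat = X.reshape(-1, 1)                     # (12, 1): one sample per timepoint
preds_flat = (X_flat[:, 0] > 8).astype(int)   # stand-in for clf.fit_predict
preds = preds_flat.reshape(-1, X.shape[1])    # back to (3, 4), rows = subjects
assert preds.shape == X.shape
```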

get_LOF

get_LOF(
    X: ndarray,
    params: Dict[str, Any],
    subjectwise: bool = True,
) -> Tuple[ndarray, LocalOutlierFactor]

Run Local Outlier Factor outlier detection.

PARAMETER DESCRIPTION
X

Input data array of shape (n_subjects, n_timepoints).

TYPE: ndarray

params

Parameters for LocalOutlierFactor (n_neighbors, contamination, etc.).

TYPE: dict

subjectwise

If True, fit LOF independently for each subject. If False, fit on all data at once. Default is True (more relevant for deployment).

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
tuple

A tuple containing:

- preds : np.ndarray
    Binary outlier predictions of shape (n_subjects, n_timepoints).
- clf : LocalOutlierFactor
    The configured LOF classifier instance.

Source code in src/anomaly_detection/outlier_sklearn.py
def get_LOF(
    X: np.ndarray, params: Dict[str, Any], subjectwise: bool = True
) -> Tuple[np.ndarray, LocalOutlierFactor]:
    """
    Run Local Outlier Factor outlier detection.

    Parameters
    ----------
    X : np.ndarray
        Input data array of shape (n_subjects, n_timepoints).
    params : dict
        Parameters for LocalOutlierFactor (n_neighbors, contamination, etc.).
    subjectwise : bool, optional
        If True, fit LOF independently for each subject. If False, fit on
        all data at once. Default is True (more relevant for deployment).

    Returns
    -------
    tuple
        A tuple containing:
        - preds : np.ndarray
            Binary outlier predictions of shape (n_subjects, n_timepoints).
        - clf : LocalOutlierFactor
            The configured LOF classifier instance.
    """
    clf = LocalOutlierFactor(**params)
    if subjectwise:
        # More relevant for our needs as we would like to use this in real-life for new subjects
        preds = subjectwise_HPO(X, clf, model_name="LOF")
    else:
        preds = datasetwise_HPO(X, clf, model_name="LOF")
    return preds, clf
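A runnable sketch of the subject-wise path on synthetic PLR-like data with an injected artifact. The signal values (pupil diameter around 5 mm, a spike at 9 mm) and the LOF parameters are illustrative assumptions:

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

rng = np.random.default_rng(0)
X = rng.normal(5.0, 0.1, size=(3, 50))   # 3 subjects, 50 timepoints each
X[:, 25] = 9.0                           # inject an obvious blink-like artifact

clf = LocalOutlierFactor(n_neighbors=10, contamination=0.05)
# Fit LOF independently per subject, mapping -1/+1 to 1/0 masks
preds = np.vstack([
    np.where(clf.fit_predict(X[i].reshape(-1, 1)) == -1, 1, 0)
    for i in range(X.shape[0])
])
assert preds.shape == X.shape
assert preds[:, 25].all()                # the injected artifact is flagged
```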

get_outlier_y_from_data_dict

get_outlier_y_from_data_dict(
    data_dict: Dict[str, Any], label: str, split: str
) -> ndarray

Extract outlier labels from data dictionary by difficulty level.

PARAMETER DESCRIPTION
data_dict

Data dictionary containing labels for each split.

TYPE: dict

label

Difficulty level: 'all', 'granular', 'easy', or 'medium'.

TYPE: str

split

Data split: 'train' or 'test'.

TYPE: str

RETURNS DESCRIPTION
ndarray

Boolean outlier mask array.

RAISES DESCRIPTION
ValueError

If label is not a recognized difficulty level.

Source code in src/anomaly_detection/outlier_sklearn.py
def get_outlier_y_from_data_dict(
    data_dict: Dict[str, Any], label: str, split: str
) -> np.ndarray:
    """
    Extract outlier labels from data dictionary by difficulty level.

    Parameters
    ----------
    data_dict : dict
        Data dictionary containing labels for each split.
    label : str
        Difficulty level: 'all', 'granular', 'easy', or 'medium'.
    split : str
        Data split: 'train' or 'test'.

    Returns
    -------
    np.ndarray
        Boolean outlier mask array.

    Raises
    ------
    ValueError
        If label is not a recognized difficulty level.
    """
    labels = data_dict["df"][split]["labels"]
    if label == "all":
        y = labels["outlier_mask"]
    elif label == "granular":
        y = labels["outlier_mask_easy"] | labels["outlier_mask_medium"]
    elif label == "easy":
        y = labels["outlier_mask_easy"]
    elif label == "medium":
        y = labels["outlier_mask_medium"]
    else:
        logger.error(f"unknown label = {label}")
        raise ValueError(f"unknown label = {label}")
    no_of_outliers = np.sum(y)
    outlier_percentage = 100 * (no_of_outliers / y.size)
    logger.debug(f"Outlier percentage in {label} set: {outlier_percentage:.2f}%")
    return y
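The 'granular' level is the element-wise OR of the easy and medium masks, and the logged percentage is computed over all elements. In isolation:

```python
import numpy as np

easy = np.array([[True, False, False], [False, False, True]])
medium = np.array([[False, True, False], [False, False, False]])

granular = easy | medium                  # element-wise union of the two masks
pct = 100 * granular.sum() / granular.size  # 3 outliers of 6 points -> 50.0
```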

eval_on_all_outlier_difficulty_levels

eval_on_all_outlier_difficulty_levels(
    data_dict: Dict[str, Any], preds: ndarray, split: str
) -> Dict[str, Dict[str, Any]]

Evaluate outlier detection across all difficulty levels.

PARAMETER DESCRIPTION
data_dict

Data dictionary containing labels for each split.

TYPE: dict

preds

Binary outlier predictions.

TYPE: ndarray

split

Data split: 'train' or 'test'.

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary with metrics for each difficulty level ('all', 'easy', 'medium'). Each entry contains 'scalars' (metrics) and 'arrays' (predictions, labels).

Source code in src/anomaly_detection/outlier_sklearn.py
def eval_on_all_outlier_difficulty_levels(
    data_dict: Dict[str, Any], preds: np.ndarray, split: str
) -> Dict[str, Dict[str, Any]]:
    """
    Evaluate outlier detection across all difficulty levels.

    Parameters
    ----------
    data_dict : dict
        Data dictionary containing labels for each split.
    preds : np.ndarray
        Binary outlier predictions.
    split : str
        Data split: 'train' or 'test'.

    Returns
    -------
    dict
        Dictionary with metrics for each difficulty level ('all', 'easy', 'medium').
        Each entry contains 'scalars' (metrics) and 'arrays' (predictions, labels).
    """
    # difficulties = ["all", "granular", "easy", "medium"]
    difficulties = ["all", "easy", "medium"]
    # difficulties = ["easy", "medium"]
    dict_out = {}
    for diff in difficulties:
        y = get_outlier_y_from_data_dict(data_dict, diff, split=split)
        dict_out[diff] = {
            "scalars": get_scalar_outlier_metrics(preds, y, score=None),
            "arrays": {"pred_mask": preds, "trues": y},
        }
    return dict_out

get_outlier_metrics

get_outlier_metrics(
    score: Optional[ndarray],
    preds: ndarray,
    y: ndarray,
    df: Optional[DataFrame] = None,
    cfg: Optional[DictConfig] = None,
    split: Optional[str] = None,
) -> Dict[str, Any]

Compute outlier detection metrics.

PARAMETER DESCRIPTION
score

Anomaly scores (if available).

TYPE: ndarray or None

preds

Binary outlier predictions.

TYPE: ndarray

y

Ground truth outlier labels.

TYPE: ndarray

df

Original DataFrame (for future use). Default is None.

TYPE: DataFrame DEFAULT: None

cfg

Configuration (for future use). Default is None.

TYPE: DictConfig DEFAULT: None

split

Data split name (for future use). Default is None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
dict

Dictionary containing:

- 'scalars': Scalar metrics (F1, precision, recall, etc.)
- 'arrays': Prediction mask and true labels.

Source code in src/anomaly_detection/outlier_sklearn.py
def get_outlier_metrics(
    score: Optional[np.ndarray],
    preds: np.ndarray,
    y: np.ndarray,
    df: Optional[pl.DataFrame] = None,
    cfg: Optional[DictConfig] = None,
    split: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Compute outlier detection metrics.

    Parameters
    ----------
    score : np.ndarray or None
        Anomaly scores (if available).
    preds : np.ndarray
        Binary outlier predictions.
    y : np.ndarray
        Ground truth outlier labels.
    df : pl.DataFrame, optional
        Original DataFrame (for future use). Default is None.
    cfg : DictConfig, optional
        Configuration (for future use). Default is None.
    split : str, optional
        Data split name (for future use). Default is None.

    Returns
    -------
    dict
        Dictionary containing:
        - 'scalars': Scalar metrics (F1, precision, recall, etc.)
        - 'arrays': Prediction mask and true labels.
    """
    dict_all = {
        "scalars": get_scalar_outlier_metrics(preds, y, score, cfg),
        "arrays": {"pred_mask": preds, "trues": y},
    }
    # data_dict = convert_df_to_dict(data_df=df, cfg=cfg)
    # dict_granular = eval_on_all_outlier_difficulty_levels(data_dict, preds, split=split)
    return dict_all
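The contents of `get_scalar_outlier_metrics` are not shown on this page; as a rough illustration, the core scalar metrics can be computed from the binary masks alone. This is a minimal numpy sketch with a hypothetical function name, not the module's actual implementation:

```python
import numpy as np

def scalar_outlier_metrics_sketch(preds: np.ndarray, y: np.ndarray) -> dict:
    """Compute precision, recall, and F1 from binary outlier masks."""
    preds, y = preds.flatten(), y.flatten()
    tp = np.sum((preds == 1) & (y == 1))  # true positives
    fp = np.sum((preds == 1) & (y == 0))  # false positives
    fn = np.sum((preds == 0) & (y == 1))  # false negatives
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) > 0 else 0.0)
    return {"precision": precision, "recall": recall, "f1": f1}

# Example: 2 true positives, 1 false positive, 0 false negatives
m = scalar_outlier_metrics_sketch(np.array([1, 0, 1, 1]), np.array([1, 0, 0, 1]))
```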

LOF_wrapper

LOF_wrapper(
    X: ndarray,
    y: ndarray,
    X_test: ndarray,
    y_test: ndarray,
    best_params: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]

Run LOF outlier detection on train and test splits.

PARAMETER DESCRIPTION
X

Training data of shape (n_subjects, n_timepoints).

TYPE: ndarray

y

Training labels (outlier mask).

TYPE: ndarray

X_test

Test data of shape (n_subjects, n_timepoints).

TYPE: ndarray

y_test

Test labels (outlier mask).

TYPE: ndarray

best_params

Optimized LOF parameters (n_neighbors, contamination).

TYPE: dict

RETURNS DESCRIPTION
dict

Metrics dictionary with keys 'train', 'test', 'outlier_train', 'outlier_test'.

Source code in src/anomaly_detection/outlier_sklearn.py
def LOF_wrapper(
    X: np.ndarray,
    y: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    best_params: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]:
    """
    Run LOF outlier detection on train and test splits.

    Parameters
    ----------
    X : np.ndarray
        Training data of shape (n_subjects, n_timepoints).
    y : np.ndarray
        Training labels (outlier mask).
    X_test : np.ndarray
        Test data of shape (n_subjects, n_timepoints).
    y_test : np.ndarray
        Test labels (outlier mask).
    best_params : dict
        Optimized LOF parameters (n_neighbors, contamination).

    Returns
    -------
    dict
        Metrics dictionary with keys 'train', 'test', 'outlier_train', 'outlier_test'.
    """

    def preds_per_split(
        X: np.ndarray, y: np.ndarray, best_params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Compute LOF predictions and metrics for a single data split.

        Parameters
        ----------
        X : np.ndarray
            Data of shape (n_subjects, n_timepoints).
        y : np.ndarray
            Ground truth outlier labels.
        best_params : dict
            Optimized LOF parameters.

        Returns
        -------
        dict
            Outlier detection metrics.
        """
        preds, clf = get_LOF(X, params=best_params, subjectwise=True)
        metrics = get_outlier_metrics(None, preds, y)
        return metrics

    metrics = {}
    metrics["train"] = preds_per_split(X=X, y=y, best_params=best_params)
    metrics["test"] = preds_per_split(X=X_test, y=y_test, best_params=best_params)
    # to match the other reconstruction methods
    metrics["outlier_train"] = metrics["train"]
    metrics["outlier_test"] = metrics["test"]

    return metrics
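`get_LOF` itself is defined elsewhere in the module. Assuming that `subjectwise=True` fits one `LocalOutlierFactor` per subject row and maps sklearn's -1/+1 output to a 1/0 outlier mask, it might look roughly like this (hypothetical sketch only):

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

def get_LOF_sketch(X: np.ndarray, params: dict, subjectwise: bool = True):
    """Hypothetical stand-in for get_LOF(): fit LOF per subject row."""
    if not subjectwise:
        raise NotImplementedError("dataset-wise fitting not sketched here")
    preds = np.zeros_like(X, dtype=int)
    clf = None
    for subj_idx in range(X.shape[0]):
        X_subj = X[subj_idx, :].reshape(-1, 1)  # one feature per timepoint
        clf = LocalOutlierFactor(**params)
        raw = clf.fit_predict(X_subj)           # -1 = outlier, +1 = inlier
        preds[subj_idx, :] = np.where(raw == -1, 1, 0)
    return preds, clf

rng = np.random.default_rng(0)
X = rng.normal(size=(2, 100))
X[:, 50] = 10.0  # inject an obvious artifact for each subject
preds, _ = get_LOF_sketch(X, {"n_neighbors": 20, "contamination": 0.05})
```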

sklearn_outlier_hyperparameter_tuning

sklearn_outlier_hyperparameter_tuning(
    model_cfg: DictConfig,
    X: ndarray,
    y: ndarray,
    model_name: str,
    contamination: float,
    subjectwise: bool = True,
) -> Dict[str, int]

Perform grid search hyperparameter tuning for sklearn outlier detectors.

PARAMETER DESCRIPTION
model_cfg

Model configuration containing search space parameters.

TYPE: DictConfig

X

Training data of shape (n_subjects, n_timepoints).

TYPE: ndarray

y

Ground truth outlier labels.

TYPE: ndarray

model_name

Name of the model ('LOF' currently supported).

TYPE: str

contamination

Expected proportion of outliers in the data.

TYPE: float

subjectwise

Whether to fit model per subject. Default is True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
dict

Best hyperparameters found (e.g., {'n_neighbors': 200}).

RAISES DESCRIPTION
NotImplementedError

If model_name is 'OneClassSVM' (not yet implemented).

ValueError

If model_name is not supported.

Source code in src/anomaly_detection/outlier_sklearn.py
def sklearn_outlier_hyperparameter_tuning(
    model_cfg: DictConfig,
    X: np.ndarray,
    y: np.ndarray,
    model_name: str,
    contamination: float,
    subjectwise: bool = True,
) -> Dict[str, int]:
    """
    Perform grid search hyperparameter tuning for sklearn outlier detectors.

    Parameters
    ----------
    model_cfg : DictConfig
        Model configuration containing search space parameters.
    X : np.ndarray
        Training data of shape (n_subjects, n_timepoints).
    y : np.ndarray
        Ground truth outlier labels.
    model_name : str
        Name of the model ('LOF' currently supported).
    contamination : float
        Expected proportion of outliers in the data.
    subjectwise : bool, optional
        Whether to fit model per subject. Default is True.

    Returns
    -------
    dict
        Best hyperparameters found (e.g., {'n_neighbors': 200}).

    Raises
    ------
    NotImplementedError
        If model_name is 'OneClassSVM' (not yet implemented).
    ValueError
        If model_name is not supported.
    """
    # Quick-and-dirty grid search, since there is currently only one hard-coded hyperparameter
    n = model_cfg["SEARCH_SPACE"]["GRID"]["n_neighbors"]
    n_range = list(range(n[0], n[1] + n[2], n[2]))

    f1s = []
    if model_name == "LOF":
        for n_neigh in tqdm(n_range, "LOF Hyperparameter Tuning"):
            params = {"n_neighbors": n_neigh, "contamination": contamination}
            preds, _ = get_LOF(X, params, subjectwise)
            f1 = f1_score(y.flatten(), preds.flatten())
            f1s.append(f1)
    elif model_name == "OneClassSVM":
        raise NotImplementedError("OneClassSVM not implemented yet")
    else:
        logger.error(f"Model {model_name} not supported")
        raise ValueError(f"Model {model_name} not supported")

    best_f1 = np.max(f1s)

    if model_name == "LOF":
        best_n_neigh = n_range[np.argmax(f1s)]
        # LOF dataset-wise: Best F1: 0.09516351911561492, Best n_neigh: 145
        # LOF Best F1: 0.25406203840472674, Best n_neigh: 200
        logger.info(f"Best F1: {best_f1}, Best n_neigh: {best_n_neigh}")
        return {"n_neighbors": best_n_neigh}
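The `SEARCH_SPACE.GRID.n_neighbors` entry is interpreted as a `[start, stop, step]` triple, and the stop value itself is included in the grid. With hypothetical config values:

```python
# n_neighbors is stored as [start, stop, step] in the model config;
# adding one extra step to the stop makes the endpoint inclusive.
n = [50, 200, 25]  # hypothetical SEARCH_SPACE.GRID.n_neighbors
n_range = list(range(n[0], n[1] + n[2], n[2]))
```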

subjectwise_OneClassSVM

subjectwise_OneClassSVM(
    X: ndarray, params: Dict[str, Any]
) -> ndarray

Apply One-Class SVM outlier detection subject by subject.

PARAMETER DESCRIPTION
X

Input data of shape (n_subjects, n_timepoints).

TYPE: ndarray

params

Parameters for OneClassSVM (gamma, kernel, nu).

TYPE: dict

RETURNS DESCRIPTION
ndarray

Binary outlier predictions of shape (n_subjects, n_timepoints).

Source code in src/anomaly_detection/outlier_sklearn.py
def subjectwise_OneClassSVM(X: np.ndarray, params: Dict[str, Any]) -> np.ndarray:
    """
    Apply One-Class SVM outlier detection subject by subject.

    Parameters
    ----------
    X : np.ndarray
        Input data of shape (n_subjects, n_timepoints).
    params : dict
        Parameters for OneClassSVM (gamma, kernel, nu).

    Returns
    -------
    np.ndarray
        Binary outlier predictions of shape (n_subjects, n_timepoints).
    """
    no_subjects = X.shape[0]
    preds = None
    for subj_idx in range(no_subjects):
        X_subj = X[subj_idx, :].reshape(-1, 1)
        clf = OneClassSVM(**params).fit(X_subj)
        pred = clf.predict(X_subj)
        pred = np.where(pred == -1, 1, 0)
        if preds is None:
            preds = pred
        else:
            preds = np.vstack((preds, pred))

    return preds
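Note the label conversion inside the loop: sklearn estimators return -1 for outliers and +1 for inliers, while this pipeline's masks use 1 for outliers and 0 for inliers:

```python
import numpy as np

# sklearn convention: -1 = outlier, +1 = inlier.
# Pipeline mask convention: 1 = outlier, 0 = inlier.
sk_pred = np.array([-1, 1, 1, -1])
mask = np.where(sk_pred == -1, 1, 0)
```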

OneClassSVM_wrapper

OneClassSVM_wrapper(
    X: ndarray,
    y: ndarray,
    X_test: ndarray,
    y_test: ndarray,
    params: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]

Run One-Class SVM outlier detection on train and test splits.

PARAMETER DESCRIPTION
X

Training data of shape (n_subjects, n_timepoints).

TYPE: ndarray

y

Training labels (outlier mask).

TYPE: ndarray

X_test

Test data of shape (n_subjects, n_timepoints).

TYPE: ndarray

y_test

Test labels (outlier mask).

TYPE: ndarray

params

OneClassSVM parameters (gamma, kernel, nu).

TYPE: dict

RETURNS DESCRIPTION
dict

Metrics dictionary with keys 'train', 'test', 'outlier_train', 'outlier_test'.

Source code in src/anomaly_detection/outlier_sklearn.py
def OneClassSVM_wrapper(
    X: np.ndarray,
    y: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    params: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]:
    """
    Run One-Class SVM outlier detection on train and test splits.

    Parameters
    ----------
    X : np.ndarray
        Training data of shape (n_subjects, n_timepoints).
    y : np.ndarray
        Training labels (outlier mask).
    X_test : np.ndarray
        Test data of shape (n_subjects, n_timepoints).
    y_test : np.ndarray
        Test labels (outlier mask).
    params : dict
        OneClassSVM parameters (gamma, kernel, nu).

    Returns
    -------
    dict
        Metrics dictionary with keys 'train', 'test', 'outlier_train', 'outlier_test'.
    """
    metrics = {}
    preds = subjectwise_OneClassSVM(X, params)
    metrics["train"] = get_outlier_metrics(None, preds, y)

    preds_test = subjectwise_OneClassSVM(X_test, params)
    metrics["test"] = get_outlier_metrics(None, preds_test, y_test)

    # to match the other reconstruction methods
    metrics["outlier_train"] = metrics["train"]
    metrics["outlier_test"] = metrics["test"]

    return metrics

mlflow_log_params

mlflow_log_params(params: Dict[str, Any]) -> None

Log parameters to MLflow.

PARAMETER DESCRIPTION
params

Dictionary of parameter names and values to log.

TYPE: dict

Notes

Logs a warning if a parameter cannot be logged.

Source code in src/anomaly_detection/outlier_sklearn.py
def mlflow_log_params(params: Dict[str, Any]) -> None:
    """
    Log parameters to MLflow.

    Parameters
    ----------
    params : dict
        Dictionary of parameter names and values to log.

    Notes
    -----
    Logs a warning if a parameter cannot be logged.
    """
    for key, val in params.items():
        try:
            mlflow.log_param(key, val)
        except Exception as e:
            logger.warning(f"Error in logging param {key}: {e}")

log_outlier_pickled_artifact

log_outlier_pickled_artifact(
    metrics: Dict[str, Any], model_name: str
) -> None

Save and log outlier detection results as a pickled artifact to MLflow.

PARAMETER DESCRIPTION
metrics

Outlier detection metrics and predictions to save.

TYPE: dict

model_name

Name of the model (used for filename).

TYPE: str

Source code in src/anomaly_detection/outlier_sklearn.py
def log_outlier_pickled_artifact(metrics: Dict[str, Any], model_name: str) -> None:
    """
    Save and log outlier detection results as a pickled artifact to MLflow.

    Parameters
    ----------
    metrics : dict
        Outlier detection metrics and predictions to save.
    model_name : str
        Name of the model (used for filename).
    """
    artifacts_dir = Path(get_artifacts_dir("outlier_detection"))
    fname = get_outlier_pickle_name(model_name)
    path_out = artifacts_dir / fname
    save_results_dict(metrics, str(path_out))
    mlflow.log_artifact(str(path_out), "outlier_detection")

log_prophet_model

log_prophet_model(
    model: Optional[Any], model_name: str
) -> None

Log a Prophet model to MLflow (placeholder).

PARAMETER DESCRIPTION
model

The Prophet model to log.

TYPE: Prophet or None

model_name

Name of the model.

TYPE: str

Notes

Currently a stub function; model logging not implemented.

Source code in src/anomaly_detection/outlier_sklearn.py
def log_prophet_model(model: Optional[Any], model_name: str) -> None:
    """
    Log a Prophet model to MLflow (placeholder).

    Parameters
    ----------
    model : Prophet or None
        The Prophet model to log.
    model_name : str
        Name of the model.

    Notes
    -----
    Currently a stub function; model logging not implemented.
    """
    if model is not None:
        logger.debug("if you have a model, log it here")

log_outlier_mlflow_artifacts

log_outlier_mlflow_artifacts(
    metrics: Dict[str, Dict[str, Any]],
    model: Optional[Any],
    model_name: str,
) -> None

Log outlier detection metrics and artifacts to MLflow.

PARAMETER DESCRIPTION
metrics

Dictionary containing metrics for each split with 'scalars' and 'arrays'.

TYPE: dict

model

The trained model (if applicable).

TYPE: object or None

model_name

Name of the model.

TYPE: str

Notes

Logs scalar metrics to MLflow. For array values (confidence intervals), logs separate _lo and _hi metrics.

Source code in src/anomaly_detection/outlier_sklearn.py
def log_outlier_mlflow_artifacts(
    metrics: Dict[str, Dict[str, Any]], model: Optional[Any], model_name: str
) -> None:
    """
    Log outlier detection metrics and artifacts to MLflow.

    Parameters
    ----------
    metrics : dict
        Dictionary containing metrics for each split with 'scalars' and 'arrays'.
    model : object or None
        The trained model (if applicable).
    model_name : str
        Name of the model.

    Notes
    -----
    Logs scalar metrics to MLflow. For array values (confidence intervals),
    logs separate _lo and _hi metrics.
    """
    for split in metrics:
        global_metrics = metrics[split]["scalars"]["global"]
        for key, value in global_metrics.items():
            if value is not None:
                if isinstance(value, np.ndarray):
                    mlflow.log_metric(f"{split}/{key}_lo", value[0])
                    mlflow.log_metric(f"{split}/{key}_hi", value[1])
                else:
                    mlflow.log_metric(f"{split}/{key}", value)

    log_outlier_pickled_artifact(metrics, model_name)
    log_prophet_model(model, model_name)
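The confidence-interval splitting can be exercised without an MLflow server by collecting the would-be metric keys into a dict. This sketch mirrors the loop above; the `f1_ci` key name is a made-up example:

```python
import numpy as np

def flatten_global_metrics(metrics: dict) -> dict:
    """Flatten {split: {'scalars': {'global': {...}}}} into MLflow-style keys.

    Two-element arrays (confidence intervals) become separate _lo/_hi
    entries, mirroring log_outlier_mlflow_artifacts without mlflow calls.
    """
    flat = {}
    for split, split_metrics in metrics.items():
        for key, value in split_metrics["scalars"]["global"].items():
            if value is None:
                continue
            if isinstance(value, np.ndarray):
                flat[f"{split}/{key}_lo"] = float(value[0])
                flat[f"{split}/{key}_hi"] = float(value[1])
            else:
                flat[f"{split}/{key}"] = value
    return flat

metrics = {"test": {"scalars": {"global": {"f1": 0.81,
                                           "f1_ci": np.array([0.78, 0.84])}}}}
flat = flatten_global_metrics(metrics)
```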

outlier_sklearn_wrapper

outlier_sklearn_wrapper(
    df: DataFrame,
    cfg: DictConfig,
    model_cfg: DictConfig,
    experiment_name: str,
    run_name: str,
    model_name: str,
) -> Tuple[Dict[str, Dict[str, Any]], None]

Run sklearn-based outlier detection with hyperparameter tuning.

Supports Local Outlier Factor (LOF) and One-Class SVM methods. For LOF, performs grid search hyperparameter tuning on n_neighbors.

PARAMETER DESCRIPTION
df

Input PLR data containing pupil signals.

TYPE: DataFrame

cfg

Full Hydra configuration.

TYPE: DictConfig

model_cfg

Model-specific configuration.

TYPE: DictConfig

experiment_name

MLflow experiment name.

TYPE: str

run_name

MLflow run name.

TYPE: str

model_name

Name of the model: 'LOF' or 'OneClassSVM'.

TYPE: str

RETURNS DESCRIPTION
tuple

A tuple containing:

- metrics : dict
    Outlier detection metrics for train and test splits.
- model : None
    No model object returned for sklearn methods.

RAISES DESCRIPTION
ValueError

If model_name is not supported.

References

Hyperparameter tuning approach:

https://github.com/vsatyakumar/automatic-local-outlier-factor-tuning
https://arxiv.org/abs/1902.00567

Source code in src/anomaly_detection/outlier_sklearn.py
def outlier_sklearn_wrapper(
    df: pl.DataFrame,
    cfg: DictConfig,
    model_cfg: DictConfig,
    experiment_name: str,
    run_name: str,
    model_name: str,
) -> Tuple[Dict[str, Dict[str, Any]], None]:
    """
    Run sklearn-based outlier detection with hyperparameter tuning.

    Supports Local Outlier Factor (LOF) and One-Class SVM methods.
    For LOF, performs grid search hyperparameter tuning on n_neighbors.

    Parameters
    ----------
    df : pl.DataFrame
        Input PLR data containing pupil signals.
    cfg : DictConfig
        Full Hydra configuration.
    model_cfg : DictConfig
        Model-specific configuration.
    experiment_name : str
        MLflow experiment name.
    run_name : str
        MLflow run name.
    model_name : str
        Name of the model: 'LOF' or 'OneClassSVM'.

    Returns
    -------
    tuple
        A tuple containing:
        - metrics : dict
            Outlier detection metrics for train and test splits.
        - model : None
            No model object returned for sklearn methods.

    Raises
    ------
    ValueError
        If model_name is not supported.

    References
    ----------
    Hyperparameter tuning approach:
    https://github.com/vsatyakumar/automatic-local-outlier-factor-tuning
    https://arxiv.org/abs/1902.00567
    """

    train_on = model_cfg["MODEL"]["train_on"]
    from src.anomaly_detection.anomaly_utils import get_data_for_sklearn_anomaly_models

    X, y, X_test, y_test, _ = get_data_for_sklearn_anomaly_models(
        df=df, cfg=cfg, train_on=train_on
    )
    contamination = np.sum(y.flatten()) / len(y.flatten())  # 0.077
    # sklearn LOF requires contamination in (0, 0.5], cap it if needed (e.g., synthetic data)
    if contamination > 0.5:
        logger.warning(
            f"Contamination {contamination:.4f} exceeds 0.5 limit, capping to 0.499"
        )
        contamination = 0.499

    # HPO for LOF
    if model_name == "LOF":
        best_params = sklearn_outlier_hyperparameter_tuning(
            model_cfg, X, y, model_name, contamination=contamination
        )

    if model_name == "LOF":
        best_params = {**best_params, "contamination": contamination}
        mlflow_log_params(best_params)
        metrics = LOF_wrapper(X, y, X_test, y_test, best_params)

    elif model_name == "OneClassSVM":
        params = {
            "gamma": model_cfg["MODEL"]["gamma"],
            "kernel": model_cfg["MODEL"]["kernel"],
            "nu": contamination,
        }
        mlflow_log_params(params)
        metrics = OneClassSVM_wrapper(X, y, X_test, y_test, params)

    else:
        logger.error(f"Model {model_name} not supported")
        raise ValueError(f"Model {model_name} not supported")

    # Log the metrics and the results
    log_outlier_mlflow_artifacts(metrics, None, model_name)
    mlflow.end_run()

    return metrics, None

outlier_prophet

create_ds

create_ds(X_subj: ndarray, fps: int = 30) -> List[datetime]

Create datetime series for Prophet from sample indices.

Converts sample indices to datetime objects based on the sampling rate, which Prophet requires for time series modeling.

PARAMETER DESCRIPTION
X_subj

Subject data array (used for length).

TYPE: ndarray

fps

Sampling rate in frames per second. Default is 30.

TYPE: int DEFAULT: 30

RETURNS DESCRIPTION
list

List of datetime objects representing each timepoint.

Source code in src/anomaly_detection/outlier_prophet.py
def create_ds(X_subj: np.ndarray, fps: int = 30) -> List[datetime.datetime]:
    """
    Create datetime series for Prophet from sample indices.

    Converts sample indices to datetime objects based on the sampling rate,
    which Prophet requires for time series modeling.

    Parameters
    ----------
    X_subj : np.ndarray
        Subject data array (used for length).
    fps : int, optional
        Sampling rate in frames per second. Default is 30.

    Returns
    -------
    list
        List of datetime objects representing each timepoint.
    """
    # in seconds
    time_vector = np.linspace(0, X_subj.shape[0] - 1, X_subj.shape[0]) / fps

    # add seconds to a dummy date
    a = datetime.datetime(2000, 1, 1, 12, 00, 00)
    ds = [a + datetime.timedelta(0, t) for t in time_vector]

    return ds
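The same index-to-datetime mapping can be reproduced with the standard library alone. A sketch of `create_ds`'s behavior (`create_ds_sketch` is not part of the module):

```python
import datetime

def create_ds_sketch(n_samples: int, fps: int = 30) -> list:
    """Map sample indices to datetimes anchored at a dummy date (noon, 2000-01-01)."""
    anchor = datetime.datetime(2000, 1, 1, 12, 0, 0)
    # sample i occurs i / fps seconds after the anchor
    return [anchor + datetime.timedelta(seconds=i / fps) for i in range(n_samples)]

ds = create_ds_sketch(61, fps=30)
```

At 30 fps, sample 30 falls exactly one second after sample 0.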

reject_outliers

reject_outliers(
    pred: DataFrame, model_cfg: DictConfig
) -> Tuple[ndarray, ndarray]

Identify outliers based on Prophet prediction uncertainty.

Points where the prediction error exceeds a factor of the uncertainty interval are flagged as outliers.

PARAMETER DESCRIPTION
pred

Prophet prediction DataFrame with columns 'y', 'yhat', 'yhat_upper', 'yhat_lower'.

TYPE: DataFrame

model_cfg

Model configuration containing 'uncertainty_factor'.

TYPE: DictConfig

RETURNS DESCRIPTION
tuple

A tuple containing:

- y : np.ndarray
    Original values with outliers set to NaN.
- pred_mask : np.ndarray
    Binary mask (1 = outlier, 0 = inlier).

References

https://medium.com/@reza.rajabi/outlier-and-anomaly-detection-using-facebook-prophet-in-python-3a83d58b1bdf

Source code in src/anomaly_detection/outlier_prophet.py
def reject_outliers(
    pred: pd.DataFrame, model_cfg: DictConfig
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Identify outliers based on Prophet prediction uncertainty.

    Points where the prediction error exceeds a factor of the uncertainty
    interval are flagged as outliers.

    Parameters
    ----------
    pred : pd.DataFrame
        Prophet prediction DataFrame with columns 'y', 'yhat', 'yhat_upper', 'yhat_lower'.
    model_cfg : DictConfig
        Model configuration containing 'uncertainty_factor'.

    Returns
    -------
    tuple
        A tuple containing:
        - y : np.ndarray
            Original values with outliers set to NaN.
        - pred_mask : np.ndarray
            Binary mask (1 = outlier, 0 = inlier).

    References
    ----------
    https://medium.com/@reza.rajabi/outlier-and-anomaly-detection-using-facebook-prophet-in-python-3a83d58b1bdf
    """
    # We calculate the prediction error here and uncertainty
    pred["error"] = pred["y"] - pred["yhat"]
    pred["uncertainty"] = pred["yhat_upper"] - pred["yhat_lower"]

    # With this factor we can identify outliers/anomalies;
    # the factor can be customized based on the data
    factor = model_cfg["MODEL"]["uncertainty_factor"]
    pred["anomaly"] = pred.apply(
        lambda x: 1 if (np.abs(x["error"]) > factor * x["uncertainty"]) else 0,
        axis=1,
    )
    # no_anomalies = pred['anomaly'].sum()

    # Set the outliers to NaN
    pred.loc[pred["anomaly"] == 1, "y"] = np.nan
    pred_mask = pred["anomaly"]

    # Remove one future timestep
    y = pred["y"].values[:-1]
    pred_mask = pred_mask.values[:-1]

    return y, pred_mask
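The uncertainty rule reduces to a single vectorized comparison. On toy data, with a hypothetical `uncertainty_factor` of 2.0:

```python
import numpy as np

# Flag points where the absolute prediction error exceeds
# uncertainty_factor times the width of Prophet's yhat interval.
y = np.array([5.0, 5.2, 9.0, 5.1])
yhat = np.array([5.0, 5.0, 5.0, 5.0])
yhat_upper = yhat + 0.5
yhat_lower = yhat - 0.5
factor = 2.0  # hypothetical uncertainty_factor

error = y - yhat
uncertainty = yhat_upper - yhat_lower  # interval width = 1.0 everywhere
anomaly = (np.abs(error) > factor * uncertainty).astype(int)
```

Only the third point (error 4.0 against a threshold of 2.0) is flagged.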

pad_input

pad_input(X_subj: ndarray) -> ndarray

Pad input array by duplicating the last value.

PARAMETER DESCRIPTION
X_subj

Input array of shape (n_timepoints, 1).

TYPE: ndarray

RETURNS DESCRIPTION
ndarray

Padded array of shape (n_timepoints + 1, 1).

Source code in src/anomaly_detection/outlier_prophet.py
def pad_input(X_subj: np.ndarray) -> np.ndarray:
    """
    Pad input array by duplicating the last value.

    Parameters
    ----------
    X_subj : np.ndarray
        Input array of shape (n_timepoints, 1).

    Returns
    -------
    np.ndarray
        Padded array of shape (n_timepoints + 1, 1).
    """
    X_new = np.zeros((X_subj.shape[0] + 1, 1))
    X_new[0:-1] = X_subj
    X_new[-1] = X_new[-2]
    return X_new

plot_fitted_model

plot_fitted_model(model: Prophet, pred: DataFrame) -> None

Display Prophet model fit visualization.

PARAMETER DESCRIPTION
model

Fitted Prophet model.

TYPE: Prophet

pred

Prophet prediction DataFrame.

TYPE: DataFrame

Source code in src/anomaly_detection/outlier_prophet.py
def plot_fitted_model(model: Prophet, pred: pd.DataFrame) -> None:
    """
    Display Prophet model fit visualization.

    Parameters
    ----------
    model : Prophet
        Fitted Prophet model.
    pred : pd.DataFrame
        Prophet prediction DataFrame.
    """
    _ = model.plot(pred)
    plt.show()

create_prophet_df

create_prophet_df(X: ndarray) -> DataFrame

Create Prophet-compatible DataFrame from time series data.

PARAMETER DESCRIPTION
X

Input data array of shape (n_timepoints, 1).

TYPE: ndarray

RETURNS DESCRIPTION
DataFrame

DataFrame with columns 'ds' (datetime) and 'y' (values).

RAISES DESCRIPTION
NotImplementedError

If X has more than one channel (multivariate not supported).

Source code in src/anomaly_detection/outlier_prophet.py
def create_prophet_df(X: np.ndarray) -> pd.DataFrame:
    """
    Create Prophet-compatible DataFrame from time series data.

    Parameters
    ----------
    X : np.ndarray
        Input data array of shape (n_timepoints, 1).

    Returns
    -------
    pd.DataFrame
        DataFrame with columns 'ds' (datetime) and 'y' (values).

    Raises
    ------
    NotImplementedError
        If X has more than one channel (multivariate not supported).
    """
    # Prophet requires the time series to be in a DataFrame
    ds = create_ds(X)
    if X.shape[1] == 1:
        df = pd.DataFrame({"ds": ds, "y": X.flatten()})
    else:
        logger.error("Multiple timeseries, is it possible to use Prophet?")
        # Not multivariate per se, but multiple timeseries (different subjects)
        raise NotImplementedError("Multiple timeseries, is it possible to use Prophet?")

    return df

get_changepoints_from_light

get_changepoints_from_light(
    light: Dict[str, Any], df: DataFrame
) -> Series

Extract changepoints from light stimulus timing for Prophet.

Identifies key physiological events (light onsets/offsets and maximum constriction points) to use as manual changepoints in Prophet.

PARAMETER DESCRIPTION
light

Light stimulus data with 'Red' and 'Blue' channels.

TYPE: dict

df

Prophet DataFrame with 'ds' and 'y' columns.

TYPE: DataFrame

RETURNS DESCRIPTION
Series

Sorted datetime changepoints including light onsets/offsets and maximum constriction times.

References

https://facebook.github.io/prophet/docs/trend_changepoints.html
https://github.com/facebook/prophet/issues/697

Source code in src/anomaly_detection/outlier_prophet.py
def get_changepoints_from_light(light: Dict[str, Any], df: pd.DataFrame) -> pd.Series:
    """
    Extract changepoints from light stimulus timing for Prophet.

    Identifies key physiological events (light onsets/offsets and maximum
    constriction points) to use as manual changepoints in Prophet.

    Parameters
    ----------
    light : dict
        Light stimulus data with 'Red' and 'Blue' channels.
    df : pd.DataFrame
        Prophet DataFrame with 'ds' and 'y' columns.

    Returns
    -------
    pd.Series
        Sorted datetime changepoints including light onsets/offsets
        and maximum constriction times.

    References
    ----------
    https://facebook.github.io/prophet/docs/trend_changepoints.html
    https://github.com/facebook/prophet/issues/697
    """

    def get_color_onset_offset(light_df: pl.DataFrame, color: str) -> List[Any]:
        """
        Get onset and offset timestamps for a specific light color.

        Parameters
        ----------
        light_df : pl.DataFrame
            DataFrame containing light stimulus data with 'ds' column.
        color : str
            Light color channel ('Red' or 'Blue').

        Returns
        -------
        list
            Two-element list with onset and offset timestamps.
        """
        changepoints = []
        light_onset_row = get_top1_of_col(df=light_df, col=color, descending=False)
        # If you want to add a delay (offset), this is the place to do it, e.g. a 200 ms delay
        changepoints.append(light_onset_row["ds"].to_numpy()[0])
        light_offset_row = get_top1_of_col(df=light_df, col=color, descending=True)
        changepoints.append(light_offset_row["ds"].to_numpy()[0])
        return changepoints

    def get_light_onsets_and_offsets(light_df: pl.DataFrame) -> List[Any]:
        """
        Get all light onset and offset timestamps for Red and Blue channels.

        Parameters
        ----------
        light_df : pl.DataFrame
            DataFrame containing light stimulus data.

        Returns
        -------
        list
            Four-element list with Red onset, Red offset, Blue onset, Blue offset.
        """
        changepoints = []
        for color in ["Red", "Blue"]:
            changepoints += get_color_onset_offset(light_df, color)
        assert len(changepoints) == 4
        return changepoints

    def smooth_pupil(x: np.ndarray, pupil_signal: np.ndarray) -> np.ndarray:
        """
        Apply Savitzky-Golay filter to smooth pupil signal.

        Parameters
        ----------
        x : np.ndarray
            Time vector (unused, kept for interface consistency).
        pupil_signal : np.ndarray
            Raw pupil signal values.

        Returns
        -------
        np.ndarray
            Smoothed pupil signal.
        """
        smooth = signal.savgol_filter(
            pupil_signal,
            53,  # window size used for filtering
            3,  # order of fitted polynomial
        )
        return smooth

    def get_color_df(
        df_pd: pd.DataFrame,
        onset: datetime.datetime,
        offset: datetime.datetime,
        col: str = "ds",
    ) -> pd.DataFrame:
        """
        Filter DataFrame to time window between onset and offset.

        Parameters
        ----------
        df_pd : pd.DataFrame
            DataFrame with datetime column.
        onset : datetime
            Start of time window.
        offset : datetime
            End of time window.
        col : str, optional
            Column name for datetime values. Default is 'ds'.

        Returns
        -------
        pd.DataFrame
            Filtered DataFrame containing only rows within the time window.
        """
        df_pd = df_pd[(df_pd[col] >= onset) & (df_pd[col] <= offset)]
        return df_pd

    def get_max_constriction(df: pl.DataFrame) -> List[Any]:
        """
        Find timestamps of maximum pupil constriction for each light color.

        Parameters
        ----------
        df : pl.DataFrame
            DataFrame with 'ds' and 'y' columns.

        Returns
        -------
        list
            Two-element list with max constriction timestamps for Red and Blue.
        """
        changepoints_out = []
        pupil_signal = df["y"].to_numpy()
        df["y"] = smooth_pupil(x=df["ds"].to_numpy(), pupil_signal=pupil_signal)
        for color in ["Red", "Blue"]:
            changepoints = get_color_onset_offset(light_df, color)
            values_per_color = get_color_df(
                df_pd=df, onset=changepoints[0], offset=changepoints[1]
            )
            min_row = values_per_color[
                values_per_color["y"] == values_per_color["y"].min()
            ]
            if min_row.shape[0] >= 1:  # on ties, take the first timestamp
                changepoints_out.append(min_row["ds"].to_numpy()[0])
        assert len(changepoints_out) == 2
        return changepoints_out

    # https://facebook.github.io/prophet/docs/trend_changepoints.html
    # https://github.com/facebook/prophet/issues/697
    # These in practice are the timestamps from ds
    light["ds"] = df["ds"]
    light_df = pl.DataFrame(light)
    changepoints_offset_onset = get_light_onsets_and_offsets(light_df)
    changepoints_max_constriction = get_max_constriction(df)

    sorted_list = sorted(changepoints_offset_onset + changepoints_max_constriction)
    changepoints: pd.Series = pd.Series(sorted_list)

    return changepoints

add_manual_changepoints

add_manual_changepoints(
    auto_changepoints: Series,
    changepoints: List[Any],
    df: DataFrame,
) -> Series

Combine automatic and manual changepoints.

PARAMETER DESCRIPTION
auto_changepoints

Changepoints automatically detected by Prophet.

TYPE: Series

changepoints

Manually specified changepoints.

TYPE: list

df

Prophet DataFrame to match timestamps.

TYPE: DataFrame

RETURNS DESCRIPTION
Series

Combined unique changepoints.

Source code in src/anomaly_detection/outlier_prophet.py
def add_manual_changepoints(
    auto_changepoints: pd.Series, changepoints: List[Any], df: pd.DataFrame
) -> pd.Series:
    """
    Combine automatic and manual changepoints.

    Parameters
    ----------
    auto_changepoints : pd.Series
        Changepoints automatically detected by Prophet.
    changepoints : list
        Manually specified changepoints.
    df : pd.DataFrame
        Prophet DataFrame to match timestamps.

    Returns
    -------
    pd.Series
        Combined unique changepoints.
    """
    # The original index is not needed; we only pick the matching ds values from df
    df_out = pd.DataFrame()
    for changepoint in changepoints:
        # find the matching timestamp from df["ds"]
        row = df[df["ds"] == changepoint]
        df_out = pd.concat([df_out, row])

    # same for the auto changepoints
    for changepoint in auto_changepoints:
        row = df[df["ds"] == changepoint]
        df_out = pd.concat([df_out, row])

    return pd.Series(pd.Series(df_out["ds"]).unique())

get_prophet_model

get_prophet_model(
    df: DataFrame,
    light: Dict[str, Any],
    model_cfg: DictConfig,
) -> Prophet

Fit a Prophet model with optional manual changepoints.

PARAMETER DESCRIPTION
df

Prophet DataFrame with 'ds' and 'y' columns.

TYPE: DataFrame

light

Light stimulus data for manual changepoint extraction.

TYPE: dict

model_cfg

Model configuration with 'changepoint_prior_scale' and 'manual_changepoints' settings.

TYPE: DictConfig

RETURNS DESCRIPTION
Prophet

Fitted Prophet model.

RAISES DESCRIPTION
ValueError

If manual_changepoints method is not recognized.

Notes

If manual_changepoints is 'light', extracts changepoints from light stimulus timing and refits the model with combined changepoints.

Source code in src/anomaly_detection/outlier_prophet.py
def get_prophet_model(
    df: pd.DataFrame, light: Dict[str, Any], model_cfg: DictConfig
) -> Prophet:
    """
    Fit a Prophet model with optional manual changepoints.

    Parameters
    ----------
    df : pd.DataFrame
        Prophet DataFrame with 'ds' and 'y' columns.
    light : dict
        Light stimulus data for manual changepoint extraction.
    model_cfg : DictConfig
        Model configuration with 'changepoint_prior_scale' and
        'manual_changepoints' settings.

    Returns
    -------
    Prophet
        Fitted Prophet model.

    Raises
    ------
    ValueError
        If manual_changepoints method is not recognized.

    Notes
    -----
    If manual_changepoints is 'light', extracts changepoints from light
    stimulus timing and refits the model with combined changepoints.
    """
    # Get the changepoints
    if model_cfg["MODEL"]["manual_changepoints"] is not None:
        if model_cfg["MODEL"]["manual_changepoints"] == "light":
            changepoints = get_changepoints_from_light(light, deepcopy(df))
        else:
            msg = "Manual changepoints method not recognized, = {}".format(
                model_cfg["MODEL"]["manual_changepoints"]
            )
            logger.error(msg)
            raise ValueError(msg)

    # TODO! Could you get a better trend here by creating a trend model from the "pupil_gt" that is pretty
    #  smooth and then use that as a prior for the trend?
    model = Prophet(
        changepoint_prior_scale=model_cfg["MODEL"]["changepoint_prior_scale"]
    ).fit(df)
    auto_changepoints = model.changepoints
    # Prophet's Stan backend is verbose by default; see
    # https://stackoverflow.com/a/76233910/6412152 for silencing the log prints

    if model_cfg["MODEL"]["manual_changepoints"] == "light":
        # Merge the manual changepoints into the automatic ones
        changepoints_new = add_manual_changepoints(auto_changepoints, changepoints, df)

        # Refit the model
        model = Prophet(
            changepoint_prior_scale=model_cfg["MODEL"]["changepoint_prior_scale"],
            changepoints=changepoints_new,
        ).fit(df)

    return model
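For reference, a minimal sketch of the `model_cfg` shape this function reads. The key names follow the code above; the values are illustrative only, and in the pipeline the object is a Hydra/OmegaConf `DictConfig` rather than a plain dict:

```python
# Illustrative values only; in the pipeline this is a Hydra/OmegaConf DictConfig.
model_cfg = {
    "MODEL": {
        # Trend flexibility: larger values allow more/stronger changepoints.
        "changepoint_prior_scale": 0.05,
        # "light" derives extra changepoints from the light stimulus timing;
        # None keeps Prophet's automatic changepoints only.
        "manual_changepoints": "light",
    }
}
print(model_cfg["MODEL"]["manual_changepoints"])
```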

prophet_per_X

prophet_per_X(
    X: ndarray,
    y: ndarray,
    light: Dict[str, Any],
    model_cfg: DictConfig,
    model: Optional[Prophet] = None,
    plot: bool = False,
) -> Tuple[ndarray, ndarray, int, Prophet]

Run Prophet outlier detection on a single time series.

PARAMETER DESCRIPTION
X

Input time series of shape (n_timepoints, 1).

TYPE: ndarray

y

Ground truth labels (not used in detection, for consistency).

TYPE: ndarray

light

Light stimulus data for changepoint extraction.

TYPE: dict

model_cfg

Model configuration.

TYPE: DictConfig

model

Pre-fitted model to use. If None, fits a new model. Default is None.

TYPE: Prophet DEFAULT: None

plot

Whether to display the fit visualization. Default is False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple

A tuple containing:

- y : np.ndarray. Original values with outliers set to NaN.
- pred_mask : np.ndarray. Binary outlier mask.
- no_outliers : int. Number of detected outliers.
- model : Prophet. The fitted Prophet model.

Source code in src/anomaly_detection/outlier_prophet.py
def prophet_per_X(
    X: np.ndarray,
    y: np.ndarray,
    light: Dict[str, Any],
    model_cfg: DictConfig,
    model: Optional[Prophet] = None,
    plot: bool = False,
) -> Tuple[np.ndarray, np.ndarray, int, Prophet]:
    """
    Run Prophet outlier detection on a single time series.

    Parameters
    ----------
    X : np.ndarray
        Input time series of shape (n_timepoints, 1).
    y : np.ndarray
        Ground truth labels (not used in detection, for consistency).
    light : dict
        Light stimulus data for changepoint extraction.
    model_cfg : DictConfig
        Model configuration.
    model : Prophet, optional
        Pre-fitted model to use. If None, fits a new model. Default is None.
    plot : bool, optional
        Whether to display the fit visualization. Default is False.

    Returns
    -------
    tuple
        A tuple containing:
        - y : np.ndarray
            Original values with outliers set to NaN.
        - pred_mask : np.ndarray
            Binary outlier mask.
        - no_outliers : int
            Number of detected outliers.
        - model : Prophet
            The fitted Prophet model.
    """
    # Create the dataframe
    df = create_prophet_df(X)

    # Fit the model
    if model is None:
        model = get_prophet_model(df, light, model_cfg)

    # predict 1-second in the future
    future = model.make_future_dataframe(periods=1, freq="s")
    pred = model.predict(future)
    pred["y"] = pad_input(X)
    if plot:
        # when you are debugging, and want to visualize
        plot_fitted_model(model, pred)

    # Reject outliers
    # In practice this is quite conservative: it removes the clear outliers
    # but leaves many of the subtler outliers in place
    y, pred_mask = reject_outliers(pred, model_cfg)
    no_outliers = np.sum(pred_mask)

    return y, pred_mask, no_outliers, model
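`reject_outliers` is not shown here, but interval-based rejection of this kind can be sketched as follows. The column names follow Prophet's `yhat_lower`/`yhat_upper` output convention; the data is synthetic:

```python
import numpy as np
import pandas as pd

# Synthetic Prophet-style prediction frame: observed y plus an uncertainty interval.
pred = pd.DataFrame(
    {
        "y": [5.0, 5.1, 9.0, 4.9, 0.5],
        "yhat_lower": [4.5] * 5,
        "yhat_upper": [5.5] * 5,
    }
)

# Flag samples outside the interval and null them out for later imputation.
pred_mask = (
    (pred["y"] < pred["yhat_lower"]) | (pred["y"] > pred["yhat_upper"])
).to_numpy().astype(int)
y = pred["y"].to_numpy().copy()
y[pred_mask == 1] = np.nan
print(int(pred_mask.sum()))
```

Here the two samples outside the band (9.0 and 0.5) are flagged and set to NaN.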

prophet_per_split

prophet_per_split(
    X: ndarray,
    y: ndarray,
    light: Dict[str, Any],
    model_cfg: DictConfig,
    model: Optional[Prophet] = None,
) -> Tuple[ndarray, ndarray, ndarray]

Run Prophet outlier detection on all subjects in a split.

PARAMETER DESCRIPTION
X

Input data of shape (n_subjects, n_timepoints).

TYPE: ndarray

y

Ground truth labels of shape (n_subjects, n_timepoints).

TYPE: ndarray

light

Light stimulus data.

TYPE: dict

model_cfg

Model configuration.

TYPE: DictConfig

model

Pre-fitted model to use for all subjects. Default is None.

TYPE: Prophet DEFAULT: None

RETURNS DESCRIPTION
tuple

A tuple containing:

- X_cleaned : np.ndarray. Cleaned signals with outliers as NaN.
- pred_masks : np.ndarray. Binary outlier masks for all subjects.
- no_outliers_per_subject : np.ndarray. Number of outliers detected per subject.

Source code in src/anomaly_detection/outlier_prophet.py
def prophet_per_split(
    X: np.ndarray,
    y: np.ndarray,
    light: Dict[str, Any],
    model_cfg: DictConfig,
    model: Optional[Prophet] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Run Prophet outlier detection on all subjects in a split.

    Parameters
    ----------
    X : np.ndarray
        Input data of shape (n_subjects, n_timepoints).
    y : np.ndarray
        Ground truth labels of shape (n_subjects, n_timepoints).
    light : dict
        Light stimulus data.
    model_cfg : DictConfig
        Model configuration.
    model : Prophet, optional
        Pre-fitted model to use for all subjects. Default is None.

    Returns
    -------
    tuple
        A tuple containing:
        - X_cleaned : np.ndarray
            Cleaned signals with outliers as NaN.
        - pred_masks : np.ndarray
            Binary outlier masks for all subjects.
        - no_outliers_per_subject : np.ndarray
            Number of outliers detected per subject.
    """
    no_subjects = X.shape[0]
    no_outliers_per_subject = np.zeros(no_subjects)
    X_cleaned = None
    pred_masks = None
    for subj_idx in range(no_subjects):
        X_subj = X[subj_idx, :].reshape(-1, 1)
        y_subj = y[subj_idx, :].reshape(-1, 1)
        assert X_subj.shape[0] == y_subj.shape[0]
        X_out, pred_mask, no_outliers, _ = prophet_per_X(
            X=X_subj, y=y_subj, light=light, model_cfg=model_cfg, model=model
        )

        no_outliers_per_subject[subj_idx] = no_outliers
        if X_cleaned is None:
            X_cleaned = X_out
            pred_masks = pred_mask
        else:
            X_cleaned = np.vstack((X_cleaned, X_out))
            pred_masks = np.vstack((pred_masks, pred_mask))

    return X_cleaned, pred_masks, no_outliers_per_subject
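As a design note, the incremental `vstack` above (with a `None` sentinel) can be replaced by collecting the per-subject results in a list and stacking once, which avoids repeated reallocation. The shapes here are illustrative row vectors, not the real PLR dimensions:

```python
import numpy as np

# Hypothetical per-subject outlier masks, one row vector per subject.
masks = [np.zeros((1, 4), dtype=int), np.ones((1, 4), dtype=int)]

# Collect-then-stack: a single vstack instead of growing the array per subject.
pred_masks = np.vstack(masks)
print(pred_masks.shape)
```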

prophet_dataset_per_split

prophet_dataset_per_split(
    X: ndarray,
    y: ndarray,
    X_test: ndarray,
    y_test: ndarray,
    light: Dict[str, Any],
    model_cfg: DictConfig,
) -> Tuple[ndarray, int, ndarray, ndarray, Prophet]

Train Prophet on training data and apply to test set.

PARAMETER DESCRIPTION
X

Training data.

TYPE: ndarray

y

Training labels.

TYPE: ndarray

X_test

Test data.

TYPE: ndarray

y_test

Test labels.

TYPE: ndarray

light

Light stimulus data.

TYPE: dict

model_cfg

Model configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
tuple

A tuple containing:

- pred_mask : np.ndarray. Training outlier mask.
- no_outliers : int. Number of training outliers.
- pred_masks_test : np.ndarray. Test outlier masks.
- no_outliers_test : np.ndarray. Number of outliers per test subject.
- model : Prophet. The trained Prophet model.

Source code in src/anomaly_detection/outlier_prophet.py
def prophet_dataset_per_split(
    X: np.ndarray,
    y: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    light: Dict[str, Any],
    model_cfg: DictConfig,
) -> Tuple[np.ndarray, int, np.ndarray, np.ndarray, Prophet]:
    """
    Train Prophet on training data and apply to test set.

    Parameters
    ----------
    X : np.ndarray
        Training data.
    y : np.ndarray
        Training labels.
    X_test : np.ndarray
        Test data.
    y_test : np.ndarray
        Test labels.
    light : dict
        Light stimulus data.
    model_cfg : DictConfig
        Model configuration.

    Returns
    -------
    tuple
        A tuple containing:
        - pred_mask : np.ndarray
            Training outlier mask.
        - no_outliers : int
            Number of training outliers.
        - pred_masks_test : np.ndarray
            Test outlier masks.
        - no_outliers_test : np.ndarray
            Number of outliers per test subject.
        - model : Prophet
            The trained Prophet model.
    """
    _, pred_mask, no_outliers, model = prophet_per_X(
        X=X, y=y, light=light, model_cfg=model_cfg
    )
    _, pred_masks_test, no_outliers_test = prophet_per_split(
        X_test, y_test, light, model_cfg, model=model
    )

    return pred_mask, no_outliers, pred_masks_test, no_outliers_test, model

outlier_prophet_wrapper

outlier_prophet_wrapper(
    df: DataFrame,
    cfg: DictConfig,
    model_cfg: DictConfig,
    experiment_name: str,
    run_name: str,
) -> Tuple[Dict[str, Dict[str, Any]], Optional[Prophet]]

Run Prophet-based outlier detection on PLR data.

Uses Facebook Prophet to model the time series trend and identifies outliers based on prediction uncertainty intervals.

PARAMETER DESCRIPTION
df

Input PLR data.

TYPE: DataFrame

cfg

Full Hydra configuration.

TYPE: DictConfig

model_cfg

Prophet model configuration.

TYPE: DictConfig

experiment_name

MLflow experiment name.

TYPE: str

run_name

MLflow run name.

TYPE: str

RETURNS DESCRIPTION
tuple

A tuple containing:

- metrics : dict. Outlier detection metrics for train and test splits.
- model : Prophet or None. The trained Prophet model (None for per_subject mode).

RAISES DESCRIPTION
NotImplementedError

If train_method is 'datasetwise' (not yet implemented).

ValueError

If train_method is not recognized.

Source code in src/anomaly_detection/outlier_prophet.py
def outlier_prophet_wrapper(
    df: pl.DataFrame,
    cfg: DictConfig,
    model_cfg: DictConfig,
    experiment_name: str,
    run_name: str,
) -> Tuple[Dict[str, Dict[str, Any]], Optional[Prophet]]:
    """
    Run Prophet-based outlier detection on PLR data.

    Uses Facebook Prophet to model the time series trend and identifies
    outliers based on prediction uncertainty intervals.

    Parameters
    ----------
    df : pl.DataFrame
        Input PLR data.
    cfg : DictConfig
        Full Hydra configuration.
    model_cfg : DictConfig
        Prophet model configuration.
    experiment_name : str
        MLflow experiment name.
    run_name : str
        MLflow run name.

    Returns
    -------
    tuple
        A tuple containing:
        - metrics : dict
            Outlier detection metrics for train and test splits.
        - model : Prophet or None
            The trained Prophet model (None for per_subject mode).

    Raises
    ------
    NotImplementedError
        If train_method is 'datasetwise' (not yet implemented).
    ValueError
        If train_method is not recognized.
    """
    train_on = model_cfg["MODEL"]["train_on"]
    X, y, X_test, y_test, light = get_data_for_sklearn_anomaly_models(
        df=df, cfg=cfg, train_on=train_on
    )

    # Get outliers
    if model_cfg["MODEL"]["train_method"] == "per_subject":
        # No training per se, just per subject
        logger.info("Subject-wise PROPHET")
        _, pred_masks_train, no_outliers_train = prophet_per_split(
            X, y, light, model_cfg
        )
        _, pred_masks_test, no_outliers_test = prophet_per_split(
            X_test, y_test, light, model_cfg
        )
        model = None
    elif model_cfg["MODEL"]["train_method"] == "datasetwise":
        raise NotImplementedError("Dataset-wise training not implemented yet")
    else:
        msg = "Unrecognized train method = {}".format(
            model_cfg["MODEL"]["train_method"]
        )
        logger.error(msg)
        raise ValueError(msg)

    # Get metrics
    metrics = {}
    metrics["train"] = get_outlier_metrics(
        None, pred_masks_train, y, df=df, cfg=cfg, split="train"
    )
    metrics["test"] = get_outlier_metrics(
        None, pred_masks_test, y_test, df=df, cfg=cfg, split="test"
    )

    metrics["outlier_train"] = metrics["train"]
    metrics["outlier_test"] = metrics["test"]

    # Log the metrics and the results
    log_outlier_mlflow_artifacts(metrics, model, "PROPHET")
    mlflow.end_run()

    return metrics, model

TimesNet Integration

timesnet_wrapper

log_mlflow_params

log_mlflow_params(
    model: Module, outlier_model_cfg: DictConfig
) -> None

Log TimesNet model parameters to MLflow.

PARAMETER DESCRIPTION
model

TimesNet model.

TYPE: Module

outlier_model_cfg

Model configuration containing PARAMS and MODEL sections.

TYPE: DictConfig

Source code in src/anomaly_detection/timesnet_wrapper.py
def log_mlflow_params(model: torch.nn.Module, outlier_model_cfg: DictConfig) -> None:
    """
    Log TimesNet model parameters to MLflow.

    Parameters
    ----------
    model : torch.nn.Module
        TimesNet model.
    outlier_model_cfg : DictConfig
        Model configuration containing PARAMS and MODEL sections.
    """
    # Get number of parameters
    mlflow.log_param(
        "num_params", sum(p.numel() for p in model.parameters() if p.requires_grad)
    )
    for key, value in outlier_model_cfg["PARAMS"].items():
        mlflow.log_param(key, value)
    for key, value in outlier_model_cfg["MODEL"].items():
        mlflow.log_param(key, value)

log_timesnet_mlflow_metrics

log_timesnet_mlflow_metrics(
    metrics: dict, results_best: dict, best_epoch: int
) -> None

Log TimesNet training metrics to MLflow.

PARAMETER DESCRIPTION
metrics

Dictionary of metrics per split containing 'global' scalar values.

TYPE: dict

results_best

Best epoch results containing losses per split.

TYPE: dict

best_epoch

Index of the best training epoch.

TYPE: int

Source code in src/anomaly_detection/timesnet_wrapper.py
def log_timesnet_mlflow_metrics(
    metrics: dict, results_best: dict, best_epoch: int
) -> None:
    """
    Log TimesNet training metrics to MLflow.

    Parameters
    ----------
    metrics : dict
        Dictionary of metrics per split containing 'global' scalar values.
    results_best : dict
        Best epoch results containing losses per split.
    best_epoch : int
        Index of the best training epoch.
    """
    mlflow.log_metric("best_epoch", best_epoch)

    for split in metrics:
        for key, value in metrics[split]["global"].items():
            if value is not None:
                if isinstance(value, np.ndarray):
                    mlflow.log_metric(f"{split}/{key}_lo", value[0])
                    mlflow.log_metric(f"{split}/{key}_hi", value[1])
                else:
                    mlflow.log_metric(f"{split}/{key}", value)

    for split in results_best:
        if "losses" in results_best[split]:
            loss_values = results_best[split]["losses"]
            loss_value = loss_values[best_epoch]
            mlflow.log_metric(f"{split}/recon_loss", loss_value)

timesnet_outlier_wrapper

timesnet_outlier_wrapper(
    df: DataFrame,
    cfg: DictConfig,
    outlier_model_cfg: DictConfig,
    experiment_name: str,
    run_name: str,
    task: str = "outlier_detection",
    model_name: str = "TimesNet",
) -> tuple[dict, Module]

Run TimesNet-based outlier detection on PLR data.

TimesNet uses temporal 2D variations for time series analysis. This wrapper handles data preparation, training, and MLflow logging.

PARAMETER DESCRIPTION
df

Input PLR data.

TYPE: DataFrame

cfg

Full Hydra configuration.

TYPE: DictConfig

outlier_model_cfg

TimesNet model configuration.

TYPE: DictConfig

experiment_name

MLflow experiment name.

TYPE: str

run_name

MLflow run name.

TYPE: str

task

Task name. Default is 'outlier_detection'.

TYPE: str DEFAULT: 'outlier_detection'

model_name

Model name for logging. Default is 'TimesNet'.

TYPE: str DEFAULT: 'TimesNet'

RETURNS DESCRIPTION
tuple

A tuple containing:

- outlier_artifacts : dict. Dictionary with metrics, predictions, and metadata.
- model : torch.nn.Module. The trained TimesNet model.

References

"4.3 Anomaly Detection" https://github.com/thuml/Time-Series-Library/blob/main/tutorial/TimesNet_tutorial.ipynb

Source code in src/anomaly_detection/timesnet_wrapper.py
def timesnet_outlier_wrapper(
    df: pl.DataFrame,
    cfg: DictConfig,
    outlier_model_cfg: DictConfig,
    experiment_name: str,
    run_name: str,
    task: str = "outlier_detection",
    model_name: str = "TimesNet",
) -> tuple[dict, torch.nn.Module]:
    """
    Run TimesNet-based outlier detection on PLR data.

    TimesNet uses temporal 2D variations for time series analysis.
    This wrapper handles data preparation, training, and MLflow logging.

    Parameters
    ----------
    df : pl.DataFrame
        Input PLR data.
    cfg : DictConfig
        Full Hydra configuration.
    outlier_model_cfg : DictConfig
        TimesNet model configuration.
    experiment_name : str
        MLflow experiment name.
    run_name : str
        MLflow run name.
    task : str, optional
        Task name. Default is 'outlier_detection'.
    model_name : str, optional
        Model name for logging. Default is 'TimesNet'.

    Returns
    -------
    tuple
        A tuple containing:
        - outlier_artifacts : dict
            Dictionary with metrics, predictions, and metadata.
        - model : torch.nn.Module
            The trained TimesNet model.

    References
    ----------
    "4.3 Anomaly Detection"
    https://github.com/thuml/Time-Series-Library/blob/main/tutorial/TimesNet_tutorial.ipynb
    """
    data_dict = convert_df_to_dict(data_df=df, cfg=cfg)
    dataloaders = init_torch_training(
        data_dict,
        cfg,
        outlier_model_cfg,
        task=task,
        run_name=run_name,
        model_name=model_name,
        create_outlier_dataloaders=True,
    )

    # Model
    model = Model(configs=outlier_model_cfg["PARAMS"])
    model.to(cfg["DEVICE"]["device"])

    # Log params
    log_mlflow_params(model, outlier_model_cfg)

    # Train
    outlier_artifacts, model = timesnet_train(
        model=model,
        device=cfg["DEVICE"]["device"],
        outlier_model_cfg=outlier_model_cfg,
        cfg=cfg,
        run_name=run_name,
        experiment_name=experiment_name,
        train_loader=dataloaders["train"],
        test_loader=dataloaders["test"],
        outlier_train_loader=dataloaders["outlier_train"],
        outlier_test_loader=dataloaders["outlier_test"],
        recon_on_outliers=False,
    )

    # Log metrics to MLflow
    log_timesnet_mlflow_metrics(
        metrics=outlier_artifacts["metrics"],
        results_best=outlier_artifacts["results_best"],
        best_epoch=outlier_artifacts["metadata"]["best_epoch"],
    )

    # TODO: rearrange the output so that it roughly matches the MOMENT output,
    #  e.g. by wrapping the artifacts under a dummy epoch key:
    #  outlier_results = {0: outlier_artifacts}

    # Log the artifacts as well (pickled)
    log_outlier_artifacts_dict(model_name, outlier_artifacts, cfg, checks_on=False)

    # The model itself is not logged: it is large on disk and quick to re-train,
    # so skipping it saves storage. Implement model saving here if needed.
    mlflow.end_run()

    return outlier_artifacts, model

Logging

log_anomaly_detection

log_anomaly_metrics

log_anomaly_metrics(metrics: dict, cfg: DictConfig)

Log outlier detection metrics to MLflow.

PARAMETER DESCRIPTION
metrics

Dictionary of metrics per split containing 'scalars' with 'global' values.

TYPE: dict

cfg

Hydra configuration (currently unused).

TYPE: DictConfig

Notes

For array values (confidence intervals), logs separate _lo and _hi metrics.

Source code in src/anomaly_detection/log_anomaly_detection.py
def log_anomaly_metrics(metrics: dict, cfg: DictConfig):
    """
    Log outlier detection metrics to MLflow.

    Parameters
    ----------
    metrics : dict
        Dictionary of metrics per split containing 'scalars' with 'global' values.
    cfg : DictConfig
        Hydra configuration (currently unused).

    Notes
    -----
    For array values (confidence intervals), logs separate _lo and _hi metrics.
    """
    logger.info("Logging Outlier detection metrics to MLflow")
    for split, split_metrics in metrics.items():
        if "global" in split_metrics["scalars"]:
            global_scalars = split_metrics["scalars"]["global"]
            for scalar_key, value in global_scalars.items():
                if value is not None:
                    metric_key = f"{split}/{scalar_key}"
                    if isinstance(value, np.ndarray):
                        mlflow.log_metric(metric_key + "_lo", value[0])
                        mlflow.log_metric(metric_key + "_hi", value[1])
                    else:
                        mlflow.log_metric(metric_key, value)
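The `_lo`/`_hi` convention above flattens interval-valued metrics into scalar MLflow keys. The splitting logic can be sketched without MLflow; the metric names below are hypothetical, and `logged` stands in for the `mlflow.log_metric` calls:

```python
import numpy as np

# Hypothetical metrics dict; "f1_ci" mimics an interval-valued metric.
metrics = {
    "train": {
        "scalars": {
            "global": {"f1": 0.91, "f1_ci": np.array([0.88, 0.94]), "skipped": None}
        }
    }
}

logged = []  # stands in for mlflow.log_metric(key, value) calls
for split, split_metrics in metrics.items():
    for key, value in split_metrics["scalars"]["global"].items():
        if value is None:
            continue
        metric_key = f"{split}/{key}"
        if isinstance(value, np.ndarray):
            # Arrays hold a (lo, hi) interval: log two scalar metrics.
            logged.append((metric_key + "_lo", float(value[0])))
            logged.append((metric_key + "_hi", float(value[1])))
        else:
            logged.append((metric_key, float(value)))

print(logged)
```

`None` values are skipped, scalars log once, and the interval logs as `train/f1_ci_lo` and `train/f1_ci_hi`.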

log_losses

log_losses(
    best_outlier_results: dict,
    cfg: DictConfig,
    best_epoch: int,
)

Log reconstruction losses to MLflow.

PARAMETER DESCRIPTION
best_outlier_results

Results from the best epoch containing per-split loss arrays.

TYPE: dict

cfg

Hydra configuration (currently unused).

TYPE: DictConfig

best_epoch

Index of the best training epoch.

TYPE: int

RAISES DESCRIPTION
ValueError

If no losses are found for a split.

Source code in src/anomaly_detection/log_anomaly_detection.py
def log_losses(best_outlier_results: dict, cfg: DictConfig, best_epoch: int):
    """
    Log reconstruction losses to MLflow.

    Parameters
    ----------
    best_outlier_results : dict
        Results from the best epoch containing per-split loss arrays.
    cfg : DictConfig
        Hydra configuration (currently unused).
    best_epoch : int
        Index of the best training epoch.

    Raises
    ------
    ValueError
        If no losses are found for a split.
    """
    logger.info("Logging Outlier detection losses (MSE) to MLflow")
    for split, split_metrics in best_outlier_results.items():
        flat_arrays = split_metrics["results_dict"]["metrics"]["arrays_flat"]
        if "losses" in flat_arrays.keys():
            metric_key = f"{split}/loss"
            array = flat_arrays["losses"]
            best_loss = np.mean(array)
            assert isinstance(best_loss, float), (
                f"best_loss is not a float: {best_loss}"
            )
            mlflow.log_metric(metric_key, best_loss)

        else:
            msg = f"No losses found for split {split}, nothing logged!"
            logger.error(msg)
            raise ValueError(msg)

log_anomaly_predictions

log_anomaly_predictions(
    model_name: str,
    preds: dict,
    cfg: DictConfig,
    transpose: bool = True,
)

Log outlier detection predictions to MLflow as CSV files.

PARAMETER DESCRIPTION
model_name

Name of the model for filename generation.

TYPE: str

preds

Dictionary of predictions per split containing 'arrays'.

TYPE: dict

cfg

Hydra configuration (currently unused).

TYPE: DictConfig

transpose

Whether to transpose arrays before saving. Default is True.

TYPE: bool DEFAULT: True

Source code in src/anomaly_detection/log_anomaly_detection.py
def log_anomaly_predictions(
    model_name: str, preds: dict, cfg: DictConfig, transpose: bool = True
):
    """
    Log outlier detection predictions to MLflow as CSV files.

    Parameters
    ----------
    model_name : str
        Name of the model for filename generation.
    preds : dict
        Dictionary of predictions per split containing 'arrays'.
    cfg : DictConfig
        Hydra configuration (currently unused).
    transpose : bool, optional
        Whether to transpose arrays before saving. Default is True.
    """
    logger.info("Logging Outlier detection predictions to MLflow as CSV")
    for split, split_preds in preds.items():
        artifacts_dir = Path(get_artifacts_dir(service_name="outlier_detection"))
        for key, array in split_preds["arrays"].items():
            csv_path = artifacts_dir / get_outlier_csv_name(model_name, split, key)
            if array is not None:
                if transpose:
                    array = array.T
                if csv_path.exists():
                    csv_path.unlink()
                save_array_as_csv(array=array, path=str(csv_path))
                mlflow.log_artifact(str(csv_path), "outlier_detection")

check_debug_n_subjects_outlier_artifacts

check_debug_n_subjects_outlier_artifacts(
    outlier_artifacts, cfg
)

Validate subject count in debug mode.

PARAMETER DESCRIPTION
outlier_artifacts

Outlier detection artifacts to validate.

TYPE: dict

cfg

Hydra configuration with debug settings.

TYPE: DictConfig

RAISES DESCRIPTION
ValueError

If subject count doesn't match expected debug count.

Source code in src/anomaly_detection/log_anomaly_detection.py
def check_debug_n_subjects_outlier_artifacts(outlier_artifacts, cfg):
    """
    Validate subject count in debug mode.

    Parameters
    ----------
    outlier_artifacts : dict
        Outlier detection artifacts to validate.
    cfg : DictConfig
        Hydra configuration with debug settings.

    Raises
    ------
    ValueError
        If subject count doesn't match expected debug count.
    """
    no_subjects_out = get_no_subjects_in_outlier_artifacts(outlier_artifacts)
    if cfg["EXPERIMENT"]["debug"]:
        expected = cfg["DEBUG"]["debug_n_subjects"] * 2
        if no_subjects_out != expected:
            msg = (
                "Number of subjects in the outlier artifacts ({}) does not match "
                "the expected debug count ({})".format(no_subjects_out, expected)
            )
            logger.error(msg)
            raise ValueError(msg)

log_outlier_artifacts_dict

log_outlier_artifacts_dict(
    model_name,
    outlier_artifacts,
    cfg,
    checks_on: bool = True,
)

Save and log outlier detection artifacts to MLflow.

PARAMETER DESCRIPTION
model_name

Name of the model for filename generation.

TYPE: str

outlier_artifacts

Complete outlier detection results to save.

TYPE: dict

cfg

Hydra configuration.

TYPE: DictConfig

checks_on

Whether to run validation checks. Default is True.

TYPE: bool DEFAULT: True

Source code in src/anomaly_detection/log_anomaly_detection.py
def log_outlier_artifacts_dict(
    model_name, outlier_artifacts, cfg, checks_on: bool = True
):
    """
    Save and log outlier detection artifacts to MLflow.

    Parameters
    ----------
    model_name : str
        Name of the model for filename generation.
    outlier_artifacts : dict
        Complete outlier detection results to save.
    cfg : DictConfig
        Hydra configuration.
    checks_on : bool, optional
        Whether to run validation checks. Default is True.
    """
    artifact_dir = Path(get_artifacts_dir(service_name="outlier_detection"))
    results_path = artifact_dir / get_outlier_pickle_name(model_name)
    logger.debug(
        "Saving the outlier detection results as a pickled artifact: {}".format(
            results_path
        )
    )
    if checks_on:
        # these for Moment basically
        check_outlier_detection_artifact(outlier_artifacts)
        # check_debug_n_subjects_outlier_artifacts(outlier_artifacts, cfg)
    save_results_dict(
        results_dict=outlier_artifacts,
        results_path=str(results_path),
        name="outlier_detection",
    )
    logger.debug("And logging to MLflow as an artifact")
    mlflow.log_artifact(str(results_path), "outlier_detection")

log_anomaly_detection_to_mlflow

log_anomaly_detection_to_mlflow(
    model_name: str,
    run_name: str,
    outlier_artifacts: dict,
    cfg: DictConfig,
)

Log complete anomaly detection results to MLflow.

Orchestrates logging of metrics, losses, predictions, and artifacts.

PARAMETER DESCRIPTION
model_name

Name of the model.

TYPE: str

run_name

MLflow run name.

TYPE: str

outlier_artifacts

Complete outlier detection results containing:

- 'metadata': with 'best_epoch'
- 'outlier_results': per-epoch results
- 'metrics': evaluation metrics
- 'preds': predictions

TYPE: dict

cfg

Hydra configuration.

TYPE: DictConfig

Notes

Ends the MLflow run after logging all artifacts.

Source code in src/anomaly_detection/log_anomaly_detection.py
def log_anomaly_detection_to_mlflow(
    model_name: str, run_name: str, outlier_artifacts: dict, cfg: DictConfig
):
    """
    Log complete anomaly detection results to MLflow.

    Orchestrates logging of metrics, losses, predictions, and artifacts.

    Parameters
    ----------
    model_name : str
        Name of the model.
    run_name : str
        MLflow run name.
    outlier_artifacts : dict
        Complete outlier detection results containing:
        - 'metadata': with 'best_epoch'
        - 'outlier_results': per-epoch results
        - 'metrics': evaluation metrics
        - 'preds': predictions
    cfg : DictConfig
        Hydra configuration.

    Notes
    -----
    Ends the MLflow run after logging all artifacts.
    """
    best_epoch = outlier_artifacts["metadata"]["best_epoch"]
    if best_epoch is not None:
        # finetune
        best_outlier_results = outlier_artifacts["outlier_results"][best_epoch]
    else:
        # zero-shot
        first_key = list(outlier_artifacts["outlier_results"].keys())[0]
        best_outlier_results = outlier_artifacts["outlier_results"][first_key]

    log_anomaly_metrics(metrics=outlier_artifacts["metrics"], cfg=cfg)
    log_losses(
        best_outlier_results=best_outlier_results,
        cfg=cfg,
        best_epoch=best_epoch,
    )
    log_anomaly_predictions(model_name, preds=outlier_artifacts["preds"], cfg=cfg)
    log_outlier_artifacts_dict(model_name, outlier_artifacts, cfg)
    # TODO! https://medium.com/@ij_82957/how-to-reduce-mlflow-logging-overhead-by-using-log-batch-b61301cc540f
    #  You could loop through the dict per epoch and log the average batch loss to
    #  build a training history, or use TensorBoard instead
    mlflow.end_run()
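The best-results selection at the top of the function above (fine-tuned vs. zero-shot) can be sketched in isolation; the epoch keys and payloads here are hypothetical:

```python
# Hypothetical per-epoch results, keyed by epoch index.
outlier_results = {3: "epoch-3 results", 7: "epoch-7 results"}

def select_best(outlier_results: dict, best_epoch):
    # Fine-tuned runs index by best_epoch; zero-shot runs take the first entry.
    if best_epoch is not None:
        return outlier_results[best_epoch]
    first_key = next(iter(outlier_results))
    return outlier_results[first_key]

print(select_best(outlier_results, 7))
print(select_best(outlier_results, None))
```

With `best_epoch=7` the fine-tuned entry is returned; with `None` the first (zero-shot) entry is used.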