
featurization

Feature extraction from PLR signals.

Overview

This module extracts handcrafted physiological features:

  • Amplitude bins (histogram features)
  • Latency features (timing)
  • Velocity features
  • PIPR (Post-Illumination Pupil Response)

Main Entry Point

flow_featurization

flow_featurization

flow_featurization(cfg: DictConfig) -> None

Main featurization flow orchestrating handcrafted and embedding features.

Initializes MLflow experiment, retrieves data sources from imputation, and runs both handcrafted featurization and optionally embedding extraction.

PARAMETER DESCRIPTION
cfg

Configuration dictionary containing PREFECT, MLFLOW, and other settings.

TYPE: DictConfig

Source code in src/featurization/flow_featurization.py
def flow_featurization(cfg: DictConfig) -> None:
    """Main featurization flow orchestrating handcrafted and embedding features.

    Initializes MLflow experiment, retrieves data sources from imputation,
    and runs both handcrafted featurization and optionally embedding extraction.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary containing PREFECT, MLFLOW, and other settings.
    """
    experiment_name = experiment_name_wrapper(
        experiment_name=cfg["PREFECT"]["FLOW_NAMES"]["FEATURIZATION"], cfg=cfg
    )
    logger.info("FLOW | Name: {}".format(experiment_name))
    logger.info("=====================")
    prev_experiment_name = experiment_name_wrapper(
        experiment_name=cfg["PREFECT"]["FLOW_NAMES"]["IMPUTATION"], cfg=cfg
    )

    # Initialize the MLflow experiment
    init_mlflow_experiment(mlflow_cfg=cfg["MLFLOW"], experiment_name=experiment_name)

    # Get the data sources (from imputation, and from original ground truth DuckDB database)
    sources = define_sources_for_flow(
        cfg=cfg, prev_experiment_name=prev_experiment_name, task="imputation"
    )

    # Get the handcrafted features
    flow_handcrafted_featurization(
        cfg=cfg,
        sources=sources,
        experiment_name=experiment_name,
        prev_experiment_name=prev_experiment_name,
    )

    # Get the "deep features", i.e. embeddings, e.g. from foundation models
    compute_embeddings = False  # not so useful in practice, so skipped quick'n'dirty
    if compute_embeddings:
        flow_embedding(
            cfg=cfg,
            sources=sources,
            experiment_name=experiment_name,
            prev_experiment_name=prev_experiment_name,
        )
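For orientation, here is a minimal stand-in for the config keys the flow reads directly; the flow-name values and the empty `MLFLOW` section are assumptions for illustration, not the project's real configuration:

```python
# Illustrative, minimal stand-in for the Hydra DictConfig that flow_featurization
# expects. Only the keys accessed directly in the flow are shown; the values are
# assumptions, not the project's real config.
cfg = {
    "PREFECT": {
        "FLOW_NAMES": {
            "FEATURIZATION": "featurization",  # seeds the current experiment name
            "IMPUTATION": "imputation",        # previous flow, used to locate sources
        }
    },
    "MLFLOW": {},  # passed to init_mlflow_experiment in the real flow
}

print(cfg["PREFECT"]["FLOW_NAMES"]["IMPUTATION"])  # imputation
```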

PLR Featurization

featurize_PLR

featurize_subject

featurize_subject(
    subject_dict: dict,
    subject_code: str,
    cfg: DictConfig,
    feature_cfg: DictConfig,
    i: int,
    feature_col: str = "X",
)

Compute all features for a single subject.

Extracts features for each light color and combines with metadata.

PARAMETER DESCRIPTION
subject_dict

Dictionary containing subject data arrays.

TYPE: dict

subject_code

Unique subject identifier.

TYPE: str

cfg

Main configuration dictionary.

TYPE: DictConfig

feature_cfg

Feature-specific configuration.

TYPE: DictConfig

i

Subject index in the dataset.

TYPE: int

feature_col

Column name for feature values, by default 'X'.

TYPE: str DEFAULT: 'X'

RETURNS DESCRIPTION
dict

Dictionary with features per color and metadata.

Source code in src/featurization/featurize_PLR.py
def featurize_subject(
    subject_dict: dict,
    subject_code: str,
    cfg: DictConfig,
    feature_cfg: DictConfig,
    i: int,
    feature_col: str = "X",
):
    """Compute all features for a single subject.

    Extracts features for each light color and combines with metadata.

    Parameters
    ----------
    subject_dict : dict
        Dictionary containing subject data arrays.
    subject_code : str
        Unique subject identifier.
    cfg : DictConfig
        Main configuration dictionary.
    feature_cfg : DictConfig
        Feature-specific configuration.
    i : int
        Subject index in the dataset.
    feature_col : str, optional
        Column name for feature values, by default 'X'.

    Returns
    -------
    dict
        Dictionary with features per color and metadata.
    """
    features = {}

    # Convert to Polars dataframe
    df_subject: pl.DataFrame = convert_subject_dict_of_arrays_to_df(subject_dict)
    light_timings = get_light_stimuli_timings(df_subject)
    for color in light_timings.keys():
        features[color] = get_features_per_color(
            df_subject,
            light_timing=light_timings[color],
            bin_cfg=feature_cfg["FEATURES"],
            color=color,
            feature_col=feature_col,
        )

    # check that colors are different
    check_that_features_are_not_the_same_for_colors(features)

    # add metadata to the dataframe
    df_subject_metadata = convert_subject_dict_of_arrays_to_df(
        subject_dict, wildcard_categories=["metadata", "labels"]
    )
    features["metadata"] = df_subject_metadata

    return features
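The return value is nested per light color, plus a `metadata` entry. A hypothetical sketch with made-up feature names and values (the real keys come from `feature_cfg["FEATURES"]`):

```python
# Hypothetical shape of featurize_subject's return value; feature names and
# numbers are made up, and "metadata" holds a Polars dataframe in the real code.
features = {
    "Red": {
        "latency": {"value": 0.23, "std": None, "ci_pos": None, "ci_neg": None},
    },
    "Blue": {
        "latency": {"value": 0.21, "std": None, "ci_pos": None, "ci_neg": None},
    },
    "metadata": None,  # pl.DataFrame with metadata/label columns in practice
}

print(sorted(features))  # ['Blue', 'Red', 'metadata']
```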

compute_features_from_dict

compute_features_from_dict(
    split_dict: dict,
    split: str,
    preprocess_dict: dict,
    feature_cfg: DictConfig,
    cfg: DictConfig,
)

Compute features for all subjects in a data split.

Destandardizes data if needed, then iterates through subjects to compute hand-crafted PLR features.

PARAMETER DESCRIPTION
split_dict

Dictionary containing split data with 'data' and 'X' arrays.

TYPE: dict

split

Split name (e.g., 'train', 'test').

TYPE: str

preprocess_dict

Preprocessing statistics for destandardization.

TYPE: dict

feature_cfg

Feature configuration.

TYPE: DictConfig

cfg

Main configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary keyed by subject_code containing computed features.

Source code in src/featurization/featurize_PLR.py
def compute_features_from_dict(
    split_dict: dict,
    split: str,
    preprocess_dict: dict,
    feature_cfg: DictConfig,
    cfg: DictConfig,
):
    """Compute features for all subjects in a data split.

    Destandardizes data if needed, then iterates through subjects to
    compute hand-crafted PLR features.

    Parameters
    ----------
    split_dict : dict
        Dictionary containing split data with 'data' and 'X' arrays.
    split : str
        Split name (e.g., 'train', 'test').
    preprocess_dict : dict
        Preprocessing statistics for destandardization.
    feature_cfg : DictConfig
        Feature configuration.
    cfg : DictConfig
        Main configuration dictionary.

    Returns
    -------
    dict
        Dictionary keyed by subject_code containing computed features.
    """
    # Destandardize the data (if needed)
    split_dict = destandardize_the_data_dict_for_featurization(
        split, split_dict, preprocess_dict, cfg
    )

    no_of_subjects = split_dict["data"]["X"].shape[0]
    features_per_code = {}
    for i in tqdm(
        range(no_of_subjects), total=no_of_subjects, desc="Featurizing the PLR subjects"
    ):
        # server: Featurizing the PLR subjects: 100%|██████████| 152/152 [02:03<00:00,  1.23it/s] why so slow?
        # laptop: Featurizing the PLR subjects: 100%|██████████| 16/16 [00:01<00:00, 11.18it/s]
        # Keeps just the one subject in each of the arrays in the "data_dict"
        # TODO! Make this faster, the pl.DataFrame creation per subject is slowing things down
        subject_dict = get_subject_dict_for_featurization(split_dict, i, cfg)
        subject_code = subject_dict["metadata"]["subject_code"][0]

        # Compute the features for the subject
        features_per_code[subject_code] = featurize_subject(
            subject_dict,
            subject_code=subject_code,
            cfg=cfg,
            feature_cfg=feature_cfg,
            i=i,
        )

    return features_per_code

get_handcrafted_PLR_features

get_handcrafted_PLR_features(
    source_data: dict,
    cfg: DictConfig,
    feature_cfg: DictConfig,
)

Extract handcrafted PLR features from source data.

Processes all splits, computes features per subject, and flattens the nested structure into dataframes.

PARAMETER DESCRIPTION
source_data

Source data dictionary with 'df', 'preprocess', and 'mlflow' keys.

TYPE: dict

cfg

Main configuration dictionary.

TYPE: DictConfig

feature_cfg

Feature-specific configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary with 'data' (dataframes per split) and 'mlflow_run'.

Source code in src/featurization/featurize_PLR.py
def get_handcrafted_PLR_features(
    source_data: dict, cfg: DictConfig, feature_cfg: DictConfig
):
    """Extract handcrafted PLR features from source data.

    Processes all splits, computes features per subject, and flattens
    the nested structure into dataframes.

    Parameters
    ----------
    source_data : dict
        Source data dictionary with 'df', 'preprocess', and 'mlflow' keys.
    cfg : DictConfig
        Main configuration dictionary.
    feature_cfg : DictConfig
        Feature-specific configuration.

    Returns
    -------
    dict
        Dictionary with 'data' (dataframes per split) and 'mlflow_run'.
    """
    features_nested = {}
    preprocess_dict = source_data["preprocess"]
    for split, split_dict in deepcopy(source_data["df"]).items():
        features_nested[split] = compute_features_from_dict(
            split_dict, split, preprocess_dict, feature_cfg, cfg
        )

    # Subject-wise dicts flattened to a dataframe
    features = flatten_dict_to_dataframe(
        features_nested=features_nested, mlflow_series=source_data["mlflow"], cfg=cfg
    )

    return features

featurization_script

featurization_script(
    experiment_name: str,
    prev_experiment_name: str,
    cfg: DictConfig,
    source_name: str,
    source_data: dict,
    featurization_method: str,
    feature_cfg: DictConfig,
    run_name: str,
)

Execute the featurization pipeline for a single source.

Runs featurization with MLflow tracking, logging parameters, metrics, and artifacts. Handcrafted features are supported; embedding extraction is not yet implemented.

PARAMETER DESCRIPTION
experiment_name

MLflow experiment name for featurization.

TYPE: str

prev_experiment_name

Previous experiment name (imputation).

TYPE: str

cfg

Main configuration dictionary.

TYPE: DictConfig

source_name

Name of the data source being featurized.

TYPE: str

source_data

Source data dictionary.

TYPE: dict

featurization_method

Method name ('handcrafted_features' or 'embeddings').

TYPE: str

feature_cfg

Feature configuration.

TYPE: DictConfig

run_name

MLflow run name.

TYPE: str

Source code in src/featurization/featurize_PLR.py
def featurization_script(
    experiment_name: str,
    prev_experiment_name: str,
    cfg: DictConfig,
    source_name: str,
    source_data: dict,
    featurization_method: str,
    feature_cfg: DictConfig,
    run_name: str,
):
    """Execute the featurization pipeline for a single source.

    Runs featurization with MLflow tracking, logging parameters, metrics,
    and artifacts. Handcrafted features are supported; embedding extraction
    is not yet implemented.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name for featurization.
    prev_experiment_name : str
        Previous experiment name (imputation).
    cfg : DictConfig
        Main configuration dictionary.
    source_name : str
        Name of the data source being featurized.
    source_data : dict
        Source data dictionary.
    featurization_method : str
        Method name ('handcrafted_features' or 'embeddings').
    feature_cfg : DictConfig
        Feature configuration.
    run_name : str
        MLflow run name.
    """
    if if_refeaturize_from_imputation(
        run_name=run_name, experiment_name=experiment_name, cfg=cfg
    ):
        with mlflow.start_run(run_name=run_name):
            # Log params and metrics to MLflow
            featurization_mlflow_metrics_and_params(
                mlflow_run=source_data["mlflow"], source_name=source_name, cfg=cfg
            )

            # Task) Featurize the data
            if (
                feature_cfg["FEATURES_METADATA"]["feature_method"]
                == "handcrafted_features"
            ):
                features = get_handcrafted_PLR_features(
                    source_data=source_data, cfg=cfg, feature_cfg=feature_cfg
                )
            elif feature_cfg["FEATURES_METADATA"]["feature_method"] == "embeddings":
                logger.error("Embeddings not implemented yet")
                raise NotImplementedError("Embeddings not implemented yet")
                # features = get_PLR_embeddings(
                #     source_data=source_data, cfg=cfg, feature_cfg=feature_cfg
                # )
            else:
                logger.error("Unknown feature method")
                raise NotImplementedError("Unknown feature method")

            # Task) Log to MLflow
            export_features_to_mlflow(
                features=features,
                run_name=run_name,
                cfg=cfg,
            )

            # TODO! If you like to use a Feature Store, you could implement it here:
            #  https://www.snowflake.com/guides/what-feature-store-machine-learning/
            #  https://github.com/awesome-mlops/awesome-feature-store

            # Task) Visualize the features
            mlflow_info = get_mlflow_info()
            visualize_features_of_all_sources(
                features=features, mlflow_infos=mlflow_info, cfg=cfg
            )

            mlflow.end_run()

    else:
        # You could put the visualization here, and read the results from MLflow
        logger.info(
            "The imputation results have been already featurized, skipping the featurization step"
        )

featurizer_PLR_subject

nan_auc

nan_auc(x: ndarray, y: ndarray, method: str = '') -> float

Compute AUC while handling NaN values.

PARAMETER DESCRIPTION
x

X values for AUC computation.

TYPE: array-like

y

Y values for AUC computation.

TYPE: array-like

method

Method for handling NaNs, by default ''.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
float

Computed AUC score.

RAISES DESCRIPTION
NotImplementedError

This function is not yet implemented.

Source code in src/featurization/featurizer_PLR_subject.py
def nan_auc(x: np.ndarray, y: np.ndarray, method: str = "") -> float:
    """Compute AUC while handling NaN values.

    Parameters
    ----------
    x : array-like
        X values for AUC computation.
    y : array-like
        Y values for AUC computation.
    method : str, optional
        Method for handling NaNs, by default ''.

    Returns
    -------
    float
        Computed AUC score.

    Raises
    ------
    NotImplementedError
        This function is not yet implemented.
    """
    raise NotImplementedError
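The stub above only raises `NotImplementedError`. One possible strategy, an assumption rather than the module's chosen method, is to integrate only over the non-NaN samples:

```python
import numpy as np

# np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
_trapezoid = getattr(np, "trapezoid", None) or np.trapz

def nan_auc_sketch(x, y) -> float:
    """Trapezoidal AUC over the non-NaN samples only (one candidate strategy)."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    mask = ~np.isnan(y)
    if mask.sum() < 2:
        return float("nan")  # not enough valid points to integrate
    return float(_trapezoid(y[mask], x[mask]))

# A flat signal with one dropped sample still integrates over the full span:
print(nan_auc_sketch(np.arange(4.0), np.array([1.0, np.nan, 1.0, 1.0])))  # 3.0
```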

compute_AUC

compute_AUC(
    y: ndarray, fps: int = 30, return_abs_AUC: bool = False
) -> float

Compute area under the curve for a time series.

PARAMETER DESCRIPTION
y

Y values (e.g., pupil size measurements).

TYPE: array-like

fps

Frames per second for time axis calculation, by default 30.

TYPE: int DEFAULT: 30

return_abs_AUC

If True, return absolute value of AUC, by default False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
float

AUC value, or NaN if y contains NaN values.

Source code in src/featurization/featurizer_PLR_subject.py
def compute_AUC(y: np.ndarray, fps: int = 30, return_abs_AUC: bool = False) -> float:
    """Compute area under the curve for a time series.

    Parameters
    ----------
    y : array-like
        Y values (e.g., pupil size measurements).
    fps : int, optional
        Frames per second for time axis calculation, by default 30.
    return_abs_AUC : bool, optional
        If True, return absolute value of AUC, by default False.

    Returns
    -------
    float
        AUC value, or NaN if y contains NaN values.
    """
    x = np.linspace(0, len(y) - 1, len(y)) / fps
    if np.any(np.isnan(y)):
        # The raw PLR contains missing values, and the vanilla auc() does not
        # handle NaNs, so the AUC becomes NaN (see the nan_auc stub above).
        auc_score = np.nan  # nan_auc(x, y)
    else:
        auc_score = auc(x, y)
    return abs(auc_score) if return_abs_AUC else auc_score
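For reference, the computation can be reproduced self-contained with NumPy's trapezoidal rule, assuming `auc` here is the usual trapezoidal `sklearn.metrics.auc`:

```python
import numpy as np

# np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
_trapezoid = getattr(np, "trapezoid", None) or np.trapz

def compute_auc_sketch(y, fps: int = 30, return_abs_AUC: bool = False) -> float:
    """Trapezoidal AUC with the time axis derived from the frame rate."""
    y = np.asarray(y, dtype=float)
    x = np.arange(len(y)) / fps  # same axis as np.linspace(0, len(y) - 1, len(y)) / fps
    if np.any(np.isnan(y)):
        return float("nan")  # vanilla trapezoidal integration cannot handle NaNs
    area = float(_trapezoid(y, x))
    return abs(area) if return_abs_AUC else area

# 30 frames of constant 1.0 at 30 fps span 29/30 s, so the area is 29/30:
print(compute_auc_sketch(np.ones(30)))  # 0.9666...
```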

compute_feature

compute_feature(
    feature_samples: DataFrame,
    feature: str,
    feature_params: dict[str, Any],
    feature_col: str = "imputation_mean",
) -> dict[str, Any]

Compute a single feature from sampled time points.

Dispatches to amplitude or timing feature computation based on feature_params['measure'].

PARAMETER DESCRIPTION
feature_samples

Dataframe with time points within the feature window.

TYPE: DataFrame

feature

Feature name being computed.

TYPE: str

feature_params

Feature parameters including 'measure' and 'stat'.

TYPE: dict

feature_col

Column name for feature values, by default 'imputation_mean'.

TYPE: str DEFAULT: 'imputation_mean'

RETURNS DESCRIPTION
dict

Feature dictionary with 'value', 'std', 'ci_pos', and 'ci_neg'.

RAISES DESCRIPTION
NotImplementedError

If feature measure type is not 'amplitude' or 'timing'.

Source code in src/featurization/featurizer_PLR_subject.py
def compute_feature(
    feature_samples: pl.DataFrame,
    feature: str,
    feature_params: dict[str, Any],
    feature_col: str = "imputation_mean",
) -> dict[str, Any]:
    """Compute a single feature from sampled time points.

    Dispatches to amplitude or timing feature computation based on
    feature_params['measure'].

    Parameters
    ----------
    feature_samples : pl.DataFrame
        Dataframe with time points within the feature window.
    feature : str
        Feature name being computed.
    feature_params : dict
        Feature parameters including 'measure' and 'stat'.
    feature_col : str, optional
        Column name for feature values, by default 'imputation_mean'.

    Returns
    -------
    dict
        Feature dictionary with 'value', 'std', 'ci_pos', and 'ci_neg'.

    Raises
    ------
    NotImplementedError
        If feature measure type is not 'amplitude' or 'timing'.
    """

    def get_amplitude_feature(
        feature_samples: pl.DataFrame,
        feature: str,
        feature_params: dict[str, Any],
        feature_col: str,
    ) -> dict[str, Any]:
        y = feature_samples[feature_col].to_numpy()
        if "CI_pos" not in feature_samples.columns:
            logger.error("No confidence interval columns found in the feature samples")
            logger.error("Returning None for the confidence intervals")
            ci_pos = None
            ci_neg = None
        else:
            # Atm no imputation method actually estimates the CI, so we just return None
            # TODO! ensemble imputation would have this
            ci_pos = None
            ci_neg = None
            if np.isnan(feature_samples["CI_pos"].to_numpy()).all():
                ci_pos = None
            if np.isnan(feature_samples["CI_neg"].to_numpy()).all():
                ci_neg = None

        if feature_params["stat"] == "min":
            feature_dict = {
                "value": np.nanmin(y),
                "std": np.nanstd(y),
                "ci_pos": ci_pos,
                "ci_neg": ci_neg,
            }
        elif feature_params["stat"] == "max":
            feature_dict = {
                "value": np.nanmax(y),
                "std": np.nanstd(y),
                "ci_pos": ci_pos,
                "ci_neg": ci_neg,
            }
        elif feature_params["stat"] == "mean":
            feature_dict = {
                "value": np.nanmean(y),
                "std": np.nanstd(y),
                "ci_pos": ci_pos,
                "ci_neg": ci_neg,
            }
        elif feature_params["stat"] == "median":
            feature_dict = {
                "value": np.nanmedian(y),
                "std": np.nanstd(y),
                "ci_pos": ci_pos,
                "ci_neg": ci_neg,
            }
        elif feature_params["stat"] == "AUC":
            feature_dict = {
                "value": compute_AUC(y),
                "std": None,
                "ci_pos": ci_pos,
                "ci_neg": ci_neg,
            }
        else:
            logger.error("Unknown feature stat: {}".format(feature_params["stat"]))
            raise NotImplementedError(
                "Unknown feature stat: {}".format(feature_params["stat"])
            )

        return feature_dict

    def get_timing_feature(
        feature_samples: pl.DataFrame,
        feature: str,
        feature_params: dict[str, Any],
        feature_col: str,
    ) -> dict[str, Any]:
        t0 = feature_samples[0, "time"]
        min_time = get_top1_of_col(feature_samples, feature_col, descending=False).item(
            0, "time"
        )
        return {
            # TODO! Check why this "value" is sometimes None
            "value": min_time - t0,
            # TODO! The std need not stay None: with a discretized frame rate the latency
            #  estimate is coarse, so latency tricks could improve it if you feel like it.
            #  See e.g. Bergamin et al. (2003): "Latency of the Pupil Light Reflex:
            #  Sample Rate, Stimulus Intensity, and Variation in Normal Subjects" (Savitzky-Golay filter)
            #  https://doi.org/10.1167/iovs.02-0468
            #  You could also train e.g. a Neural ODE to reconstruct the PLR at a higher
            #  "synthetic" temporal resolution for a better latency estimate.
            "std": None,
            "ci_pos": None,
            "ci_neg": None,
        }

    if feature_params["measure"] == "amplitude":
        feature_dict = get_amplitude_feature(
            feature_samples, feature, feature_params, feature_col
        )
    elif feature_params["measure"] == "timing":
        feature_dict = get_timing_feature(
            feature_samples, feature, feature_params, feature_col
        )
    else:
        logger.error("Unknown feature measure: {}".format(feature_params["measure"]))
        raise NotImplementedError(
            "Unknown feature measure: {}".format(feature_params["measure"])
        )

    return feature_dict
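The amplitude branch reduces the windowed samples with a NaN-aware statistic; the dispatch above is equivalent to a small lookup table (sample values below are illustrative):

```python
import numpy as np

# NaN-aware reducers matching get_amplitude_feature's "stat" dispatch.
STATS = {
    "min": np.nanmin,
    "max": np.nanmax,
    "mean": np.nanmean,
    "median": np.nanmedian,
}

y = np.array([0.9, 0.7, np.nan, 0.6, 0.8])  # samples inside a feature window
values = {stat: float(fn(y)) for stat, fn in STATS.items()}

print(values["min"], values["max"])  # 0.6 0.9
```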

get_individual_feature

get_individual_feature(
    df_subject: DataFrame,
    light_timing: dict[str, Any],
    feature_cfg: DictConfig,
    color: str,
    feature: str,
    feature_params: dict[str, Any],
    feature_col: str = "mean",
) -> Optional[dict[str, Any]]

Extract a single feature for a subject at a specific light color.

Converts relative timing to absolute, extracts samples within the time window, and computes the feature value.

PARAMETER DESCRIPTION
df_subject

Subject dataframe with time series data.

TYPE: DataFrame

light_timing

Light timing information with onset/offset times.

TYPE: dict

feature_cfg

Feature configuration.

TYPE: DictConfig

color

Light color ('Red' or 'Blue').

TYPE: str

feature

Feature name to compute.

TYPE: str

feature_params

Feature parameters with timing and statistic info.

TYPE: dict

feature_col

Column name for feature values, by default 'mean'.

TYPE: str DEFAULT: 'mean'

RETURNS DESCRIPTION
dict or None

Feature dictionary with value, std, and CI, or None on error.

RAISES DESCRIPTION
Exception

Re-raised if error occurs during feature extraction.

Source code in src/featurization/featurizer_PLR_subject.py
def get_individual_feature(
    df_subject: pl.DataFrame,
    light_timing: dict[str, Any],
    feature_cfg: DictConfig,
    color: str,
    feature: str,
    feature_params: dict[str, Any],
    feature_col: str = "mean",
) -> Optional[dict[str, Any]]:
    """Extract a single feature for a subject at a specific light color.

    Converts relative timing to absolute, extracts samples within the
    time window, and computes the feature value.

    Parameters
    ----------
    df_subject : pl.DataFrame
        Subject dataframe with time series data.
    light_timing : dict
        Light timing information with onset/offset times.
    feature_cfg : DictConfig
        Feature configuration.
    color : str
        Light color ('Red' or 'Blue').
    feature : str
        Feature name to compute.
    feature_params : dict
        Feature parameters with timing and statistic info.
    feature_col : str, optional
        Column name for feature values, by default 'mean'.

    Returns
    -------
    dict or None
        Feature dictionary with value, std, and CI, or None on error.

    Raises
    ------
    Exception
        Re-raised if error occurs during feature extraction.
    """
    # subject_code = df_subject["subject_code"].to_numpy()[0]
    # Get the absolute timing from the recording
    feature_params_abs = convert_relative_timing_to_absolute_timing(
        light_timing, feature_params, color, feature, feature_cfg
    )

    # Get the time points within the bin
    try:
        feature_samples = get_feature_samples(
            df_subject, feature_params_abs, feature=feature
        )
        # When you have the samples, compute the feature (amplitude or timing) with desired stat (min, max, mean, median)
        feature_dict = compute_feature(
            feature_samples, feature, feature_params_abs, feature_col
        )
    except Exception as e:
        logger.error(
            "Error when getting the feature samples for feature {} and color {}: {}".format(
                feature, color, e
            )
        )
        raise e

    return feature_dict
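The timing conversion can be sketched as follows; the `start`/`end` field names are hypothetical stand-ins for whatever the project's feature config actually uses:

```python
# Hypothetical sketch of convert_relative_timing_to_absolute_timing: a window
# defined relative to light onset is shifted into absolute recording time.
light_onset = 12.0                        # s, e.g. from get_light_stimuli_timings
window_rel = {"start": 0.5, "end": 2.5}   # s after onset; field names assumed
window_abs = {k: light_onset + v for k, v in window_rel.items()}

print(window_abs)  # {'start': 12.5, 'end': 14.5}
```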

get_features_per_color

get_features_per_color(
    df_subject: DataFrame,
    light_timing: dict[str, Any],
    bin_cfg: DictConfig,
    color: str,
    feature_col: str,
) -> dict[str, Optional[dict[str, Any]]]

Compute all configured features for a specific light color.

PARAMETER DESCRIPTION
df_subject

Subject dataframe with time series data.

TYPE: DataFrame

light_timing

Light timing information with onset/offset times.

TYPE: dict

bin_cfg

Configuration defining which features to compute.

TYPE: DictConfig

color

Light color ('Red' or 'Blue').

TYPE: str

feature_col

Column name for feature values.

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary keyed by feature name containing feature dictionaries.

Source code in src/featurization/featurizer_PLR_subject.py
def get_features_per_color(
    df_subject: pl.DataFrame,
    light_timing: dict[str, Any],
    bin_cfg: DictConfig,
    color: str,
    feature_col: str,
) -> dict[str, Optional[dict[str, Any]]]:
    """Compute all configured features for a specific light color.

    Parameters
    ----------
    df_subject : pl.DataFrame
        Subject dataframe with time series data.
    light_timing : dict
        Light timing information with onset/offset times.
    bin_cfg : DictConfig
        Configuration defining which features to compute.
    color : str
        Light color ('Red' or 'Blue').
    feature_col : str
        Column name for feature values.

    Returns
    -------
    dict
        Dictionary keyed by feature name containing feature dictionaries.
    """
    features = {}
    for feature in bin_cfg.keys():
        features[feature] = get_individual_feature(
            df_subject,
            light_timing,
            bin_cfg,
            color,
            feature,
            feature_params=deepcopy(bin_cfg[feature]),
            feature_col=feature_col,
        )

    return features

check_that_features_are_not_the_same_for_colors

check_that_features_are_not_the_same_for_colors(
    features: dict[str, dict[str, dict[str, Any]]],
) -> None

Validate that red and blue light features are different.

Ensures that features computed for different light colors are not identical, which would indicate a data processing error.

PARAMETER DESCRIPTION
features

Dictionary keyed by color containing feature dictionaries.

TYPE: dict

RAISES DESCRIPTION
ValueError

If feature values are identical for both colors.

Source code in src/featurization/featurizer_PLR_subject.py
def check_that_features_are_not_the_same_for_colors(
    features: dict[str, dict[str, dict[str, Any]]],
) -> None:
    """Validate that red and blue light features are different.

    Ensures that features computed for different light colors are not
    identical, which would indicate a data processing error.

    Parameters
    ----------
    features : dict
        Dictionary keyed by color containing feature dictionaries.

    Raises
    ------
    ValueError
        If feature values are identical for both colors.
    """

    def compare_lists(list1: list[Any], list2: list[Any]) -> bool:
        return all([list1[i] != list2[i] for i in range(len(list1))])

    vals = {}
    colors = list(features.keys())
    for color in colors:
        vals[color] = []
        for feature in features[color].keys():
            vals[color].append(features[color][feature]["value"])

    lists_ok = compare_lists(vals[colors[0]], vals[colors[1]])
    if not lists_ok:
        logger.error("The feature values for the colors are the same!")
        logger.error("Unlikely that this would happen without a glitch in the data?")
        raise ValueError("The feature values for the colors are the same!")
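A minimal illustration of the comparison this check is meant to perform (values made up):

```python
# Illustrative per-color feature value lists:
red_vals = [0.42, 1.10, 0.87]
blue_vals = [0.39, 1.25, 0.80]

# The check passes when every paired value differs between the two colors;
# identical lists would raise ValueError in the real function, since they
# suggest a data-plumbing glitch rather than real physiology.
all_differ = all(r != b for r, b in zip(red_vals, blue_vals))
print(all_differ)  # True
```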

Feature Utilities

feature_utils

if_refeaturize

if_refeaturize(cfg: DictConfig) -> bool

Determine whether featurization should be re-run.

Checks if re-featurization is forced by config or if no existing features are found on disk.

PARAMETER DESCRIPTION
cfg

Configuration dictionary containing PLR_FEATURIZATION settings.

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if featurization should be performed, False if existing features should be loaded from disk.

Source code in src/featurization/feature_utils.py
def if_refeaturize(cfg: DictConfig) -> bool:
    """Determine whether featurization should be re-run.

    Checks if re-featurization is forced by config or if no existing
    features are found on disk.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary containing PLR_FEATURIZATION settings.

    Returns
    -------
    bool
        True if featurization should be performed, False if existing
        features should be loaded from disk.
    """
    re_featurize = cfg["PLR_FEATURIZATION"]["re_featurize"]
    file_path = get_features_fpath(cfg)

    features_found = False
    if Path(file_path).exists():
        features_found = True

    if not re_featurize and features_found:
        logger.info("Reading precomputed features from the disk: {}".format(file_path))
        return False
    else:
        if re_featurize:
            logger.info("Recomputing the features (re_featurize is set to True)")
            return True
        else:
            logger.info(
                "Re-featurization is set to False, but no features found from the disk -> re-featurizing"
            )
            return True
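The decision logic condenses to a two-input predicate; a sketch mirroring the branches above:

```python
from pathlib import Path

def should_refeaturize(re_featurize: bool, file_path: str) -> bool:
    """The decision logic of if_refeaturize, condensed: recompute when forced
    by config, or when no feature file exists on disk."""
    features_found = Path(file_path).exists()
    return re_featurize or not features_found

# Forced recomputation always wins; a missing file also triggers featurization:
print(should_refeaturize(False, "/no/such/features.parquet"))  # True
```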

combine_df_with_outputs

combine_df_with_outputs(
    df: DataFrame,
    data_dict: dict[str, ndarray],
    imputation: dict[str, Any],
    split: str,
    split_key: str,
    model_name: str,
) -> DataFrame

Combine dataframe with imputation outputs and standardized inputs.

PARAMETER DESCRIPTION
df

Input dataframe to combine with.

TYPE: DataFrame

data_dict

Dictionary containing standardized input data arrays.

TYPE: dict

imputation

Dictionary containing imputation results.

TYPE: dict

split

Data split identifier (e.g., 'train', 'test').

TYPE: str

split_key

Split key identifier (e.g., 'train_gt', 'train_raw').

TYPE: str

model_name

Name of the imputation model.

TYPE: str

RETURNS DESCRIPTION
DataFrame

Dataframe with imputation and standardized input columns added.

Source code in src/featurization/feature_utils.py
def combine_df_with_outputs(
    df: pl.DataFrame,
    data_dict: dict[str, np.ndarray],
    imputation: dict[str, Any],
    split: str,
    split_key: str,
    model_name: str,
) -> pl.DataFrame:
    """Combine dataframe with imputation outputs and standardized inputs.

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe to combine with.
    data_dict : dict
        Dictionary containing standardized input data arrays.
    imputation : dict
        Dictionary containing imputation results.
    split : str
        Data split identifier (e.g., 'train', 'test').
    split_key : str
        Split key identifier (e.g., 'train_gt', 'train_raw').
    model_name : str
        Name of the imputation model.

    Returns
    -------
    pl.DataFrame
        Dataframe with imputation and standardized input columns added.
    """
    df = combine_inputation_with_df(df, imputation)
    df = combine_standardized_inputs_with_df(df, data_dict)

    return df

combine_standardized_inputs_with_df

combine_standardized_inputs_with_df(
    df: DataFrame, data_dict: dict[str, ndarray]
) -> DataFrame

Add standardized input arrays as columns to a Polars dataframe.

PARAMETER DESCRIPTION
df

Input dataframe to add columns to.

TYPE: DataFrame

data_dict

Dictionary with keys as column names and values as numpy arrays.

TYPE: dict

RETURNS DESCRIPTION
DataFrame

Dataframe with new columns prefixed with 'standardized_'.

RAISES DESCRIPTION
AssertionError

If number of samples in array doesn't match dataframe rows.

Source code in src/featurization/feature_utils.py
def combine_standardized_inputs_with_df(
    df: pl.DataFrame, data_dict: dict[str, np.ndarray]
) -> pl.DataFrame:
    """Add standardized input arrays as columns to a Polars dataframe.

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe to add columns to.
    data_dict : dict
        Dictionary with keys as column names and values as numpy arrays.

    Returns
    -------
    pl.DataFrame
        Dataframe with new columns prefixed with 'standardized_'.

    Raises
    ------
    AssertionError
        If number of samples in array doesn't match dataframe rows.
    """
    logger.debug("Combining the standardized inputs with the dataframe")
    no_samples_in = df.shape[0]
    for data_key in data_dict.keys():
        array = data_dict[data_key]
        array_flattened = array.flatten()
        assert no_samples_in == array_flattened.shape[0], (
            "Number of samples in the standardized inputs and the dataframe should be the same"
        )
        df = df.with_columns(pl.lit(array_flattened).alias("standardized_" + data_key))

    return df

combine_inputation_with_df

combine_inputation_with_df(
    df: DataFrame, imputation: dict[str, Any]
) -> DataFrame

Add imputation results as columns to a Polars dataframe.

PARAMETER DESCRIPTION
df

Input dataframe to add columns to.

TYPE: DataFrame

imputation

Dictionary containing 'imputation' sub-dict with arrays and 'indicating_mask' for missingness.

TYPE: dict

RETURNS DESCRIPTION
DataFrame

Dataframe with imputation columns and missingness mask added.

RAISES DESCRIPTION
AssertionError

If number of samples in arrays doesn't match dataframe rows.

Source code in src/featurization/feature_utils.py
def combine_inputation_with_df(
    df: pl.DataFrame, imputation: dict[str, Any]
) -> pl.DataFrame:
    """Add imputation results as columns to a Polars dataframe.

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe to add columns to.
    imputation : dict
        Dictionary containing 'imputation' sub-dict with arrays and
        'indicating_mask' for missingness.

    Returns
    -------
    pl.DataFrame
        Dataframe with imputation columns and missingness mask added.

    Raises
    ------
    AssertionError
        If number of samples in arrays doesn't match dataframe rows.
    """
    logger.debug("Combining the imputation input with the dataframe")
    no_samples_in = df.shape[0]
    for imputation_stats_key in imputation["imputation"].keys():
        array = imputation["imputation"][imputation_stats_key]
        if "imputation" not in imputation_stats_key:
            # quick fix: add the "imputation_" prefix if it is not already there,
            # so it is easier to see what the columns are about
            key_out = "imputation_" + imputation_stats_key
        else:
            key_out = imputation_stats_key
        if array is None:
            df = df.with_columns(pl.lit(None).alias(key_out))
        else:
            array_flattened = array.flatten()
            assert no_samples_in == array_flattened.shape[0], (
                "Number of samples in the imputation and the data should be the same"
            )
            df = df.with_columns(pl.lit(array_flattened).alias(key_out))

    missingness_mask = imputation["indicating_mask"]
    missingness_flattened = missingness_mask.flatten()
    assert no_samples_in == missingness_flattened.shape[0], (
        "Number of samples in the imputation and the data should be the same"
    )
    df = df.with_columns(
        pl.lit(missingness_flattened).alias("imputation_missingness_mask")
    )

    return df

subjects_with_class_labels

subjects_with_class_labels(
    df: DataFrame, split: str
) -> DataFrame

Get unique subject codes that have class labels (glaucoma/control).

PARAMETER DESCRIPTION
df

Dataframe containing subject data with 'subject_code' and 'class_label'.

TYPE: DataFrame

split

Data split identifier (e.g., 'train', 'test').

TYPE: str

RETURNS DESCRIPTION
DataFrame

Sorted dataframe with unique subject codes that have non-null class labels.

Source code in src/featurization/feature_utils.py
def subjects_with_class_labels(df: pl.DataFrame, split: str) -> pl.DataFrame:
    """Get unique subject codes that have class labels (glaucoma/control).

    Parameters
    ----------
    df : pl.DataFrame
        Dataframe containing subject data with 'subject_code' and 'class_label'.
    split : str
        Data split identifier (e.g., 'train', 'test').

    Returns
    -------
    pl.DataFrame
        Sorted dataframe with unique subject codes that have non-null class labels.
    """
    unique_codes = get_unique_polars_rows(
        df,
        unique_col="subject_code",
        value_col="class_label",
        split=split,
        df_string="PLR",
    )

    unique_codes = unique_codes.sort("subject_code")

    # drop rows with no class_label from Polars dataframe
    unique_codes = unique_codes.filter(
        ~pl.all_horizontal(pl.col("class_label").is_null())
    )

    logger.info(
        "Number of subjects with a class label (glaucoma vs control) : {}".format(
            len(unique_codes)
        )
    )

    return unique_codes

pick_correct_split

pick_correct_split(
    data_dict: dict[str, Any],
    split: str,
    split_key: str,
    eval_results: dict[str, Any],
    model_name: str,
    standardize_stats: dict[str, dict[str, float]],
) -> dict[str, Any]

Select and destandardize evaluation results for the correct data split.

PARAMETER DESCRIPTION
data_dict

Dictionary containing preprocessed data.

TYPE: dict

split

Data split name (e.g., 'train', 'test').

TYPE: str

split_key

Split key containing 'gt' or 'raw' suffix.

TYPE: str

eval_results

Dictionary containing evaluation results keyed by split_key.

TYPE: dict

model_name

Name of the model being processed.

TYPE: str

standardize_stats

Dictionary with 'gt' and 'raw' sub-dicts containing mean/std.

TYPE: dict

RETURNS DESCRIPTION
dict

Destandardized results for the specified split key.

RAISES DESCRIPTION
ValueError

If split_key doesn't contain 'gt' or 'raw'.

Source code in src/featurization/feature_utils.py
def pick_correct_split(
    data_dict: dict[str, Any],
    split: str,
    split_key: str,
    eval_results: dict[str, Any],
    model_name: str,
    standardize_stats: dict[str, dict[str, float]],
) -> dict[str, Any]:
    """Select and destandardize evaluation results for the correct data split.

    Parameters
    ----------
    data_dict : dict
        Dictionary containing preprocessed data.
    split : str
        Data split name (e.g., 'train', 'test').
    split_key : str
        Split key containing 'gt' or 'raw' suffix.
    eval_results : dict
        Dictionary containing evaluation results keyed by split_key.
    model_name : str
        Name of the model being processed.
    standardize_stats : dict
        Dictionary with 'gt' and 'raw' sub-dicts containing mean/std.

    Returns
    -------
    dict
        Destandardized results for the specified split key.

    Raises
    ------
    ValueError
        If split_key doesn't contain 'gt' or 'raw'.
    """
    # pick the split (train, val)
    logger.info(
        "Split key = {}, Featurizing the results from model {}".format(
            split_key, model_name
        )
    )
    if "gt" in split_key:
        logger.debug(
            "Destandardizing the results from model {} using the gt stats".format(
                model_name
            )
        )
        stdz_dict = standardize_stats["gt"]
    elif "raw" in split_key:
        logger.debug(
            "Destandardizing the results from model {} using the raw stats".format(
                model_name
            )
        )
        stdz_dict = standardize_stats["raw"]
    else:
        logger.error(
            'How come you have split_key = "{}" here? Should be either gt or raw'.format(
                split_key
            )
        )
        raise ValueError(
            'How come you have split_key = "{}" here? Should be either gt or raw'.format(
                split_key
            )
        )

    split_key_results = eval_results[split_key]
    split_key_results["imputation"] = destandardize_dict(
        imputation_dict=split_key_results["imputation"],
        mean=stdz_dict["mean"],
        std=stdz_dict["std"],
    )

    return split_key_results
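The destandardization step inverts the z-scoring applied before imputation. The real helper is `destandardize_dict`; assuming it applies the usual inverse per array, the arithmetic is simply:

```python
import numpy as np

def destandardize(arr: np.ndarray, mean: float, std: float) -> np.ndarray:
    # invert z-scoring: x = z * std + mean
    return arr * std + mean

# toy standardized values with mean=5.0, std=2.0
z = np.array([-1.0, 0.0, 1.0])
x = destandardize(z, mean=5.0, std=2.0)
print(x)  # [3. 5. 7.]
```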

pick_input_data

pick_input_data(
    input_data: dict[str, ndarray],
    split: str,
    split_key: str,
    model_name: str,
) -> dict[str, ndarray]

Select input data arrays for the correct split and data type.

PARAMETER DESCRIPTION
input_data

Dictionary containing preprocessed data with keys like 'X_train_gt'.

TYPE: dict

split

Data split name, e.g., 'train'.

TYPE: str

split_key

Split key, e.g., 'train_gt' or 'train_raw'.

TYPE: str

model_name

Name of the model, e.g., 'CSDI'.

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary with 'X' (selected data) and 'X_gt' (ground truth).

RAISES DESCRIPTION
ValueError

If split_key doesn't contain 'gt' or 'raw'.

Source code in src/featurization/feature_utils.py
def pick_input_data(
    input_data: dict[str, np.ndarray], split: str, split_key: str, model_name: str
) -> dict[str, np.ndarray]:
    """Select input data arrays for the correct split and data type.

    Parameters
    ----------
    input_data : dict
        Dictionary containing preprocessed data with keys like 'X_train_gt'.
    split : str
        Data split name, e.g., 'train'.
    split_key : str
        Split key, e.g., 'train_gt' or 'train_raw'.
    model_name : str
        Name of the model, e.g., 'CSDI'.

    Returns
    -------
    dict
        Dictionary with 'X' (selected data) and 'X_gt' (ground truth).

    Raises
    ------
    ValueError
        If split_key doesn't contain 'gt' or 'raw'.
    """
    if "raw" in split_key:
        X = input_data[f"X_{split}_raw"]
    elif "gt" in split_key:
        X = input_data[f"X_{split}_gt"]
    else:
        logger.error(
            'How come you have split_key = "{}" here? Should be either gt or raw'.format(
                split_key
            )
        )
        raise ValueError(
            'How come you have split_key = "{}" here? Should be either gt or raw'.format(
                split_key
            )
        )

    X_gt = input_data[f"X_{split}_gt"]

    return {"X": X, "X_gt": X_gt}

get_light_stimuli_timings

get_light_stimuli_timings(
    df_subject: DataFrame,
) -> dict[str, dict[str, float]]

Extract light stimulus onset/offset timings for red and blue colors.

PARAMETER DESCRIPTION
df_subject

Subject dataframe containing 'Red', 'Blue', and 'time' columns.

TYPE: DataFrame

RETURNS DESCRIPTION
dict

Dictionary with 'Red' and 'Blue' keys, each containing:

- 'light_onset': float, time of light onset
- 'light_offset': float, time of light offset
- 'light_duration': float, duration of light stimulus

Source code in src/featurization/feature_utils.py
def get_light_stimuli_timings(
    df_subject: pl.DataFrame,
) -> dict[str, dict[str, float]]:
    """Extract light stimulus onset/offset timings for red and blue colors.

    Parameters
    ----------
    df_subject : pl.DataFrame
        Subject dataframe containing 'Red', 'Blue', and 'time' columns.

    Returns
    -------
    dict
        Dictionary with 'Red' and 'Blue' keys, each containing:
        - 'light_onset': float, time of light onset
        - 'light_offset': float, time of light offset
        - 'light_duration': float, duration of light stimulus
    """

    def set_null_to_zero(df: pl.DataFrame, col: str) -> pl.DataFrame:
        df = df.with_columns(
            (
                pl.when(pl.col(col).is_null()).then(pl.lit(0)).otherwise(pl.col(col))
            ).alias(col)
        )
        return df

    def individual_light_timing(
        df_subject: pl.DataFrame, color: str = "Red"
    ) -> dict[str, float]:
        df_subject = df_subject.fill_nan(0)
        light_offset_row = get_top1_of_col(df=df_subject, col=color, descending=True)
        light_onset_row = get_top1_of_col(df=df_subject, col=color, descending=False)
        assert light_offset_row.item(0, "time") > light_onset_row.item(0, "time"), (
            "Light offset should be after the light onset, but got "
            "light_onset = {} and light_offset = {}".format(
                light_onset_row.item(0, "time"), light_offset_row.item(0, "time")
            )
        )
        light_duration = light_offset_row.item(0, "time") - light_onset_row.item(
            0, "time"
        )

        return {
            "light_onset": light_onset_row.item(0, "time"),
            "light_offset": light_offset_row.item(0, "time"),
            "light_duration": light_duration,
        }

    timings = {}
    timings["Red"] = individual_light_timing(df_subject, color="Red")
    timings["Blue"] = individual_light_timing(df_subject, color="Blue")

    return timings

get_top1_of_col

get_top1_of_col(
    df: DataFrame, col: str, descending: bool
) -> DataFrame

Get the first or last row (by time) where a column has non-zero values.

Used to find light onset (first timepoint where light=1) or light offset (last timepoint where light=1).

PARAMETER DESCRIPTION
df

Input dataframe with 'time' column.

TYPE: DataFrame

col

Column name to filter by (e.g., 'Red', 'Blue').

TYPE: str

descending

If True, get the LAST timepoint (light offset). If False, get the FIRST timepoint (light onset).

TYPE: bool

RETURNS DESCRIPTION
DataFrame

Single-row dataframe with the onset or offset timepoint.

RAISES DESCRIPTION
AssertionError

If no samples remain after filtering null values.

Source code in src/featurization/feature_utils.py
def get_top1_of_col(df: pl.DataFrame, col: str, descending: bool) -> pl.DataFrame:
    """Get the first or last row (by time) where a column has non-zero values.

    Used to find light onset (first timepoint where light=1) or light offset
    (last timepoint where light=1).

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe with 'time' column.
    col : str
        Column name to filter by (e.g., 'Red', 'Blue').
    descending : bool
        If True, get the LAST timepoint (light offset).
        If False, get the FIRST timepoint (light onset).

    Returns
    -------
    pl.DataFrame
        Single-row dataframe with the onset or offset timepoint.

    Raises
    ------
    AssertionError
        If no samples remain after filtering null values.
    """
    df = replace_zeros_with_null(df, col=col)
    df = df.filter(~pl.all_horizontal(pl.col(col).is_null()))
    assert df.shape[0] > 0, "No samples in the dataframe"
    # Sort by TIME to get first/last timepoint where light is on
    df = df.sort("time", descending=descending)
    df_row = df[0]
    return df_row

replace_zeros_with_null

replace_zeros_with_null(
    df: DataFrame, col: str
) -> DataFrame

Replace zero values with NaN in a specified column.

Used to identify light onset/offset where zero indicates light-off periods.

PARAMETER DESCRIPTION
df

Input dataframe.

TYPE: DataFrame

col

Column name to process.

TYPE: str

RETURNS DESCRIPTION
DataFrame

Dataframe with zeros replaced by NaN in the specified column.

Source code in src/featurization/feature_utils.py
def replace_zeros_with_null(df: pl.DataFrame, col: str) -> pl.DataFrame:
    """Replace zero values with NaN in a specified column.

    Used to identify light onset/offset where zero indicates light-off periods.

    Parameters
    ----------
    df : pl.DataFrame
        Input dataframe.
    col : str
        Column name to process.

    Returns
    -------
    pl.DataFrame
        Dataframe with zeros replaced by NaN in the specified column.
    """
    # when the light was not on, the value is zero;
    # the first and last non-zero values are the light onset and offset
    # df = df.with_columns(
    #     (pl.when(pl.col(col) == 0).then(pl.lit(None)).otherwise(pl.col(col))).alias(col)
    # )
    df_pd = df.to_pandas()
    df_pd[col] = df_pd[col].replace(0, np.nan)
    return pl.DataFrame(df_pd)

convert_relative_timing_to_absolute_timing

convert_relative_timing_to_absolute_timing(
    light_timing: dict[str, float],
    feature_params: dict[str, Any],
    color: str,
    feature: str,
    feature_cfg: DictConfig,
) -> dict[str, Any]

Convert relative feature timing to absolute timing based on light stimulus.

PARAMETER DESCRIPTION
light_timing

Dictionary with 'light_onset' and 'light_offset' times.

TYPE: dict

feature_params

Feature parameters with 'time_from', 'time_start', and 'time_end'.

TYPE: dict

color

Light color ('Red' or 'Blue').

TYPE: str

feature

Feature name being computed.

TYPE: str

feature_cfg

Feature configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Updated feature_params with absolute 'time_start' and 'time_end'.

RAISES DESCRIPTION
ValueError

If 'time_from' is not 'onset' or 'offset'.

Source code in src/featurization/feature_utils.py
def convert_relative_timing_to_absolute_timing(
    light_timing: dict[str, float],
    feature_params: dict[str, Any],
    color: str,
    feature: str,
    feature_cfg: DictConfig,
) -> dict[str, Any]:
    """Convert relative feature timing to absolute timing based on light stimulus.

    Parameters
    ----------
    light_timing : dict
        Dictionary with 'light_onset' and 'light_offset' times.
    feature_params : dict
        Feature parameters with 'time_from', 'time_start', and 'time_end'.
    color : str
        Light color ('Red' or 'Blue').
    feature : str
        Feature name being computed.
    feature_cfg : DictConfig
        Feature configuration.

    Returns
    -------
    dict
        Updated feature_params with absolute 'time_start' and 'time_end'.

    Raises
    ------
    ValueError
        If 'time_from' is not 'onset' or 'offset'.
    """
    if feature_params["time_from"] == "onset":
        t0 = light_timing["light_onset"]
    elif feature_params["time_from"] == "offset":
        t0 = light_timing["light_offset"]
    else:
        logger.error("Unknown time_from = {}".format(feature_params["time_from"]))
        raise ValueError("Unknown time_from = {}".format(feature_params["time_from"]))

    feature_params["time_start"] = t0 + feature_params["time_start"]
    feature_params["time_end"] = t0 + feature_params["time_end"]

    return feature_params
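A small worked example of the timing conversion (toy numbers, not from the codebase): with a light onset at 5.0 s and a feature window defined relative to onset as [0.5, 2.5] s, the absolute window is [5.5, 7.5] s.

```python
light_timing = {"light_onset": 5.0, "light_offset": 10.0}
feature_params = {"time_from": "onset", "time_start": 0.5, "time_end": 2.5}

# pick the anchor (onset or offset), then shift the relative window
t0 = light_timing["light_" + feature_params["time_from"]]
feature_params["time_start"] = t0 + feature_params["time_start"]
feature_params["time_end"] = t0 + feature_params["time_end"]
print(feature_params["time_start"], feature_params["time_end"])  # 5.5 7.5
```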

get_feature_samples

get_feature_samples(
    df_subject: DataFrame,
    feature_params: dict[str, Any],
    col: str = "time",
    feature: Optional[str] = None,
) -> DataFrame

Filter dataframe to samples within a time window for feature extraction.

PARAMETER DESCRIPTION
df_subject

Subject dataframe with time series data.

TYPE: DataFrame

feature_params

Dictionary with 'time_start' and 'time_end' defining the window.

TYPE: dict

col

Column name for time values, by default 'time'.

TYPE: str DEFAULT: 'time'

feature

Feature name for logging, by default None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Filtered dataframe with samples within the time window.

Notes

Uses pandas conversion to avoid Polars Rust errors with Object arrays. See https://github.com/pola-rs/polars/issues/18399

Source code in src/featurization/feature_utils.py
def get_feature_samples(
    df_subject: pl.DataFrame,
    feature_params: dict[str, Any],
    col: str = "time",
    feature: Optional[str] = None,
) -> pl.DataFrame:
    """Filter dataframe to samples within a time window for feature extraction.

    Parameters
    ----------
    df_subject : pl.DataFrame
        Subject dataframe with time series data.
    feature_params : dict
        Dictionary with 'time_start' and 'time_end' defining the window.
    col : str, optional
        Column name for time values, by default 'time'.
    feature : str, optional
        Feature name for logging, by default None.

    Returns
    -------
    pl.DataFrame
        Filtered dataframe with samples within the time window.

    Notes
    -----
    Uses pandas conversion to avoid Polars Rust errors with Object arrays.
    See https://github.com/pola-rs/polars/issues/18399
    """
    logger.debug(f"{feature}: {feature_params}")
    df_pd = df_subject.to_pandas()
    df_pd = df_pd[
        (df_pd[col] >= feature_params["time_start"])
        & (df_pd[col] <= feature_params["time_end"])
    ]
    feature_samples = pl.from_pandas(df_pd)

    return feature_samples

flatten_dict_to_dataframe

flatten_dict_to_dataframe(
    features_nested: dict[str, dict[str, Any]],
    mlflow_series: Optional[Series],
    cfg: DictConfig,
) -> dict[str, Any]

Convert nested features dictionary to flat dataframe structure.

PARAMETER DESCRIPTION
features_nested

Nested dictionary keyed by split, then by subject code, containing features.

TYPE: dict

mlflow_series

MLflow run information as a Polars series.

TYPE: Series or None

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary with 'data' (containing flattened dataframes per split) and 'mlflow_run' information.

Source code in src/featurization/feature_utils.py
def flatten_dict_to_dataframe(
    features_nested: dict[str, dict[str, Any]],
    mlflow_series: Optional[pl.Series],
    cfg: DictConfig,
) -> dict[str, Any]:
    """Convert nested features dictionary to flat dataframe structure.

    Parameters
    ----------
    features_nested : dict
        Nested dictionary keyed by split, then by subject code, containing features.
    mlflow_series : pl.Series or None
        MLflow run information as a Polars series.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    dict
        Dictionary with 'data' (containing flattened dataframes per split)
        and 'mlflow_run' information.
    """
    # Init the dict with the MLflow run the same for all the splits
    features_df = {
        "data": {},
        "mlflow_run": dict(mlflow_series) if mlflow_series is not None else None,
    }

    for split in features_nested.keys():
        subjects_as_dicts = features_nested[split]
        features_df["data"][split] = flatten_subject_dicts_to_df(subjects_as_dicts, cfg)

    return features_df

flatten_subject_dicts_to_df

flatten_subject_dicts_to_df(
    subjects_as_dicts: dict[str, dict[str, Any]],
    cfg: DictConfig,
) -> DataFrame

Convert subject-wise feature dictionaries to a single dataframe.

PARAMETER DESCRIPTION
subjects_as_dicts

Dictionary keyed by subject_code containing feature dictionaries.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Dataframe with one row per subject and feature columns.

Source code in src/featurization/feature_utils.py
def flatten_subject_dicts_to_df(
    subjects_as_dicts: dict[str, dict[str, Any]], cfg: DictConfig
) -> pl.DataFrame:
    """Convert subject-wise feature dictionaries to a single dataframe.

    Parameters
    ----------
    subjects_as_dicts : dict
        Dictionary keyed by subject_code containing feature dictionaries.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    pl.DataFrame
        Dataframe with one row per subject and feature columns.
    """
    # Each dictionary becomes a row in the dataframe, with the computed features as columns
    rows = []
    for subject_code in subjects_as_dicts.keys():
        df_row = create_df_row(
            subject_dict=subjects_as_dicts[subject_code],
            subject_code=subject_code,
            cfg=cfg,
        )
        rows.append(df_row.to_pandas())
    # concatenate once instead of growing the dataframe row by row
    df_features = pd.concat(rows, ignore_index=True, axis=0)

    return pl.from_pandas(df_features)

create_df_row

create_df_row(
    subject_dict: dict[str, Any],
    subject_code: str,
    cfg: DictConfig,
) -> DataFrame

Create a single dataframe row from a subject's feature dictionary.

PARAMETER DESCRIPTION
subject_dict

Dictionary containing features keyed by color, then feature name.

TYPE: dict

subject_code

Unique identifier for the subject.

TYPE: str

cfg

Configuration dictionary with stat_keys to extract.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Single-row dataframe with flattened feature columns.

Source code in src/featurization/feature_utils.py
def create_df_row(
    subject_dict: dict[str, Any], subject_code: str, cfg: DictConfig
) -> pl.DataFrame:
    """Create a single dataframe row from a subject's feature dictionary.

    Parameters
    ----------
    subject_dict : dict
        Dictionary containing features keyed by color, then feature name.
    subject_code : str
        Unique identifier for the subject.
    cfg : DictConfig
        Configuration dictionary with stat_keys to extract.

    Returns
    -------
    pl.DataFrame
        Single-row dataframe with flattened feature columns.
    """
    statkeys_to_pick = cfg["PLR_FEATURIZATION"]["FEATURIZATION"]["stat_keys"]
    dict_features = {"subject_code": subject_code}
    for m, color in enumerate(subject_dict.keys()):
        color_features = subject_dict[color]
        if isinstance(color_features, dict):
            for n, feature_name in enumerate(color_features.keys()):
                feature_name_flat = f"{color}_{feature_name}"
                # This is the feature dictionary with the stat keys (value, std, CI, etc.)
                feature_dict = color_features[feature_name]
                for o, dict_key in enumerate(feature_dict.keys()):
                    if dict_key in statkeys_to_pick:
                        feature_name_out = f"{feature_name_flat}_{dict_key}"
                        dict_features[feature_name_out] = feature_dict[dict_key]
        elif isinstance(color_features, pl.DataFrame):
            # Metadata (e.g. glaucoma or not, age, or whatever)
            for col in color_features.columns:
                col_out = f"metadata_{col}"
                dict_features[col_out] = color_features[col][0]
    return pl.DataFrame(dict_features)

get_features_fpath

get_features_fpath(
    cfg: DictConfig, service_name: str = "best_models"
) -> str

Construct the file path for saving/loading features.

PARAMETER DESCRIPTION
cfg

Configuration dictionary with DATA and ARTIFACTS settings.

TYPE: DictConfig

service_name

Service name for artifacts directory, by default 'best_models'.

TYPE: str DEFAULT: 'best_models'

RETURNS DESCRIPTION
str

Full file path for the features file.

Source code in src/featurization/feature_utils.py
def get_features_fpath(cfg: DictConfig, service_name: str = "best_models") -> str:
    """Construct the file path for saving/loading features.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary with DATA and ARTIFACTS settings.
    service_name : str, optional
        Service name for artifacts directory, by default 'best_models'.

    Returns
    -------
    str
        Full file path for the features file.
    """
    duckdb_path = Path(cfg["DATA"]["filename_DuckDB"])
    filename = (
        duckdb_path.stem
        + cfg["PLR_FEATURIZATION"]["feature_file_suffix"]
        + "."
        + cfg["ARTIFACTS"]["results_format"]
    )
    artifacts_dir = Path(get_artifacts_dir(service_name=service_name))
    return str(artifacts_dir / filename)
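A sketch of the resulting path, with hypothetical config values standing in for `cfg` and `get_artifacts_dir`:

```python
from pathlib import Path

duckdb_path = Path("data/PLR.duckdb")          # stand-in for cfg["DATA"]["filename_DuckDB"]
suffix = "_features"                           # stand-in for feature_file_suffix
fmt = "pickle"                                 # stand-in for cfg["ARTIFACTS"]["results_format"]
artifacts_dir = Path("artifacts/best_models")  # stand-in for get_artifacts_dir()

# stem drops the .duckdb extension before appending suffix and format
fpath = artifacts_dir / (duckdb_path.stem + suffix + "." + fmt)
print(fpath.as_posix())  # artifacts/best_models/PLR_features.pickle
```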

export_features_to_disk

export_features_to_disk(
    dict_out: dict[str, Any], cfg: DictConfig
) -> None

Save features dictionary to disk as a pickle file.

PARAMETER DESCRIPTION
dict_out

Features dictionary to save.

TYPE: dict

cfg

Configuration dictionary for determining file path.

TYPE: DictConfig

Source code in src/featurization/feature_utils.py
def export_features_to_disk(dict_out: dict[str, Any], cfg: DictConfig) -> None:
    """Save features dictionary to disk as a pickle file.

    Parameters
    ----------
    dict_out : dict
        Features dictionary to save.
    cfg : DictConfig
        Configuration dictionary for determining file path.
    """
    features_filepath = Path(get_features_fpath(cfg))
    features_filepath.parent.mkdir(parents=True, exist_ok=True)

    logger.info("Exporting the features to the disk: {}".format(features_filepath))
    save_results_dict(dict_out, str(features_filepath), debug_load=True)

load_features_from_disk

load_features_from_disk(cfg: DictConfig) -> dict[str, Any]

Load features dictionary from disk.

PARAMETER DESCRIPTION
cfg

Configuration dictionary for determining file path.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Loaded features dictionary.

Source code in src/featurization/feature_utils.py
def load_features_from_disk(cfg: DictConfig) -> dict[str, Any]:
    """Load features dictionary from disk.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary for determining file path.

    Returns
    -------
    dict
        Loaded features dictionary.
    """
    features_filepath = get_features_fpath(cfg)
    features = load_results_dict(features_filepath)
    return features

get_feature_names

get_feature_names(
    features: dict[str, Any],
    cols_exclude: tuple[str, ...] = ("subject_code",),
    name_substring: str = "_value",
) -> list[str]

Extract feature names from a nested features dictionary.

PARAMETER DESCRIPTION
features

Nested features dictionary with source -> data -> split structure.

TYPE: dict

cols_exclude

Column names to exclude, by default ('subject_code',).

TYPE: tuple DEFAULT: ('subject_code',)

name_substring

Substring to filter column names, by default '_value'.

TYPE: str DEFAULT: '_value'

RETURNS DESCRIPTION
list

List of feature names with the substring removed.

Source code in src/featurization/feature_utils.py
def get_feature_names(
    features: dict[str, Any],
    cols_exclude: tuple[str, ...] = ("subject_code",),
    name_substring: str = "_value",
) -> list[str]:
    """Extract feature names from a nested features dictionary.

    Parameters
    ----------
    features : dict
        Nested features dictionary with source -> data -> split structure.
    cols_exclude : tuple, optional
        Column names to exclude, by default ('subject_code',).
    name_substring : str, optional
        Substring to filter column names, by default '_value'.

    Returns
    -------
    list
        List of feature names with the substring removed.
    """

    def filter_for_col_substring(col_names, name_substring):
        # columns come as e.g. Blue_PIPR_value and Blue_PIPR_std; keep one stat per feature to avoid duplicate names
        col_names = [col for col in col_names if name_substring in col]
        # get rid of the substring
        return [col.replace(name_substring, "") for col in col_names]

    def exclude_col_names(col_names, cols_exclude):
        return [col for col in col_names if col not in cols_exclude]

    # grab the columns from the first split_key of the first split of the first source
    first_source = next(iter(features))
    first_split = next(iter(features[first_source]["data"]))
    first_split_key = next(iter(features[first_source]["data"][first_split]))
    col_names = features[first_source]["data"][first_split][first_split_key].columns

    col_names = exclude_col_names(col_names, cols_exclude)
    return filter_for_col_substring(col_names, name_substring)
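A minimal sketch of the two filtering steps applied above; the column names and the `_value` suffix are illustrative:

```python
# Hypothetical column names as they might appear in a features DataFrame
col_names = ["subject_code", "Blue_PIPR_value", "Blue_PIPR_std"]
cols_exclude = ("subject_code",)
name_substring = "_value"

# Same two steps as get_feature_names: exclude columns, then keep only the
# "_value" variants and strip the substring
kept = [c for c in col_names if c not in cols_exclude]
feature_names = [c.replace(name_substring, "") for c in kept if name_substring in c]
# feature_names -> ["Blue_PIPR"]
```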

get_split_keys

get_split_keys(
    features: dict[str, Any],
    model_exclude: str = "BASELINE_GT",
    return_suffix: bool = True,
) -> Optional[list[str]]

Get split key suffixes from features dictionary.

PARAMETER DESCRIPTION
features

Features dictionary keyed by split.

TYPE: dict

model_exclude

Model name to exclude from search, by default 'BASELINE_GT'.

TYPE: str DEFAULT: 'BASELINE_GT'

return_suffix

If True, return only the suffix; if False, return full keys.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
list

List of split key suffixes (e.g., ['_gt', '_raw']).

Source code in src/featurization/feature_utils.py
def get_split_keys(
    features: dict[str, Any],
    model_exclude: str = "BASELINE_GT",
    return_suffix: bool = True,
) -> Optional[list[str]]:
    """Get split key suffixes from features dictionary.

    Parameters
    ----------
    features : dict
        Features dictionary keyed by split.
    model_exclude : str, optional
        Model name to exclude from search, by default 'BASELINE_GT'.
    return_suffix : bool, optional
        If True, return only the suffix; if False, return full keys.

    Returns
    -------
    list
        List of split key suffixes (e.g., ['_gt', '_raw']).
    """
    first_split = next(iter(features))
    for model in features[first_split]:
        if model != model_exclude:  # note: "is not" would compare identity, not equality
            keys = list(features[first_split][model].keys())
            if return_suffix:
                # strip the split prefix, e.g. "train_gt" -> "_gt"
                keys = [key.replace(first_split, "") for key in keys]
            return keys
    return None
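The suffix extraction can be sketched on a toy dictionary; the split, model, and key names below are illustrative:

```python
# Toy features dict: split -> model -> split_key
features = {
    "train": {
        "BASELINE_GT": {"train_gt": {}},
        "MODEL_A": {"train_gt": {}, "train_raw": {}},
    }
}

split = next(iter(features))
suffixes = None
for model in features[split]:
    if model != "BASELINE_GT":  # skip the excluded model
        # strip the split prefix: "train_gt" -> "_gt"
        suffixes = [key.replace(split, "") for key in features[split][model]]
        break
# suffixes -> ["_gt", "_raw"]
```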

data_for_featurization_wrapper

data_for_featurization_wrapper(
    artifacts: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]

Prepare data dictionaries for featurization from artifacts.

Combines imputed data from models with baseline input data.

PARAMETER DESCRIPTION
artifacts

Dictionary containing model artifacts with imputation results.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Combined dictionary of imputed and baseline data ready for featurization.

Source code in src/featurization/feature_utils.py
def data_for_featurization_wrapper(
    artifacts: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]:
    """Prepare data dictionaries for featurization from artifacts.

    Combines imputed data from models with baseline input data.

    Parameters
    ----------
    artifacts : dict
        Dictionary containing model artifacts with imputation results.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    dict
        Combined dictionary of imputed and baseline data ready for featurization.
    """
    logger.info("Get the data for featurization from artifacts")

    # Imputed data from various imputer models
    imputed_data, model_name, split_names = get_imputed_data_for_featurization(
        artifacts, cfg
    )

    # Baseline data for featurization
    input_data = get_baseline_input_data_for_featurization(
        artifacts, model_name, split_names
    )

    # combine dictionaries for output
    return {**input_data, **imputed_data}
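The final merge is a plain dict union, so later keys win on collision; baseline names and model names must therefore stay distinct. A sketch with placeholder structures:

```python
# Placeholder structures standing in for the real nested data
input_data = {"BASELINE_GT": {"train": {}}, "BASELINE_RAW": {"train": {}}}
imputed_data = {"MODEL_A": {"train": {}}}

combined = {**input_data, **imputed_data}
# combined keys: BASELINE_GT, BASELINE_RAW, MODEL_A
```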

get_baseline_input_data_for_featurization

get_baseline_input_data_for_featurization(
    artifacts: dict[str, Any],
    model_name: str,
    split_names: list[str],
) -> dict[str, Any]

Extract baseline input data (GT and raw) formatted for featurization.

Creates pseudo-imputation dictionaries from original input data to enable consistent processing with actual imputation outputs.

PARAMETER DESCRIPTION
artifacts

Dictionary containing model artifacts.

TYPE: dict

model_name

Name of a model to extract metadata from.

TYPE: str

split_names

List of split names (e.g., ['train', 'test']).

TYPE: list

RETURNS DESCRIPTION
dict

Dictionary with 'BASELINE_GT' and 'BASELINE_RAW' data structures.

Source code in src/featurization/feature_utils.py
def get_baseline_input_data_for_featurization(
    artifacts: dict[str, Any], model_name: str, split_names: list[str]
) -> dict[str, Any]:
    """Extract baseline input data (GT and raw) formatted for featurization.

    Creates pseudo-imputation dictionaries from original input data to enable
    consistent processing with actual imputation outputs.

    Parameters
    ----------
    artifacts : dict
        Dictionary containing model artifacts.
    model_name : str
        Name of a model to extract metadata from.
    split_names : list
        List of split names (e.g., ['train', 'test']).

    Returns
    -------
    dict
        Dictionary with 'BASELINE_GT' and 'BASELINE_RAW' data structures.
    """
    # Get the data for featurization (for baseline features).
    # Two pseudo-"models" are added here: treating the input data like imputed
    # model outputs keeps downstream computation and visualization uniform
    model_artifacts = artifacts[model_name]
    data_names = ["BASELINE_GT", "BASELINE_RAW"]
    split_key_names = ["gt", "raw"]
    input_data = {}
    for i, dataname in enumerate(data_names):
        input_data[dataname] = {}
        for j, split in enumerate(split_names):
            input_data[dataname][split] = {}
            data = get_pseudoimputation_dicts_from_input_data(model_artifacts, split)
            split_key = split_key_names[i]
            data_out = data[split_key]
            input_data[dataname][split][split_key] = {}
            input_data[dataname][split][split_key]["data"] = data_out
            input_data[dataname][split][split_key]["metadata"] = model_artifacts[
                "data_input"
            ][split]["metadata"]

    return input_data

get_imputed_data_for_featurization

get_imputed_data_for_featurization(
    artifacts: dict[str, Any], cfg: DictConfig
) -> tuple[dict[str, Any], str, Any]

Extract imputed data from all models for featurization.

PARAMETER DESCRIPTION
artifacts

Dictionary keyed by model name containing imputation results.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
tuple

(imputed_data, model_name, split_names) where imputed_data is the nested dictionary of imputation results.

Source code in src/featurization/feature_utils.py
def get_imputed_data_for_featurization(
    artifacts: dict[str, Any], cfg: DictConfig
) -> tuple[dict[str, Any], str, Any]:
    """Extract imputed data from all models for featurization.

    Parameters
    ----------
    artifacts : dict
        Dictionary keyed by model name containing imputation results.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    tuple
        (imputed_data, model_name, split_names) where imputed_data is
        the nested dictionary of imputation results.
    """
    # Get the dictionaries output by the imputation models
    imputed_data = {}
    for model_name, model_artifacts in artifacts.items():
        imputed_data[model_name] = {}
        split_names = model_artifacts["imputation"].keys()
        for split in split_names:
            imputed_data[model_name][split] = {}
            split_keys = model_artifacts["imputation"][split].keys()
            for split_key in split_keys:
                imputed_data[model_name][split][split_key] = imputed_data_by_split_key(
                    imputation=model_artifacts["imputation"][split][split_key][
                        "imputation_dict"
                    ]["imputation"],
                    metadata=model_artifacts["data_input"][split]["metadata"],
                    split=split,
                    split_key=split_key,
                )

    return imputed_data, model_name, split_names
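The returned `imputed_data` nests model -> split -> split_key, with each leaf packaging data alongside metadata. A sketch of the expected shape, with hypothetical names and values:

```python
imputed_data = {
    "MODEL_A": {
        "train": {
            "train_gt": {
                "data": {"mean": [0.1, 0.2], "ci_pos": None, "ci_neg": None},
                "metadata": {"subject_codes": ["S001"]},
            }
        }
    }
}

# Downstream code reads leaves like this
leaf = imputed_data["MODEL_A"]["train"]["train_gt"]
```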

imputed_data_by_split_key

imputed_data_by_split_key(
    imputation: dict[str, Any],
    metadata: dict[str, Any],
    split: str,
    split_key: str,
) -> dict[str, Any]

Package imputation data with metadata for a specific split key.

PARAMETER DESCRIPTION
imputation

Imputation results dictionary.

TYPE: dict

metadata

Metadata dictionary for the split.

TYPE: dict

split

Split name (e.g., 'train', 'test').

TYPE: str

split_key

Split key (e.g., 'gt', 'raw').

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary with 'data' and 'metadata' keys.

Source code in src/featurization/feature_utils.py
def imputed_data_by_split_key(
    imputation: dict[str, Any], metadata: dict[str, Any], split: str, split_key: str
) -> dict[str, Any]:
    """Package imputation data with metadata for a specific split key.

    Parameters
    ----------
    imputation : dict
        Imputation results dictionary.
    metadata : dict
        Metadata dictionary for the split.
    split : str
        Split name (e.g., 'train', 'test').
    split_key : str
        Split key (e.g., 'gt', 'raw').

    Returns
    -------
    dict
        Dictionary with 'data' and 'metadata' keys.
    """
    imputed_data = {"data": imputation, "metadata": metadata}
    return imputed_data

get_pseudoimputation_dicts_from_input_data

get_pseudoimputation_dicts_from_input_data(
    model_artifacts: dict[str, Any], split: str
) -> dict[str, dict[str, Any]]

Create imputation-like dictionaries from raw input data.

Converts ground truth and raw data arrays into the same format as imputation model outputs for consistent downstream processing.

PARAMETER DESCRIPTION
model_artifacts

Model artifacts containing data_input with ground truth and raw data.

TYPE: dict

split

Split name (e.g., 'train', 'test').

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary with 'gt' and 'raw' keys containing pseudo-imputation dicts.

Source code in src/featurization/feature_utils.py
def get_pseudoimputation_dicts_from_input_data(
    model_artifacts: dict[str, Any], split: str
) -> dict[str, dict[str, Any]]:
    """Create imputation-like dictionaries from raw input data.

    Converts ground truth and raw data arrays into the same format
    as imputation model outputs for consistent downstream processing.

    Parameters
    ----------
    model_artifacts : dict
        Model artifacts containing data_input with ground truth and raw data.
    split : str
        Split name (e.g., 'train', 'test').

    Returns
    -------
    dict
        Dictionary with 'gt' and 'raw' keys containing pseudo-imputation dicts.
    """

    def convert_np_array_to_dict(X):
        return {"mean": X, "ci_pos": None, "ci_neg": None}

    # Ground truth (denoised), no missing data
    X_gt = model_artifacts["data_input"][split]["data"]["ground_truth"]["gt"]

    # Raw data (noisy), missing data
    X_raw = model_artifacts["data_input"][split]["data"]["data_missing"]["raw"]

    return {
        "gt": convert_np_array_to_dict(X_gt),
        "raw": convert_np_array_to_dict(X_raw),
    }
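The conversion itself is trivial: an array becomes a mean with undefined confidence intervals, matching the imputation output schema. A sketch using a plain list in place of the numpy array:

```python
def convert_to_pseudoimputation(X):
    # Raw input data carries no model uncertainty, so the CIs stay undefined
    return {"mean": X, "ci_pos": None, "ci_neg": None}

pseudo = convert_to_pseudoimputation([5.1, 5.0, 4.8])
# pseudo -> {"mean": [5.1, 5.0, 4.8], "ci_pos": None, "ci_neg": None}
```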

get_dict_PLR_per_code

get_dict_PLR_per_code(
    data_dict: dict[str, Any], i: int
) -> dict[str, Optional[ndarray]]

Extract PLR data for a single subject from a data dictionary.

PARAMETER DESCRIPTION
data_dict

Dictionary containing numpy arrays with shape (n_subjects, n_timepoints, 1).

TYPE: dict

i

Subject index to extract.

TYPE: int

RETURNS DESCRIPTION
dict

Dictionary with PLR data arrays for the specified subject.

RAISES DESCRIPTION
ValueError

If data type is not a number or numpy array.

Source code in src/featurization/feature_utils.py
def get_dict_PLR_per_code(
    data_dict: dict[str, Any], i: int
) -> dict[str, Optional[np.ndarray]]:
    """Extract PLR data for a single subject from a data dictionary.

    Parameters
    ----------
    data_dict : dict
        Dictionary containing numpy arrays with shape (n_subjects, n_timepoints, 1).
    i : int
        Subject index to extract.

    Returns
    -------
    dict
        Dictionary with PLR data arrays for the specified subject.

    Raises
    ------
    ValueError
        If data type is not a number or numpy array.
    """
    dict_PLR = {}
    dict_tmp = data_dict
    for key_in in dict_tmp.keys():
        if dict_tmp[key_in] is not None:
            if isinstance(dict_tmp[key_in], numbers.Number):
                # e.g. ensemble models may store n, an integer giving the number of models
                dict_PLR[key_in] = dict_tmp[key_in]
            elif isinstance(dict_tmp[key_in], np.ndarray):
                dict_PLR[key_in] = dict_tmp[key_in][i, :, 0]
            else:
                logger.error(
                    "Unknown type for the PLR data: {}".format(type(dict_tmp[key_in]))
                )
                raise ValueError(
                    "Unknown type for the PLR data: {}".format(type(dict_tmp[key_in]))
                )

        else:
            # when you have undefined CI for example, this is still None
            dict_PLR[key_in] = None

    return dict_PLR
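The per-subject extraction dispatches on value type: scalars pass through, arrays are sliced to subject i, and None stays None. A sketch with nested lists standing in for the (n_subjects, n_timepoints, 1) numpy arrays:

```python
import numbers

def extract_subject(data_dict, i):
    out = {}
    for key, value in data_dict.items():
        if value is None:
            out[key] = None          # e.g. undefined confidence interval
        elif isinstance(value, numbers.Number):
            out[key] = value         # e.g. ensemble size n
        else:
            # stand-in for value[i, :, 0] on a numpy array
            out[key] = [row[0] for row in value[i]]
    return out

data = {"mean": [[[5.1], [5.0]], [[4.2], [4.3]]], "ci_pos": None, "n": 3}
subj = extract_subject(data, 1)
# subj -> {"mean": [4.2, 4.3], "ci_pos": None, "n": 3}
```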

subjectwise_df_for_featurization

subjectwise_df_for_featurization(
    data_dict_subj: dict[str, Any],
    metadata_subject: DataFrame,
    subject_code: str,
    cfg: DictConfig,
    i: Optional[int] = None,
) -> DataFrame

Create a subject-specific dataframe for featurization.

Combines PLR time series data with subject metadata into a single dataframe.

PARAMETER DESCRIPTION
data_dict_subj

Dictionary containing PLR data arrays for the subject.

TYPE: dict

metadata_subject

Subject metadata as a Polars dataframe.

TYPE: DataFrame

subject_code

Unique subject identifier.

TYPE: str

cfg

Configuration dictionary with PLR_length.

TYPE: DictConfig

i

Subject index for extraction, by default None.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Combined dataframe with PLR data and metadata.

RAISES DESCRIPTION
AssertionError

If dataframe length doesn't match expected PLR length.

Source code in src/featurization/feature_utils.py
def subjectwise_df_for_featurization(
    data_dict_subj: dict[str, Any],
    metadata_subject: pl.DataFrame,
    subject_code: str,
    cfg: DictConfig,
    i: Optional[int] = None,
) -> pl.DataFrame:
    """Create a subject-specific dataframe for featurization.

    Combines PLR time series data with subject metadata into a single dataframe.

    Parameters
    ----------
    data_dict_subj : dict
        Dictionary containing PLR data arrays for the subject.
    metadata_subject : pl.DataFrame
        Subject metadata as a Polars dataframe.
    subject_code : str
        Unique subject identifier.
    cfg : DictConfig
        Configuration dictionary with PLR_length.
    i : int, optional
        Subject index for extraction, by default None.

    Returns
    -------
    pl.DataFrame
        Combined dataframe with PLR data and metadata.

    Raises
    ------
    AssertionError
        If dataframe length doesn't match expected PLR length.
    """
    # PLR recording from dict to Polars dataframe
    dict_PLR = get_dict_PLR_per_code(data_dict_subj, i)
    df_PLR = subjectdict_to_df(dict_PLR)
    assert df_PLR.shape[0] == cfg["DATA"]["PLR_length"], (
        f"{df_PLR.shape[0]} should be the same as "
        f"PLR length {cfg['DATA']['PLR_length']}"
    )

    # Combine these two dataframes
    df_subject = pandas_concat(df_PLR, metadata_subject, axis=1)
    # df_subject = pl.concat([df_PLR, metadata_subject], how="horizontal")
    assert df_subject.shape[0] == cfg["DATA"]["PLR_length"], (
        f"{df_subject.shape[0]} should be the same as "
        f"PLR length {cfg['DATA']['PLR_length']}"
    )

    # Check and fix the schema of the dataframe
    df_subject = check_and_fix_df_schema(df_subject)

    return df_subject

drop_useless_metadata_cols

drop_useless_metadata_cols(
    metadata_subject: DataFrame, i: int, cfg: DictConfig
) -> DataFrame

Remove unnecessary metadata columns from subject dataframe.

PARAMETER DESCRIPTION
metadata_subject

Subject metadata dataframe.

TYPE: DataFrame

i

Subject index (used for logging only on first subject).

TYPE: int

cfg

Configuration with DROP_COLS and DROP_COLS_EXTRA lists.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Dataframe with specified columns removed.

Source code in src/featurization/feature_utils.py
def drop_useless_metadata_cols(
    metadata_subject: pl.DataFrame, i: int, cfg: DictConfig
) -> pl.DataFrame:
    """Remove unnecessary metadata columns from subject dataframe.

    Parameters
    ----------
    metadata_subject : pl.DataFrame
        Subject metadata dataframe.
    i : int
        Subject index (used for logging only on first subject).
    cfg : DictConfig
        Configuration with DROP_COLS and DROP_COLS_EXTRA lists.

    Returns
    -------
    pl.DataFrame
        Dataframe with specified columns removed.
    """
    # Note: the metadata still contains some "useless" columns; keeping them is
    # simpler than maintaining a flexible scheme for newly added metadata columns
    try:
        metadata_subject = metadata_subject.drop(cfg["PLR_FEATURIZATION"]["DROP_COLS"])
    except Exception as e:
        logger.warning(
            "Failed to drop the 'useless columns' from the metadata: {}".format(e)
        )
        logger.warning(
            "Some extra columns will remain and may be confusing, "
            "but they do not affect the feature computation"
        )

    # Now there are also the non-harmonized naming
    for set_keys in cfg["PLR_FEATURIZATION"]["DROP_COLS_EXTRA"]:
        drop_cols = cfg["PLR_FEATURIZATION"]["DROP_COLS_EXTRA"][set_keys]
        try:
            metadata_subject = metadata_subject.drop(drop_cols)
        except Exception:
            pass

    if i == 0:
        # Display only on the first subject, no need to clutter the logs
        logger.debug(
            'Dropping the "useless columns" from the metadata: {}'.format(
                cfg["PLR_FEATURIZATION"]["DROP_COLS"]
            )
        )
        logger.debug(
            "Remaining columns in the metadata: {}".format(metadata_subject.columns)
        )

    return metadata_subject

subjectdict_to_df

subjectdict_to_df(
    dict_PLR: dict[str, Optional[ndarray]],
) -> Optional[DataFrame]

Convert a subject's PLR dictionary to a Polars dataframe.

PARAMETER DESCRIPTION
dict_PLR

Dictionary with PLR data arrays keyed by data type.

TYPE: dict

RETURNS DESCRIPTION
DataFrame

Dataframe with one column per data type.

Source code in src/featurization/feature_utils.py
def subjectdict_to_df(
    dict_PLR: dict[str, Optional[np.ndarray]],
) -> Optional[pl.DataFrame]:
    """Convert a subject's PLR dictionary to a Polars dataframe.

    Parameters
    ----------
    dict_PLR : dict
        Dictionary with PLR data arrays keyed by data type.

    Returns
    -------
    pl.DataFrame
        Dataframe with one column per data type.
    """
    df: Optional[pl.DataFrame] = None
    for data_key in dict_PLR.keys():
        if dict_PLR[data_key] is not None:
            array = dict_PLR[data_key]
            if df is not None:
                df = df.with_columns(pl.lit(array).alias(data_key))
            else:
                df = pl.DataFrame({data_key: array})
        else:
            if df is not None:
                df = df.with_columns(pl.lit(None).alias(data_key))
            else:
                df = pl.DataFrame({data_key: None})
    return df
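The accumulation pattern above can be sketched with a plain dict of columns standing in for the Polars frame:

```python
dict_PLR = {"mean": [5.1, 5.0], "ci_pos": None}

n = 2  # number of timepoints in this toy example
# None becomes an all-null column, mirroring pl.lit(None).alias(data_key)
columns = {
    key: (values if values is not None else [None] * n)
    for key, values in dict_PLR.items()
}
# columns -> {"mean": [5.1, 5.0], "ci_pos": [None, None]}
```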

get_df_subject_per_code

get_df_subject_per_code(
    data_df: DataFrame, subject_code: str, cfg: DictConfig
) -> DataFrame

Filter dataframe to get data for a specific subject.

PARAMETER DESCRIPTION
data_df

Full dataframe containing all subjects.

TYPE: DataFrame

subject_code

Subject code to filter for.

TYPE: str

cfg

Configuration with PLR_length for validation.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Filtered dataframe for the specified subject.

RAISES DESCRIPTION
AssertionError

If filtered dataframe length doesn't match expected PLR length.

Source code in src/featurization/feature_utils.py
def get_df_subject_per_code(
    data_df: pl.DataFrame, subject_code: str, cfg: DictConfig
) -> pl.DataFrame:
    """Filter dataframe to get data for a specific subject.

    Parameters
    ----------
    data_df : pl.DataFrame
        Full dataframe containing all subjects.
    subject_code : str
        Subject code to filter for.
    cfg : DictConfig
        Configuration with PLR_length for validation.

    Returns
    -------
    pl.DataFrame
        Filtered dataframe for the specified subject.

    Raises
    ------
    AssertionError
        If filtered dataframe length doesn't match expected PLR length.
    """
    # df_subject = data_df.filter(pl.col("subject_code") == subject_code)
    df_subject = pandas_col_condition_filter(
        df=data_df, col_name="subject_code", col_value=subject_code
    )

    assert df_subject.shape[0] == cfg["DATA"]["PLR_length"], (
        f"df length {df_subject.shape[0]} should be the same as "
        f"PLR length {cfg['DATA']['PLR_length']}"
    )
    return df_subject

get_metadata_row

get_metadata_row(df_subject: DataFrame, cfg: DictConfig)

Extract scalar metadata from the first row of subject dataframe.

PARAMETER DESCRIPTION
df_subject

Subject dataframe with repeated metadata across all timepoints.

TYPE: DataFrame

cfg

Configuration dictionary (unused but kept for API consistency).

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

First row containing scalar metadata values.

Source code in src/featurization/feature_utils.py
def get_metadata_row(df_subject: pl.DataFrame, cfg: DictConfig):
    """Extract scalar metadata from the first row of subject dataframe.

    Parameters
    ----------
    df_subject : pl.DataFrame
        Subject dataframe with repeated metadata across all timepoints.
    cfg : DictConfig
        Configuration dictionary (unused but kept for API consistency).

    Returns
    -------
    pl.DataFrame
        First row containing scalar metadata values.
    """
    # The dataframe has one row per timepoint (e.g. 1981), so the first row is
    # enough to recover the "scalar metadata"
    first_row = df_subject[0]
    # The row still contains the "useless" columns, but keeping them all is simpler
    return first_row

get_feature_cfg_hash

get_feature_cfg_hash(subcfg: DictConfig) -> str

Generate a hash string for feature configuration.

PARAMETER DESCRIPTION
subcfg

Feature configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Hash string for the configuration (currently returns placeholder).

Notes

Not yet implemented; currently returns the placeholder 'dummyHash'.

Source code in src/featurization/feature_utils.py
def get_feature_cfg_hash(subcfg: DictConfig) -> str:
    """Generate a hash string for feature configuration.

    Parameters
    ----------
    subcfg : DictConfig
        Feature configuration dictionary.

    Returns
    -------
    str
        Hash string for the configuration (currently returns placeholder).

    Notes
    -----
    Not yet implemented; currently returns the placeholder 'dummyHash'.
    """
    # see e.g. https://stackoverflow.com/questions/5884066/hashing-a-dictionary
    return "dummyHash"
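One way the placeholder could eventually be replaced, following the linked Stack Overflow approach: serialize the config deterministically and hash the result. This is a hypothetical implementation, not what the codebase currently does:

```python
import hashlib
import json

def feature_cfg_hash(subcfg: dict) -> str:
    # sort_keys makes the serialization order-independent, so equal
    # configs always hash to the same digest
    payload = json.dumps(subcfg, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]

h1 = feature_cfg_hash({"name": "ampbins", "version": 1})
h2 = feature_cfg_hash({"version": 1, "name": "ampbins"})
# h1 == h2: key order does not change the hash
```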

export_features_pickle_file

export_features_pickle_file(
    features: dict, data_source: str, cfg: DictConfig
)

Export features dictionary to a pickle file.

PARAMETER DESCRIPTION
features

Features dictionary with structure:

- data: dict with 'train' and 'test' pl.DataFrames
- mlflow_run: MLflow run information

TYPE: dict

data_source

Data source name used for filename.

TYPE: str

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Path to the exported pickle file.

RAISES DESCRIPTION
Exception

If saving fails.

Source code in src/featurization/feature_utils.py
def export_features_pickle_file(features: dict, data_source: str, cfg: DictConfig):
    """Export features dictionary to a pickle file.

    Parameters
    ----------
    features : dict
        Features dictionary with structure:
        - data: dict with 'train' and 'test' pl.DataFrames
        - mlflow_run: MLflow run information
    data_source : str
        Data source name used for filename.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    str
        Path to the exported pickle file.

    Raises
    ------
    Exception
        If saving fails.
    """
    output_dir = Path(get_artifacts_dir(service_name="features"))
    output_dir.mkdir(parents=True, exist_ok=True)
    fname = get_features_pickle_fname(data_source)
    output_path = output_dir / fname
    try:
        save_results_dict(features, str(output_path), name="features")
    except Exception as e:
        logger.error(f"Failed to save features as a pickle: {e}")
        raise e

    return str(output_path)

add_feature_metadata_suffix_to_run_name

add_feature_metadata_suffix_to_run_name(
    run_name: str, subcfg: DictConfig
) -> str

Append feature metadata suffix to run name.

PARAMETER DESCRIPTION
run_name

Base run name.

TYPE: str

subcfg

Configuration with 'name' and 'version' keys.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Run name with appended metadata suffix.

Source code in src/featurization/feature_utils.py
def add_feature_metadata_suffix_to_run_name(run_name: str, subcfg: DictConfig) -> str:
    """Append feature metadata suffix to run name.

    Parameters
    ----------
    run_name : str
        Base run name.
    subcfg : DictConfig
        Configuration with 'name' and 'version' keys.

    Returns
    -------
    str
        Run name with appended metadata suffix.
    """
    return "{}_{}_v{}".format(run_name, subcfg["name"], subcfg["version"])

harmonize_to_imputation_dict

harmonize_to_imputation_dict(
    data_array: ndarray,
    metadata: dict[str, Any],
    split_key_fixed: str,
    cfg: DictConfig,
    destandardize: bool = True,
) -> dict[str, Any]

Convert raw data array to imputation dictionary format.

Optionally destandardizes the data and packages it in the same format as imputation model outputs.

PARAMETER DESCRIPTION
data_array

Input data array.

TYPE: ndarray

metadata

Metadata dictionary with preprocessing stats.

TYPE: dict

split_key_fixed

Split key ('gt' or 'raw').

TYPE: str

cfg

Configuration dictionary.

TYPE: DictConfig

destandardize

Whether to destandardize the data, by default True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
dict

Dictionary conforming to imputation output format with 'imputation_dict' and 'metadata' keys.

Source code in src/featurization/feature_utils.py
def harmonize_to_imputation_dict(
    data_array: np.ndarray,
    metadata: dict[str, Any],
    split_key_fixed: str,
    cfg: DictConfig,
    destandardize: bool = True,
) -> dict[str, Any]:
    """Convert raw data array to imputation dictionary format.

    Optionally destandardizes the data and packages it in the same
    format as imputation model outputs.

    Parameters
    ----------
    data_array : np.ndarray
        Input data array.
    metadata : dict
        Metadata dictionary with preprocessing stats.
    split_key_fixed : str
        Split key ('gt' or 'raw').
    cfg : DictConfig
        Configuration dictionary.
    destandardize : bool, optional
        Whether to destandardize the data, by default True.

    Returns
    -------
    dict
        Dictionary conforming to imputation output format with
        'imputation_dict' and 'metadata' keys.
    """

    dict_out = {}

    if destandardize:
        mean_before, std_before = np.nanmean(data_array), np.nanstd(data_array)
        if cfg["PREPROCESS"]["standardize"]:
            standardize_stats = metadata["preprocess"]["standardize"]["gt"]
            data_array = destandardize_numpy(
                X=data_array,
                mean=standardize_stats["mean"],
                std=standardize_stats["std"],
            )
        logger.info(
            "Destandardizing the input data | Mean {:.2f} -> {:.2f}, std {:.2f} -> {:.2f}".format(
                mean_before, np.nanmean(data_array), std_before, np.nanstd(data_array)
            )
        )

    # TODO! If you actually start doing anomaly detection, you may want to rethink this a bit
    dict_out[split_key_fixed] = {}
    dict_out[split_key_fixed]["imputation_dict"] = {
        "imputation": {"mean": data_array, "ci_pos": None, "ci_neg": None}
    }
    dict_out[split_key_fixed]["metadata"] = metadata

    return dict_out
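Destandardization inverts z-scoring: x = z * std + mean. A sketch with plain lists and illustrative values (the real code calls destandardize_numpy on arrays):

```python
# Standardization stats recovered from the metadata (illustrative values)
mean, std = 5.0, 0.4

z_scored = [-1.0, 0.0, 2.5]
# Invert z-scoring elementwise: x = z * std + mean
destandardized = [z * std + mean for z in z_scored]
# destandardized -> approximately [4.6, 5.0, 6.0]
```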

get_original_data_per_split_key

get_original_data_per_split_key(
    model_dict: dict[str, Any],
    cfg: DictConfig,
    split_key: str,
) -> dict[str, Any]

Extract and format original data for a specific baseline type.

PARAMETER DESCRIPTION
model_dict

Model dictionary containing data_input.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

split_key

Baseline type ('BASELINE_DenoisedGT' or 'BASELINE_OutlierRemovedRaw').

TYPE: str

RETURNS DESCRIPTION
dict

Dictionary keyed by split with harmonized imputation format.

RAISES DESCRIPTION
ValueError

If split_key is not recognized.

Source code in src/featurization/feature_utils.py
def get_original_data_per_split_key(
    model_dict: dict[str, Any], cfg: DictConfig, split_key: str
) -> dict[str, Any]:
    """Extract and format original data for a specific baseline type.

    Parameters
    ----------
    model_dict : dict
        Model dictionary containing data_input.
    cfg : DictConfig
        Configuration dictionary.
    split_key : str
        Baseline type ('BASELINE_DenoisedGT' or 'BASELINE_OutlierRemovedRaw').

    Returns
    -------
    dict
        Dictionary keyed by split with harmonized imputation format.

    Raises
    ------
    ValueError
        If split_key is not recognized.
    """
    results_out = {}
    for split in model_dict["data_input"].keys():
        if split_key == "BASELINE_DenoisedGT":
            data_array = model_dict["data_input"][split]["data"]["ground_truth"]["gt"]
            metadata = model_dict["data_input"][split]["metadata"]
            split_key_fixed = "gt"
        elif split_key == "BASELINE_OutlierRemovedRaw":
            data_array = model_dict["data_input"][split]["data"]["data_missing"]["raw"]
            metadata = model_dict["data_input"][split]["metadata"]
            split_key_fixed = "raw"
        else:
            logger.error(f"Unknown split_key: {split_key}")
            raise ValueError(f"Unknown split_key: {split_key}")

        results_out[split] = harmonize_to_imputation_dict(
            data_array, metadata, split_key_fixed, cfg
        )

    return results_out

name_imputation_sources_for_featurization

name_imputation_sources_for_featurization(
    sources: list[str], cfg: DictConfig
) -> list[str]

Generate featurization run names from imputation source names.

PARAMETER DESCRIPTION
sources

List of imputation source names.

TYPE: list

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
list

List of featurization run names.

Source code in src/featurization/feature_utils.py
def name_imputation_sources_for_featurization(
    sources: list[str], cfg: DictConfig
) -> list[str]:
    """Generate featurization run names from imputation source names.

    Parameters
    ----------
    sources : list
        List of imputation source names.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    list
        List of featurization run names.
    """
    sources_features = []
    for i, source in enumerate(sources):
        sources_features.append(
            define_featurization_run_name_from_base(base_name=source, cfg=cfg)
        )
    return sources_features

get_original_data_to_results

get_original_data_to_results(
    model_dict: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]

Get baseline data (GT and raw) formatted as results dictionaries.

PARAMETER DESCRIPTION
model_dict

Model dictionary containing data_input.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary keyed by baseline split keys with data and mlflow_run.

Source code in src/featurization/feature_utils.py
def get_original_data_to_results(
    model_dict: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]:
    """Get baseline data (GT and raw) formatted as results dictionaries.

    Parameters
    ----------
    model_dict : dict
        Model dictionary containing data_input.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    dict
        Dictionary keyed by baseline split keys with data and mlflow_run.
    """
    results = {}
    split_keys = get_baseline_names()

    for split_key in split_keys:
        results[split_key] = {
            "data": get_original_data_per_split_key(model_dict, cfg, split_key),
            "mlflow_run": None,
        }

    return results

get_imputed_results

get_imputed_results(
    model_dict: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]

Extract imputation results with metadata from model dictionary.

PARAMETER DESCRIPTION
model_dict

Model dictionary with 'imputation', 'mlflow_run', and 'data_input'.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary with 'data', 'mlflow_run', and metadata per split.

Source code in src/featurization/feature_utils.py
def get_imputed_results(model_dict: dict[str, Any], cfg: DictConfig) -> dict[str, Any]:
    """Extract imputation results with metadata from model dictionary.

    Parameters
    ----------
    model_dict : dict
        Model dictionary with 'imputation', 'mlflow_run', and 'data_input'.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    dict
        Dictionary with 'data', 'mlflow_run', and metadata per split.
    """
    results_out = {
        "data": model_dict["imputation"],
        "mlflow_run": model_dict["mlflow_run"],
    }

    for split in model_dict["data_input"].keys():
        for split_key in results_out["data"][split].keys():
            results_out["data"][split][split_key]["metadata"] = model_dict[
                "data_input"
            ][split]["metadata"]

    return results_out
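The metadata propagation above can be sketched with plain dictionaries (toy data standing in for the real arrays): each `split_key` under a split gets a reference to that split's metadata from `data_input`.

```python
# Toy model_dict mirroring the structure consumed by get_imputed_results.
model_dict = {
    "imputation": {"train": {"SAITS": {"data": [1.0, 2.0]}}},
    "mlflow_run": None,
    "data_input": {"train": {"metadata": {"subject": "s001"}}},
}

results_out = {
    "data": model_dict["imputation"],
    "mlflow_run": model_dict["mlflow_run"],
}

# Attach the split-level metadata to every split_key of that split.
for split in model_dict["data_input"]:
    for split_key in results_out["data"][split]:
        results_out["data"][split][split_key]["metadata"] = (
            model_dict["data_input"][split]["metadata"]
        )

print(results_out["data"]["train"]["SAITS"]["metadata"])
```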

create_dict_for_featurization_from_imputation_results_and_original_data

create_dict_for_featurization_from_imputation_results_and_original_data(
    imputation_results: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]

Create unified dictionary for featurization from imputation and baseline data.

Combines imputation model results with original baseline data (GT and raw) into a single dictionary structure for featurization.

PARAMETER DESCRIPTION
imputation_results

Dictionary keyed by model name containing imputation results.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Unified dictionary with all sources ready for featurization.

Source code in src/featurization/feature_utils.py
def create_dict_for_featurization_from_imputation_results_and_original_data(
    imputation_results: dict[str, Any], cfg: DictConfig
) -> dict[str, Any]:
    """Create unified dictionary for featurization from imputation and baseline data.

    Combines imputation model results with original baseline data (GT and raw)
    into a single dictionary structure for featurization.

    Parameters
    ----------
    imputation_results : dict
        Dictionary keyed by model name containing imputation results.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    dict
        Unified dictionary with all sources ready for featurization.
    """
    for i, (model_name, model_dict) in enumerate(imputation_results.items()):
        if i == 0:
            print(f"Model name: {model_name}")
            # get the original data (i.e. "raw" without the outliers, and 'gt' that is denoised and imputed)
            results = get_original_data_to_results(model_dict, cfg)

        # Now process normally the imputed data
        results[model_name] = get_imputed_results(model_dict, cfg)

    return results
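The resulting structure can be sketched as follows: the baseline entries are taken once (from the first model's `data_input`), and then one key is added per imputation model. The two helper stand-ins below are simplified toys, not the real implementations.

```python
# Toy stand-ins for get_original_data_to_results / get_imputed_results.
def baseline_results(model_dict):
    return {
        k: {"data": "baseline", "mlflow_run": None}
        for k in ("BASELINE_DenoisedGT", "BASELINE_OutlierRemovedRaw")
    }

def imputed_results(model_dict):
    return {"data": model_dict["imputation"], "mlflow_run": model_dict["mlflow_run"]}

imputation_results = {
    "SAITS": {"imputation": None, "mlflow_run": "run-a"},
    "CSDI": {"imputation": None, "mlflow_run": "run-b"},
}

results = {}
for i, (model_name, model_dict) in enumerate(imputation_results.items()):
    if i == 0:
        # Baselines are identical across models, so they are extracted only once.
        results = baseline_results(model_dict)
    results[model_name] = imputed_results(model_dict)

print(sorted(results))
```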

check_and_fix_df_schema

check_and_fix_df_schema(df_subject: DataFrame) -> DataFrame

Validate dataframe schema and raise error for Object type columns.

Polars Object type columns cause issues with filtering operations.

PARAMETER DESCRIPTION
df_subject

Subject dataframe to validate.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Validated dataframe (unchanged if no issues).

RAISES DESCRIPTION
ValueError

If any column has Object dtype.

See Also

https://github.com/pola-rs/polars/issues/18399

Source code in src/featurization/feature_utils.py
def check_and_fix_df_schema(df_subject: pl.DataFrame) -> pl.DataFrame:
    """Validate dataframe schema and raise error for Object type columns.

    Polars Object type columns cause issues with filtering operations.

    Parameters
    ----------
    df_subject : pl.DataFrame
        Subject dataframe to validate.

    Returns
    -------
    pl.DataFrame
        Validated dataframe (unchanged if no issues).

    Raises
    ------
    ValueError
        If any column has Object dtype.

    See Also
    --------
    https://github.com/pola-rs/polars/issues/18399
    """
    # Check the schema of the dataframe
    # https://github.com/pola-rs/polars/issues/18399
    # Problem with some column being an Object class?
    logger.debug("Checking the column schema of the Polars dataframe:")
    for col in df_subject.columns:
        if df_subject[col].dtype == pl.datatypes.Object:
            # Cannot fix? https://stackoverflow.com/q/76829116/6412152
            logger.error(
                "Column {} is of Object type, cannot be used for featurization".format(
                    col
                )
            )
            logger.error(
                "You are creating your DataFrame incorrectly, check your code! "
                "see e.g. https://stackoverflow.com/a/76720675/6412152"
            )
            raise ValueError(
                "Column {} is of Object type, cannot be used for featurization".format(
                    col
                )
            )

    return df_subject
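The same fail-fast validation pattern, sketched without a polars dependency (the real function inspects `df_subject[col].dtype` against `pl.datatypes.Object`; here a plain column-name-to-dtype-name mapping stands in):

```python
def check_schema(schema: dict[str, str]) -> None:
    """Raise if any column has an 'object' dtype, mirroring check_and_fix_df_schema."""
    for col, dtype in schema.items():
        if dtype == "object":
            raise ValueError(
                f"Column {col} is of Object type, cannot be used for featurization"
            )

check_schema({"subject": "str", "pupil_size": "f64"})  # passes silently
try:
    check_schema({"blob": "object"})
except ValueError as err:
    print(err)
```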

feature_log

get_best_outlier_detection_run

get_best_outlier_detection_run(
    simple_outlier_name: str,
    cfg: DictConfig,
    id: Optional[str] = None,
) -> Optional[DataFrame]

Retrieve the best outlier detection run from MLflow.

PARAMETER DESCRIPTION
simple_outlier_name

Simplified outlier method name (e.g., 'MOMENT-gt-finetune', 'LOF').

TYPE: str

cfg

Configuration dictionary with PREFECT flow names.

TYPE: DictConfig

id

Specific run ID to retrieve, by default None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
DataFrame or None

Single-row DataFrame with best run info, or None if not found.

Source code in src/featurization/feature_log.py
def get_best_outlier_detection_run(
    simple_outlier_name: str, cfg: DictConfig, id: Optional[str] = None
) -> Optional[pd.DataFrame]:
    """Retrieve the best outlier detection run from MLflow.

    Parameters
    ----------
    simple_outlier_name : str
        Simplified outlier method name (e.g., 'MOMENT-gt-finetune', 'LOF').
    cfg : DictConfig
        Configuration dictionary with PREFECT flow names.
    id : str, optional
        Specific run ID to retrieve, by default None.

    Returns
    -------
    pd.DataFrame or None
        Single-row DataFrame with best run info, or None if not found.
    """
    # hacky fix for TimesNet naming ambiguities
    if simple_outlier_name == "TimesNet-gt":
        # the "-gt" suffix was not used when running the anomaly detection; it was only
        # added later to give a clearer name for downstream processing. TODO: harmonize
        # the names and the code
        simple_outlier_name = "TimesNet"

    experiment_name = experiment_name_wrapper(
        experiment_name=cfg["PREFECT"]["FLOW_NAMES"]["OUTLIER_DETECTION"], cfg=cfg
    )
    mlflow_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    best_dict = get_best_dict("outlier_detection", cfg)

    if id is None:
        # keep the ones that contain the simple_outlier_name
        fields = simple_outlier_name.split("-")
        if len(fields) == 2:
            model_name = fields[0]  # e.g. MOMENT
            model_type = fields[1]  # e.g. zeroshot
            runs = mlflow_runs[
                mlflow_runs["tags.mlflow.runName"].str.contains(model_type)
                & mlflow_runs["tags.mlflow.runName"].str.contains(model_name)
            ]
        elif len(fields) == 1:
            model_name = simple_outlier_name  # e.g. SAITS
            runs = mlflow_runs[
                mlflow_runs["tags.mlflow.runName"].str.contains(model_name)
            ]
        elif len(fields) == 3:
            model_name = fields[0]  # e.g. MOMENT
            data_type = fields[1]  # e.g. gt/orig
            model_type = fields[2]  # e.g. zeroshot
            runs = mlflow_runs[
                mlflow_runs["tags.mlflow.runName"].str.contains(model_type)
                & mlflow_runs["tags.mlflow.runName"].str.contains(data_type)
                & mlflow_runs["tags.mlflow.runName"].str.contains(model_name)
            ]
        else:
            if "ensembleThreshold" in simple_outlier_name:
                model_name = "-".join(fields[1:3])
                runs = mlflow_runs[
                    mlflow_runs["tags.mlflow.runName"].str.contains(model_name)
                ]
            elif "ensemble" in simple_outlier_name:
                # we removed the name from this making this more difficult here :S
                field0corr = fields[0] + "Thresholded"
                model_name = (
                    field0corr + "-" + "-".join(fields[1:3])
                )  # hacky as things have been renamed TODO!
                runs = mlflow_runs[
                    mlflow_runs["tags.mlflow.runName"].str.contains(model_name)
                ]
            else:
                logger.error(
                    f"Unsupported number ({len(fields)}) of name fields: {fields}"
                )
                raise ValueError(
                    f"Unsupported number ({len(fields)}) of name fields: {fields}"
                )

        if runs.shape[0] > 0:
            if "ensemble" not in simple_outlier_name:
                runs = drop_ensemble_runs(runs)

            best_run = get_best_run_dict(
                runs, best_dict, task="outlier_detection"
            ).iloc[0:1]
            assert best_run.shape[0] == 1, "You should have only one best run"
            return best_run
        else:
            logger.warning("Could not find any best run?")
            return None
    else:
        run = mlflow_runs[mlflow_runs["run_id"] == id]
        return run
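The run-name matching above can be distilled into a small predicate: the simplified outlier name is split on `-` into one to three fields (model, optional data type, optional model type), and each field must occur as a substring of the MLflow run name. This sketch omits the ensemble special cases, and filters a single string rather than a pandas DataFrame:

```python
def name_matches(run_name: str, simple_outlier_name: str) -> bool:
    """True if every '-'-separated field of the simple name occurs in run_name."""
    fields = simple_outlier_name.split("-")
    if not 1 <= len(fields) <= 3:
        raise ValueError(f"Unsupported number ({len(fields)}) of name fields: {fields}")
    return all(field in run_name for field in fields)

print(name_matches("anomaly_MOMENT_gt_finetune_v2", "MOMENT-gt-finetune"))
print(name_matches("anomaly_LOF_v1", "MOMENT-gt-finetune"))
```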

get_best_outlier_run

get_best_outlier_run(
    mlflow_run: Series, source_name: str, cfg: DictConfig
) -> tuple[Optional[DataFrame], Optional[str]]

Get best outlier detection run associated with an imputation source.

PARAMETER DESCRIPTION
mlflow_run

MLflow run information for the imputation.

TYPE: Series

source_name

Source name containing outlier method (e.g., 'simple1.0__LOF__SAITS').

TYPE: str

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
tuple

(best_outlier_run, outlier_run_id) or (None, None) for pupil-gt.

Source code in src/featurization/feature_log.py
def get_best_outlier_run(
    mlflow_run: pd.Series, source_name: str, cfg: DictConfig
) -> tuple[Optional[pd.DataFrame], Optional[str]]:
    """Get best outlier detection run associated with an imputation source.

    Parameters
    ----------
    mlflow_run : pd.Series
        MLflow run information for the imputation.
    source_name : str
        Source name containing outlier method (e.g., 'simple1.0__LOF__SAITS').
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    tuple
        (best_outlier_run, outlier_run_id) or (None, None) for pupil-gt.
    """
    simple_outlier_name = source_name.split("__")[1]
    if "pupil" in simple_outlier_name:
        logger.debug("Using human-annotated Pupil data, no outlier detection")
        return None, None

    else:
        if "Outlier_run_id" in mlflow_run.keys():
            outlier_run_id = mlflow_run["Outlier_run_id"]
            best_outlier_run = get_best_outlier_detection_run(
                simple_outlier_name=simple_outlier_name, cfg=cfg, id=outlier_run_id
            )
        else:
            # Older runs did not log this, so retrieving it may require some extra work.
            # Ensemble runs no longer have this field at all.
            try:
                best_outlier_run = get_best_outlier_detection_run(
                    simple_outlier_name=source_name.split("__")[1], cfg=cfg
                )
                if best_outlier_run is not None:
                    outlier_run_id = str(best_outlier_run["run_id"].values[0])
                else:
                    outlier_run_id = None
            except Exception as e:
                logger.error(f"Could not get the outlier run id: {e}")
                raise e
                # return None, None

        if "ensemble" in source_name:
            if best_outlier_run is None:
                logger.info(
                    "Ensembled imputation, thus no single anomaly detection can be identified"
                )
                logger.info("You can ignore the warning now")

        return best_outlier_run, outlier_run_id
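The source-name convention assumed above appears to be `<data>__<outlier-method>__<imputation-model>`, joined by double underscores, so the outlier method is the second field:

```python
source_name = "simple1.0__LOF__SAITS"
# Unpack the three "__"-separated fields of the source name.
data_part, outlier_method, imputation_model = source_name.split("__")
print(outlier_method)
```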

metrics_when_anomaly_detection_pupil_gt

metrics_when_anomaly_detection_pupil_gt(
    best_outlier_string: str,
) -> None

Log MLflow metrics for human-annotated ground truth outlier detection.

Sets perfect metrics (F1=1, FP=0) for ground truth data.

PARAMETER DESCRIPTION
best_outlier_string

Metric name suffix for logging.

TYPE: str

Source code in src/featurization/feature_log.py
def metrics_when_anomaly_detection_pupil_gt(best_outlier_string: str) -> None:
    """Log MLflow metrics for human-annotated ground truth outlier detection.

    Sets perfect metrics (F1=1, FP=0) for ground truth data.

    Parameters
    ----------
    best_outlier_string : str
        Metric name suffix for logging.
    """
    mlflow.log_param("Outlier_run_id", None)
    mlflow.log_metric(f"Outlier_{best_outlier_string}", 1)
    mlflow.log_metric("Outlier_fp", 0)
    mlflow.log_metric("Outlier_f1__easy", 1)

featurization_mlflow_metrics_and_params

featurization_mlflow_metrics_and_params(
    mlflow_run: Optional[Series],
    source_name: str,
    cfg: DictConfig,
) -> None

Log featurization metrics and parameters to MLflow.

Logs imputation and outlier detection metrics from upstream runs.

PARAMETER DESCRIPTION
mlflow_run

MLflow run series from imputation, or None for baseline data.

TYPE: Series or None

source_name

Source name containing outlier and imputation method info.

TYPE: str

cfg

Configuration dictionary.

TYPE: DictConfig

Source code in src/featurization/feature_log.py
def featurization_mlflow_metrics_and_params(
    mlflow_run: Optional[pd.Series], source_name: str, cfg: DictConfig
) -> None:
    """Log featurization metrics and parameters to MLflow.

    Logs imputation and outlier detection metrics from upstream runs.

    Parameters
    ----------
    mlflow_run : pd.Series or None
        MLflow run series from imputation, or None for baseline data.
    source_name : str
        Source name containing outlier and imputation method info.
    cfg : DictConfig
        Configuration dictionary.
    """
    best_imput_dict = get_best_dict("imputation", cfg)
    best_imput_string = best_imput_dict["string"].replace("metrics.", "")
    best_outlier_dict = get_best_dict("outlier_detection", cfg)
    best_outlier_string = best_outlier_dict["string"].replace("metrics.", "")

    if mlflow_run is None:
        mlflow.log_param("Data source", source_name.split("__")[0])
        mlflow.log_param("Imputation_run_id", None)
        mlflow.log_metric(f"Imputation_{best_imput_string}", 0)
        metrics_when_anomaly_detection_pupil_gt(best_outlier_string)

    else:
        mlflow.log_param("Imputation_run_id", mlflow_run["run_id"])
        col_name = get_best_imputation_col_name(best_metric_cfg=best_imput_dict)
        mlflow.log_metric(f"Imputation_{best_imput_string}", mlflow_run[col_name])
        best_outlier_run, outlier_run_id = get_best_outlier_run(
            mlflow_run, source_name, cfg
        )

        if best_outlier_run is not None:
            mlflow.log_param("Outlier_run_id", outlier_run_id)
            col_name = get_best_imputation_col_name(best_metric_cfg=best_outlier_dict)
            # best_outlier_value = best_outlier_run[best_outlier_dict["string"]].values[0]
            best_outlier_value = best_outlier_run[col_name].values[0]
            mlflow.log_metric(f"Outlier_{best_outlier_string}", best_outlier_value)

            # add some extra metrics here
            metric1 = "fp"
            col_name = col_name.replace(best_outlier_string, metric1)
            value = best_outlier_run[col_name].values[0]
            mlflow.log_metric(f"Outlier_{metric1}", value)

            metric2 = "f1__easy"
            col_name = col_name.replace(metric1, metric2)
            value = best_outlier_run[col_name].values[0]
            mlflow.log_metric(f"Outlier_{metric2}", value)

        else:
            model_name, anomaly_source = parse_imputation_run_name_for_ensemble(
                source_name
            )
            if anomaly_source == "pupil-gt":
                metrics_when_anomaly_detection_pupil_gt(best_outlier_string)
            else:
                logger.warning("Could not find any best outlier run?")
                mlflow.log_param("Outlier_run_id", None)
                mlflow.log_metric(f"Outlier_{best_outlier_string}", np.nan)

export_features_to_mlflow

export_features_to_mlflow(
    features: dict, run_name: str, cfg: DictConfig
) -> None

Export features to local pickle and log to MLflow.

PARAMETER DESCRIPTION
features

Features dictionary to export.

TYPE: dict

run_name

Run name used for file naming.

TYPE: str

cfg

Configuration dictionary.

TYPE: DictConfig

Source code in src/featurization/feature_log.py
def export_features_to_mlflow(features: dict, run_name: str, cfg: DictConfig) -> None:
    """Export features to local pickle and log to MLflow.

    Parameters
    ----------
    features : dict
        Features dictionary to export.
    run_name : str
        Run name used for file naming.
    cfg : DictConfig
        Configuration dictionary.
    """
    # Local pickle export
    output_path = export_features_pickle_file(features, run_name, cfg)

    # Log the same to MLflow
    log_features_to_mlflow(run_name, output_path, features["mlflow_run"], cfg)

log_features_to_mlflow

log_features_to_mlflow(
    run_name: str,
    output_path: str,
    mlflow_run: Optional[Series],
    cfg: DictConfig,
) -> None

Log feature pickle file as MLflow artifact.

PARAMETER DESCRIPTION
run_name

Run name for logging.

TYPE: str

output_path

Path to the pickle file.

TYPE: str

mlflow_run

MLflow run information (currently unused).

TYPE: Series or None

cfg

Configuration dictionary (currently unused).

TYPE: DictConfig

Source code in src/featurization/feature_log.py
def log_features_to_mlflow(
    run_name: str, output_path: str, mlflow_run: Optional[pd.Series], cfg: DictConfig
) -> None:
    """Log feature pickle file as MLflow artifact.

    Parameters
    ----------
    run_name : str
        Run name for logging.
    output_path : str
        Path to the pickle file.
    mlflow_run : pd.Series or None
        MLflow run information (currently unused).
    cfg : DictConfig
        Configuration dictionary (currently unused).
    """
    logger.info(
        "Logging features ({}) as a pickled artifact to MLflow".format(run_name)
    )
    mlflow.log_artifact(output_path, artifact_path="features")

get_best_run_per_source

get_best_run_per_source(
    cfg: DictConfig,
    experiment_name: str = "PLR_Featurization",
    skip_embeddings: bool = True,
) -> dict[str, Series]

Get the latest MLflow run for each unique data source.

PARAMETER DESCRIPTION
cfg

Configuration dictionary.

TYPE: DictConfig

experiment_name

MLflow experiment name, by default 'PLR_Featurization'.

TYPE: str DEFAULT: 'PLR_Featurization'

skip_embeddings

If True, exclude embedding sources, by default True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
dict

Dictionary keyed by source name containing best run Series.

RAISES DESCRIPTION
ValueError

If no runs found for the experiment or a specific source.

Source code in src/featurization/feature_log.py
def get_best_run_per_source(
    cfg: DictConfig,
    experiment_name: str = "PLR_Featurization",
    skip_embeddings: bool = True,
) -> dict[str, pd.Series]:
    """Get the latest MLflow run for each unique data source.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary.
    experiment_name : str, optional
        MLflow experiment name, by default 'PLR_Featurization'.
    skip_embeddings : bool, optional
        If True, exclude embedding sources, by default True.

    Returns
    -------
    dict
        Dictionary keyed by source name containing best run Series.

    Raises
    ------
    ValueError
        If no runs found for the experiment or a specific source.
    """
    mlflow_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    if mlflow_runs.shape[0] == 0:
        logger.error(
            "No MLflow featurization runs found from {}".format(experiment_name)
        )
        logger.error(
            "Did you run the previous steps (anomaly/outlier detection, "
            "imputation and featurization for this experiment?)?"
        )
        logger.error(
            'Check values in "PROCESS_FLOWS" of your defaults.yaml for example? Should be True'
        )
        raise ValueError(
            "No MLflow featurization runs found from {}".format(experiment_name)
        )

    unique_sources = mlflow_runs["tags.mlflow.runName"].unique()
    if skip_embeddings:
        logger.info(
            "Skipping classification from the embedding sources, only using the hand-crafted features"
        )
        unique_sources = [
            source for source in unique_sources if "embedding" not in source
        ]

    runs = {}
    for source in unique_sources:
        runs_per_source = mlflow_runs[mlflow_runs["tags.mlflow.runName"] == source]
        if runs_per_source.shape[0] == 0:
            logger.error(
                f"No runs found for source {source} (some glitch, as we already found runs with this name?)"
            )
            raise ValueError(f"No runs found for source {source}")
        # Get the latest run (TODO! you could propagate the MAE from imputation to featurization as a parameter)
        best_run = runs_per_source.sort_values("start_time", ascending=False).iloc[0]
        if best_run.shape[0] == 0:
            logger.error(f"No best run found for source {source}")
            raise ValueError(f"No best run found for source {source}")
        runs[source] = best_run

    return runs
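The "latest run per source" selection can be sketched without pandas: group runs by run name and keep the entry with the newest `start_time` (the real code sorts a `mlflow.search_runs` DataFrame and takes the first row):

```python
# Toy run records standing in for mlflow.search_runs rows.
runs = [
    {"runName": "src_A", "start_time": 1, "run_id": "a1"},
    {"runName": "src_A", "start_time": 5, "run_id": "a2"},
    {"runName": "src_B", "start_time": 3, "run_id": "b1"},
]

best = {}
for run in runs:
    name = run["runName"]
    # Keep only the most recent run per source name.
    if name not in best or run["start_time"] > best[name]["start_time"]:
        best[name] = run

print({name: r["run_id"] for name, r in best.items()})
```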

get_mlflow_run_by_id

get_mlflow_run_by_id(
    run_id: str,
    source: str,
    data_source: Optional[str],
    model_name: Optional[str],
    cfg: DictConfig,
    task_key: str = "OUTLIER_DETECTION",
) -> Optional[Series]

Retrieve a specific MLflow run by its ID.

PARAMETER DESCRIPTION
run_id

MLflow run ID to retrieve.

TYPE: str

source

Source name for logging.

TYPE: str

data_source

Data source identifier.

TYPE: str

model_name

Model name for logging.

TYPE: str

cfg

Configuration dictionary.

TYPE: DictConfig

task_key

Task key for experiment name lookup, by default 'OUTLIER_DETECTION'.

TYPE: str DEFAULT: 'OUTLIER_DETECTION'

RETURNS DESCRIPTION
Series or None

Run information as Series, or None if not found.

Source code in src/featurization/feature_log.py
def get_mlflow_run_by_id(
    run_id: str,
    source: str,
    data_source: Optional[str],
    model_name: Optional[str],
    cfg: DictConfig,
    task_key: str = "OUTLIER_DETECTION",
) -> Optional[pd.Series]:
    """Retrieve a specific MLflow run by its ID.

    Parameters
    ----------
    run_id : str
        MLflow run ID to retrieve.
    source : str
        Source name for logging.
    data_source : str
        Data source identifier.
    model_name : str
        Model name for logging.
    cfg : DictConfig
        Configuration dictionary.
    task_key : str, optional
        Task key for experiment name lookup, by default 'OUTLIER_DETECTION'.

    Returns
    -------
    pd.Series or None
        Run information as Series, or None if not found.
    """
    experiment_name = experiment_name_wrapper(
        experiment_name=cfg["PREFECT"]["FLOW_NAMES"][task_key], cfg=cfg
    )
    mlflow_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    run = mlflow_runs[mlflow_runs["run_id"] == run_id]
    # assert run.shape[0] > 1, "The run_id = {} was not found from runs?".format(run_id)
    # assert run.shape[0] == 1, "You should have only one run"
    if run.shape[0] != 1:
        # e.g. "The run_id = 98aa458aaab94323883a8be9afe3a63f was not found from runs?
        # (source = simple1.0__TimesNet__MOMENT-finetune, ...)"
        # This might happen if you re-run anomaly detection without re-running the imputation, thus even though
        # the results would converge to the same loss/metric, the preceding tasks are not exactly the same.
        # This is fine behavior when you are debugging and developing, but maybe you want to run all the imputations
        # and other downstream tasks for the exactly correct preceding tasks.
        logger.error(
            "The run_id = {} was not found from runs? (source = {}, data_source = {}, model = {})".format(
                run_id, source, data_source, model_name
            )
        )
        return None

    else:
        return run.iloc[0]

import_features_per_source

import_features_per_source(
    source: str,
    run: Series,
    cfg: DictConfig,
    subdir: str = "features",
) -> dict

Import features from an MLflow run artifact.

PARAMETER DESCRIPTION
source

Data source name.

TYPE: str

run

MLflow run information.

TYPE: Series

cfg

Configuration dictionary.

TYPE: DictConfig

subdir

Artifact subdirectory, by default 'features'.

TYPE: str DEFAULT: 'features'

RETURNS DESCRIPTION
dict

Features dictionary with structure: - data: dict with 'train' and 'test' pl.DataFrames - mlflow_run_imputation: MLflow run info

RAISES DESCRIPTION
Exception

If artifact download fails.

Source code in src/featurization/feature_log.py
def import_features_per_source(
    source: str, run: pd.Series, cfg: DictConfig, subdir: str = "features"
) -> dict:
    """Import features from an MLflow run artifact.

    Parameters
    ----------
    source : str
        Data source name.
    run : pd.Series
        MLflow run information.
    cfg : DictConfig
        Configuration dictionary.
    subdir : str, optional
        Artifact subdirectory, by default 'features'.

    Returns
    -------
    dict
        Features dictionary with structure:
        - data: dict with 'train' and 'test' pl.DataFrames
        - mlflow_run_imputation: MLflow run info

    Raises
    ------
    Exception
        If artifact download fails.
    """
    # TODO! You could check here if there any artifacts saved (mlflow did not run until the end),
    #  or you simply have the name incorrect
    artifact_uri = get_feature_pickle_artifact_uri(run, source, cfg, subdir=subdir)
    try:
        feature_pickle = mlflow.artifacts.download_artifacts(artifact_uri=artifact_uri)
        features_per_source = load_results_dict(feature_pickle)
    except Exception as e:
        try:
            # Try to extract the file path from the error message and load directly.
            # This handles cases where the MLflow URI doesn't match the current system.
            # Note: the split delimiter consumes the leading "/", so it is re-prepended.
            file_path = "/" + e.args[0].split(": '/")[1].replace('"', "").replace("'", "")
            if os.path.exists(file_path):
                features_per_source = load_results_dict(file_path)
            else:
                logger.error(f"File not found: {file_path}")
                raise e
        except Exception as e2:
            logger.error(f"Error downloading artifact: {e2}")
            raise e
    return features_per_source

import_features_from_best_runs

import_features_from_best_runs(
    best_runs: dict[str, Series], cfg: DictConfig
) -> dict

Import features from multiple best runs and add MLflow metadata.

PARAMETER DESCRIPTION
best_runs

Dictionary keyed by source name with MLflow run Series.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Features dictionary with added mlflow_run_featurization and mlflow_run_outlier_detection information.

Source code in src/featurization/feature_log.py
def import_features_from_best_runs(
    best_runs: dict[str, pd.Series], cfg: DictConfig
) -> dict:
    """Import features from multiple best runs and add MLflow metadata.

    Parameters
    ----------
    best_runs : dict
        Dictionary keyed by source name with MLflow run Series.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    dict
        Features dictionary with added mlflow_run_featurization and
        mlflow_run_outlier_detection information.
    """
    features = {}
    for source, run in best_runs.items():
        if "embedding" in source:
            # from embeddings
            features[source] = import_features_per_source(
                source, run, cfg, subdir="embeddings"
            )
        else:
            # From hand-crafted features
            features[source] = import_features_per_source(source, run, cfg)

        if features[source] is not None:
            # rename the "mlflow_run" key to "mlflow_run_imputation"
            features[source]["mlflow_run_imputation"] = features[source].pop(
                "mlflow_run"
            )
            # add also the featurization run
            features[source]["mlflow_run_featurization"] = run
            if "params.Outlier_run_id" in run:
                if run["params.Outlier_run_id"] != "None":
                    if "params.model_name" in run:
                        model_name = run["params.model_name"]
                    else:
                        model_name = None
                    features[source]["mlflow_run_outlier_detection"] = (
                        get_mlflow_run_by_id(
                            run_id=run["params.Outlier_run_id"],
                            source=source,
                            data_source=run["params.Data source"],
                            model_name=model_name,
                            cfg=cfg,
                        )
                    )
                else:
                    # None when using the manually annotated data
                    features[source]["mlflow_run_outlier_detection"] = None
            else:
                features[source]["mlflow_run_outlier_detection"] = None

    return features

import_features_from_mlflow

import_features_from_mlflow(
    cfg: DictConfig,
    experiment_name: str = "PLR_Featurization",
) -> dict

Import features from MLflow for all data sources.

Retrieves best runs and downloads feature artifacts.

PARAMETER DESCRIPTION
cfg

Configuration dictionary.

TYPE: DictConfig

experiment_name

MLflow experiment name, by default 'PLR_Featurization'.

TYPE: str DEFAULT: 'PLR_Featurization'

RETURNS DESCRIPTION
dict

Features dictionary keyed by source name.

Source code in src/featurization/feature_log.py
def import_features_from_mlflow(
    cfg: DictConfig, experiment_name: str = "PLR_Featurization"
) -> dict:
    """Import features from MLflow for all data sources.

    Retrieves best runs and downloads feature artifacts.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary.
    experiment_name : str, optional
        MLflow experiment name, by default 'PLR_Featurization'.

    Returns
    -------
    dict
        Features dictionary keyed by source name.
    """
    # Get the best (latest) MLflow run for each data source
    best_runs = get_best_run_per_source(
        cfg,
        experiment_name,
        skip_embeddings=cfg["CLASSIFICATION_SETTINGS"]["train_from_embeddings"],
    )

    # Import the features from the best runs
    features = import_features_from_best_runs(best_runs, cfg)

    return features

Handcrafted Features

subflow_handcrafted_featurization

flow_handcrafted_featurization

flow_handcrafted_featurization(
    cfg: DictConfig,
    sources: dict,
    experiment_name: str,
    prev_experiment_name: str,
) -> None

Execute handcrafted featurization for all data sources.

Iterates through all imputation sources and feature configurations, running the featurization script for each combination.

PARAMETER DESCRIPTION
cfg

Configuration dictionary with PLR_FEATURIZATION settings.

TYPE: DictConfig

sources

Dictionary of data sources keyed by source name.

TYPE: dict

experiment_name

MLflow experiment name for featurization.

TYPE: str

prev_experiment_name

Previous experiment name (imputation).

TYPE: str

Source code in src/featurization/subflow_handcrafted_featurization.py
def flow_handcrafted_featurization(
    cfg: DictConfig, sources: dict, experiment_name: str, prev_experiment_name: str
) -> None:
    """Execute handcrafted featurization for all data sources.

    Iterates through all imputation sources and feature configurations,
    running the featurization script for each combination.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary with PLR_FEATURIZATION settings.
    sources : dict
        Dictionary of data sources keyed by source name.
    experiment_name : str
        MLflow experiment name for featurization.
    prev_experiment_name : str
        Previous experiment name (imputation).
    """
    # Define the featurization methods:
    # 1) You could use multiple .YAML files to define the hand-crafted features
    # 2) You could train MOMENT embeddings and use those for your classifier in the next stage (e.g. XGBoost)
    # Placeholder for now: a single feature configuration
    feature_cfgs = {get_feature_name_from_cfg(cfg): cfg["PLR_FEATURIZATION"]}

    no_of_runs = len(sources) * len(feature_cfgs)
    run_idx = 0
    # Featurize (or skip if previous results found from MLflow)
    for source_idx, (source_name, source_data) in enumerate(sources.items()):
        for idx, (featurization_method, feature_cfg) in enumerate(feature_cfgs.items()):
            logger.info(f"Source #{source_idx + 1}/{len(sources)}: {source_name}")
            logger.info(
                f"Running pipeline for featurization method #{idx + 1}/{len(feature_cfgs)}: {featurization_method}"
            )
            run_name = f"{featurization_method}__{source_name}"
            logger.info(f"Run name #{run_idx + 1}/{no_of_runs}: {run_name}")
            run_idx += 1

            featurization_script(
                experiment_name=experiment_name,
                prev_experiment_name=prev_experiment_name,
                cfg=cfg,
                source_name=source_name,
                source_data=source_data,
                featurization_method=featurization_method,
                feature_cfg=feature_cfg,
                run_name=run_name,
            )
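The nested loop yields one run per (featurization method, source) pair, named `{method}__{source}`. The convention can be sketched with hypothetical source and method names:

```python
# Hypothetical names illustrating the run-name convention of
# flow_handcrafted_featurization.
sources = ["pupil-gt", "pupil-raw"]
feature_cfgs = ["handcrafted-v1"]

run_names = [
    f"{featurization_method}__{source_name}"
    for source_name in sources
    for featurization_method in feature_cfgs
]
```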

Embedding Features

subflow_embedding

embedding_script

embedding_script(
    cfg: DictConfig,
    source_name: str,
    source_data: dict,
    model_name: str,
    embedding_cfg: DictConfig,
    run_name: str,
    pre_embedding_cfg: DictConfig,
)

Execute embedding extraction for a single source.

Dispatches to model-specific embedding functions.

PARAMETER DESCRIPTION
cfg

Main configuration dictionary.

TYPE: DictConfig

source_name

Name of the data source.

TYPE: str

source_data

Source data dictionary.

TYPE: dict

model_name

Embedding model name (e.g., 'MOMENT').

TYPE: str

embedding_cfg

Embedding-specific configuration.

TYPE: DictConfig

run_name

MLflow run name.

TYPE: str

pre_embedding_cfg

Pre-embedding/post-processing configuration (e.g., PCA).

TYPE: DictConfig

RAISES DESCRIPTION
NotImplementedError

If model_name is not supported.

Source code in src/featurization/embedding/subflow_embedding.py
def embedding_script(
    cfg: DictConfig,
    source_name: str,
    source_data: dict,
    model_name: str,
    embedding_cfg: DictConfig,
    run_name: str,
    pre_embedding_cfg: DictConfig,
):
    """Execute embedding extraction for a single source.

    Dispatches to model-specific embedding functions.

    Parameters
    ----------
    cfg : DictConfig
        Main configuration dictionary.
    source_name : str
        Name of the data source.
    source_data : dict
        Source data dictionary.
    model_name : str
        Embedding model name (e.g., 'MOMENT').
    embedding_cfg : DictConfig
        Embedding-specific configuration.
    run_name : str
        MLflow run name.
    pre_embedding_cfg : DictConfig
        Pre-embedding/post-processing configuration (e.g., PCA).

    Raises
    ------
    NotImplementedError
        If model_name is not supported.
    """
    if model_name == "MOMENT":
        moment_embedder(
            source_data=source_data,
            source_name=source_name,
            model_cfg=embedding_cfg,
            cfg=cfg,
            run_name=run_name,
            model_name=model_name,
            pre_embedding_cfg=pre_embedding_cfg,
        )

    else:
        logger.error("Model {} not implemented! Typo?".format(model_name))
        raise NotImplementedError("Model {} not implemented!".format(model_name))

if_embedding_not_done

if_embedding_not_done(run_name, experiment_name, cfg)

Check if embedding run has already been completed.

PARAMETER DESCRIPTION
run_name

Name of the run to check.

TYPE: str

experiment_name

MLflow experiment name.

TYPE: str

cfg

Configuration dictionary (currently unused).

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if embedding needs to be computed, False if already done.

Source code in src/featurization/embedding/subflow_embedding.py
def if_embedding_not_done(run_name, experiment_name, cfg):
    """Check if embedding run has already been completed.

    Parameters
    ----------
    run_name : str
        Name of the run to check.
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Configuration dictionary (currently unused).

    Returns
    -------
    bool
        True if embedding needs to be computed, False if already done.
    """
    mlflow_runs: pd.DataFrame = mlflow.search_runs(experiment_names=[experiment_name])
    run_names = list(mlflow_runs["tags.mlflow.runName"])
    if run_name not in run_names:
        return True
    else:
        logger.info("Embedding featurization already done!")
        return False
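Stripped of the MLflow query, the check reduces to run-name membership. A minimal standalone sketch with hypothetical run names:

```python
def embedding_not_done(run_name: str, existing_run_names: list[str]) -> bool:
    # Mirrors if_embedding_not_done: True means the run still needs computing.
    return run_name not in existing_run_names

# Hypothetical list of already-completed run names
existing = ["MOMENT-embedding__pupil-gt"]
```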

flow_embedding

flow_embedding(
    cfg: DictConfig,
    sources: dict,
    experiment_name: str,
    prev_experiment_name: str,
)

Execute embedding extraction flow for all sources and configurations.

Iterates through sources, embedding models, and preprocessing methods, computing embeddings for each combination.

PARAMETER DESCRIPTION
cfg

Configuration dictionary with PLR_EMBEDDING and EMBEDDING settings.

TYPE: DictConfig

sources

Dictionary of data sources keyed by source name.

TYPE: dict

experiment_name

MLflow experiment name for featurization.

TYPE: str

prev_experiment_name

Previous experiment name (imputation).

TYPE: str

Source code in src/featurization/embedding/subflow_embedding.py
def flow_embedding(
    cfg: DictConfig, sources: dict, experiment_name: str, prev_experiment_name: str
):
    """Execute embedding extraction flow for all sources and configurations.

    Iterates through sources, embedding models, and preprocessing methods,
    computing embeddings for each combination.

    Parameters
    ----------
    cfg : DictConfig
        Configuration dictionary with PLR_EMBEDDING and EMBEDDING settings.
    sources : dict
        Dictionary of data sources keyed by source name.
    experiment_name : str
        MLflow experiment name for featurization.
    prev_experiment_name : str
        Previous experiment name (imputation).
    """
    embedding_cfgs = {"MOMENT": cfg["PLR_EMBEDDING"]}

    preprocessing_cfgs = {"HighDim": None, "PCA": cfg["EMBEDDING"]["PREPROCESSING"]}

    no_of_runs = len(sources) * len(embedding_cfgs) * len(preprocessing_cfgs)
    run_idx = 0
    # Featurize (or skip if previous results found from MLflow)
    for source_idx, (source_name, source_data) in enumerate(sources.items()):
        for idx, (model_name, embedding_cfg) in enumerate(embedding_cfgs.items()):
            for pre_idx, (preproc_name, pre_embedding_cfg) in enumerate(
                preprocessing_cfgs.items()
            ):
                logger.info(f"Source #{source_idx + 1}/{len(sources)}: {source_name}")
                logger.info(
                    f"Running pipeline for embedding method #{idx + 1}/{len(embedding_cfgs)}: {model_name}"
                )

                if pre_embedding_cfg is None:
                    run_name = f"{model_name}-embedding__{source_name}"
                else:
                    run_name = f"{model_name}-embedding-{preproc_name}__{source_name}"
                logger.info(f"Run name #{run_idx + 1}/{no_of_runs}: {run_name}")
                run_idx += 1

                if if_embedding_not_done(run_name, experiment_name, cfg):
                    embedding_script(
                        cfg=cfg,
                        source_name=source_name,
                        source_data=source_data,
                        model_name=model_name,
                        embedding_cfg=embedding_cfg[model_name],
                        run_name=run_name,
                        pre_embedding_cfg=pre_embedding_cfg,
                    )
                else:
                    logger.info("Embedding already done, skipping now")

moment_embedding

log_embeddings_to_mlflow

log_embeddings_to_mlflow(
    embeddings: dict[str, Any],
    run_name: str,
    model_name: str,
    source_name: str,
    save_as_numpy: bool = True,
) -> None

Save embeddings to disk and log as MLflow artifacts.

PARAMETER DESCRIPTION
embeddings

Dictionary with 'data' containing DataFrames per split.

TYPE: dict

run_name

Run name for file naming.

TYPE: str

model_name

Model name for file naming.

TYPE: str

source_name

Source name (currently unused).

TYPE: str

save_as_numpy

If True, save per-split numpy arrays, by default True.

TYPE: bool DEFAULT: True

Source code in src/featurization/embedding/moment_embedding.py
def log_embeddings_to_mlflow(
    embeddings: dict[str, Any],
    run_name: str,
    model_name: str,
    source_name: str,
    save_as_numpy: bool = True,
) -> None:
    """Save embeddings to disk and log as MLflow artifacts.

    Parameters
    ----------
    embeddings : dict
        Dictionary with 'data' containing DataFrames per split.
    run_name : str
        Run name for file naming.
    model_name : str
        Model name for file naming.
    source_name : str
        Source name (currently unused).
    save_as_numpy : bool, optional
        If True, save per-split numpy arrays, by default True.
    """
    dir_out = Path(get_artifacts_dir("embeddings"))

    # as numpy arrays
    if save_as_numpy:
        for split, df in embeddings["data"].items():
            embedding_fname = get_embedding_npy_fname(model_name, split)
            path_out = dir_out / embedding_fname
            if path_out.exists():
                path_out.unlink()
            np.save(str(path_out), df.to_numpy())  # e.g. (16, 1024)
            mlflow.log_artifact(str(path_out), "embeddings")

    # as single pickled thing
    # e.g. MOMENT-embedding__pupil-gt__pupil-gt.pickle
    path_out = dir_out / get_features_pickle_fname(run_name)
    save_results_dict(embeddings, str(path_out))
    mlflow.log_artifact(str(path_out), "embeddings")

get_dataframe_from_dict

get_dataframe_from_dict(
    split_dict_subject: dict[str, dict[str, ndarray]],
    cfg: DictConfig,
    drop_col_wildcard: str = "mask",
) -> DataFrame

Convert subject dictionary to metadata DataFrame.

PARAMETER DESCRIPTION
split_dict_subject

Subject dictionary with metadata and labels.

TYPE: dict

cfg

Configuration dictionary (currently unused).

TYPE: DictConfig

drop_col_wildcard

Wildcard for columns to drop, by default 'mask'.

TYPE: str DEFAULT: 'mask'

RETURNS DESCRIPTION
DataFrame

DataFrame with metadata columns prefixed with 'metadata_'.

Source code in src/featurization/embedding/moment_embedding.py
def get_dataframe_from_dict(
    split_dict_subject: dict[str, dict[str, np.ndarray]],
    cfg: DictConfig,
    drop_col_wildcard: str = "mask",
) -> pl.DataFrame:
    """Convert subject dictionary to metadata DataFrame.

    Parameters
    ----------
    split_dict_subject : dict
        Subject dictionary with metadata and labels.
    cfg : DictConfig
        Configuration dictionary (currently unused).
    drop_col_wildcard : str, optional
        Wildcard for columns to drop, by default 'mask'.

    Returns
    -------
    pl.DataFrame
        DataFrame with metadata columns prefixed with 'metadata_'.
    """
    df = convert_subject_dict_of_arrays_to_df(
        split_dict_subject, wildcard_categories=["metadata", "labels"]
    )
    # drop the mask columns (that do not make any sense as a single row)
    drop_cols = [i for i in df.columns if drop_col_wildcard in i]
    df = df.drop(drop_cols)
    df = df.rename(lambda column_name: "metadata_" + column_name)
    return df

create_pseudo_embedding_std

create_pseudo_embedding_std(
    embeddings_out: ndarray | DataFrame,
) -> DataFrame

Create placeholder standard deviation columns for embeddings.

Creates zero-filled columns to match handcrafted feature format.

PARAMETER DESCRIPTION
embeddings_out

Embedding array with shape (n_samples, n_features).

TYPE: ndarray or DataFrame

RETURNS DESCRIPTION
DataFrame

DataFrame with columns 'embedding{i}_std' filled with zeros.

Source code in src/featurization/embedding/moment_embedding.py
def create_pseudo_embedding_std(
    embeddings_out: np.ndarray | pl.DataFrame,
) -> pl.DataFrame:
    """Create placeholder standard deviation columns for embeddings.

    Creates zero-filled columns to match handcrafted feature format.

    Parameters
    ----------
    embeddings_out : np.ndarray or pl.DataFrame
        Embedding array with shape (n_samples, n_features).

    Returns
    -------
    pl.DataFrame
        DataFrame with columns 'embedding{i}_std' filled with zeros.
    """
    df_stdev = pl.DataFrame(
        np.zeros_like(embeddings_out),
        schema=[f"embedding{i}_std" for i in range(embeddings_out.shape[1])],
    )
    return df_stdev

create_embeddings_df

create_embeddings_df(embeddings_out: ndarray) -> DataFrame

Create DataFrame from embedding array.

PARAMETER DESCRIPTION
embeddings_out

Embedding array with shape (n_samples, n_features).

TYPE: ndarray

RETURNS DESCRIPTION
DataFrame

DataFrame with columns 'embedding{i}_value'.

Source code in src/featurization/embedding/moment_embedding.py
def create_embeddings_df(embeddings_out: np.ndarray) -> pl.DataFrame:
    """Create DataFrame from embedding array.

    Parameters
    ----------
    embeddings_out : np.ndarray
        Embedding array with shape (n_samples, n_features).

    Returns
    -------
    pl.DataFrame
        DataFrame with columns 'embedding{i}_value'.
    """
    df_embeddings = pl.DataFrame(
        embeddings_out,
        schema=[f"embedding{i}_value" for i in range(embeddings_out.shape[1])],
    )
    return df_embeddings
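The column naming shared by `create_embeddings_df` and `create_pseudo_embedding_std` can be sketched with NumPy alone (Polars swapped out; the shapes are hypothetical):

```python
import numpy as np

embeddings_out = np.arange(6.0).reshape(2, 3)  # (n_samples, n_features)
value_cols = [f"embedding{i}_value" for i in range(embeddings_out.shape[1])]
std_cols = [f"embedding{i}_std" for i in range(embeddings_out.shape[1])]
# placeholder stdevs, as in create_pseudo_embedding_std
pseudo_std = np.zeros_like(embeddings_out)
```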

create_split_embedding_df

create_split_embedding_df(
    embeddings_out: ndarray,
    subject_codes: list[str],
    df_metadata: DataFrame,
) -> DataFrame

Create complete embedding DataFrame with metadata and codes.

PARAMETER DESCRIPTION
embeddings_out

Embedding array with shape (n_samples, n_features).

TYPE: ndarray

subject_codes

List of subject identifiers.

TYPE: list

df_metadata

Metadata DataFrame for subjects.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

Combined DataFrame with subject_code, embeddings, std, and metadata.

Source code in src/featurization/embedding/moment_embedding.py
def create_split_embedding_df(
    embeddings_out: np.ndarray,
    subject_codes: list[str],
    df_metadata: pl.DataFrame,
) -> pl.DataFrame:
    """Create complete embedding DataFrame with metadata and codes.

    Parameters
    ----------
    embeddings_out : np.ndarray
        Embedding array with shape (n_samples, n_features).
    subject_codes : list
        List of subject identifiers.
    df_metadata : pl.DataFrame
        Metadata DataFrame for subjects.

    Returns
    -------
    pl.DataFrame
        Combined DataFrame with subject_code, embeddings, std, and metadata.
    """
    df_codes = pl.DataFrame(subject_codes, schema=["subject_code"])
    df_embeddings = create_embeddings_df(embeddings_out)
    # we have no stdev for the embeddings, but add zero-filled columns so downstream code needs fewer special cases
    df_stdev = create_pseudo_embedding_std(df_embeddings)

    df = pl.concat([df_codes, df_embeddings, df_stdev, df_metadata], how="horizontal")

    return df

get_subject_dict_for_df

get_subject_dict_for_df(
    embeddings_out: ndarray,
    split_dict: dict[str, dict[str, ndarray]],
    cfg: DictConfig,
) -> dict[str, dict[str, ndarray]]

Prepare subject dictionary for DataFrame conversion.

Extracts first timepoint from arrays for scalar metadata.

PARAMETER DESCRIPTION
embeddings_out

Embedding array for validation of subject count.

TYPE: ndarray

split_dict

Split dictionary with metadata arrays.

TYPE: dict

cfg

Configuration dictionary (currently unused).

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Subject dictionary with scalar values per category.

RAISES DESCRIPTION
AssertionError

If embedding and input subject counts don't match.

Source code in src/featurization/embedding/moment_embedding.py
def get_subject_dict_for_df(
    embeddings_out: np.ndarray,
    split_dict: dict[str, dict[str, np.ndarray]],
    cfg: DictConfig,
) -> dict[str, dict[str, np.ndarray]]:
    """Prepare subject dictionary for DataFrame conversion.

    Extracts first timepoint from arrays for scalar metadata.

    Parameters
    ----------
    embeddings_out : np.ndarray
        Embedding array for validation of subject count.
    split_dict : dict
        Split dictionary with metadata arrays.
    cfg : DictConfig
        Configuration dictionary (currently unused).

    Returns
    -------
    dict
        Subject dictionary with scalar values per category.

    Raises
    ------
    AssertionError
        If embedding and input subject counts don't match.
    """
    no_of_embedding_subjects = embeddings_out.shape[0]
    no_of_input_subjects = split_dict["metadata"]["subject_code"].shape[0]
    assert no_of_input_subjects == no_of_embedding_subjects
    split_dict_subject = deepcopy(split_dict)
    for category in split_dict:
        for variable in split_dict[category]:
            array = split_dict[category][variable][:, 0]
            array = convert_object_type(array)
            split_dict_subject[category][variable] = array
    return split_dict_subject

get_subject_codes

get_subject_codes(
    split_dict: dict[str, dict[str, ndarray]],
) -> list[str]

Extract subject codes from split dictionary.

PARAMETER DESCRIPTION
split_dict

Split dictionary with metadata containing subject_code array.

TYPE: dict

RETURNS DESCRIPTION
list

List of subject code strings.

Source code in src/featurization/embedding/moment_embedding.py
def get_subject_codes(
    split_dict: dict[str, dict[str, np.ndarray]],
) -> list[str]:
    """Extract subject codes from split dictionary.

    Parameters
    ----------
    split_dict : dict
        Split dictionary with metadata containing subject_code array.

    Returns
    -------
    list
        List of subject code strings.
    """
    subject_codes = []
    for i in range(split_dict["metadata"]["subject_code"].shape[0]):
        subject_codes.append(split_dict["metadata"]["subject_code"][i, 0])
    return subject_codes
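An equivalent NumPy sketch (hypothetical codes): the subject code repeats along the time axis, so only the first timepoint of each row is kept.

```python
import numpy as np

# (n_subjects, n_timepoints); the code repeats along the time axis.
subject_code = np.array([["S01", "S01"], ["S02", "S02"]])
codes = [subject_code[i, 0] for i in range(subject_code.shape[0])]
# equivalently: subject_code[:, 0].tolist()
```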

combine_embeddings_with_metadata_for_df

combine_embeddings_with_metadata_for_df(
    embeddings_out: ndarray,
    split_dict: dict[str, dict[str, ndarray]],
    cfg: DictConfig,
) -> DataFrame

Combine embedding array with subject metadata into DataFrame.

PARAMETER DESCRIPTION
embeddings_out

Embedding array with shape (n_samples, n_features).

TYPE: ndarray

split_dict

Split dictionary with metadata.

TYPE: dict

cfg

Configuration dictionary.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

Combined DataFrame with embeddings and metadata.

See Also

compute_features_from_dict : Similar function for handcrafted features.

Source code in src/featurization/embedding/moment_embedding.py
def combine_embeddings_with_metadata_for_df(
    embeddings_out: np.ndarray,
    split_dict: dict[str, dict[str, np.ndarray]],
    cfg: DictConfig,
) -> pl.DataFrame:
    """Combine embedding array with subject metadata into DataFrame.

    Parameters
    ----------
    embeddings_out : np.ndarray
        Embedding array with shape (n_samples, n_features).
    split_dict : dict
        Split dictionary with metadata.
    cfg : DictConfig
        Configuration dictionary.

    Returns
    -------
    pl.DataFrame
        Combined DataFrame with embeddings and metadata.

    See Also
    --------
    compute_features_from_dict : Similar function for handcrafted features.
    """
    subject_codes = get_subject_codes(split_dict)
    split_dict_subject = get_subject_dict_for_df(embeddings_out, split_dict, cfg)
    df_metadata = get_dataframe_from_dict(split_dict_subject, cfg)
    df_metadata = fix_pl_schema(df_metadata)

    # Combine 4 "sub-dataframes" into one
    df = create_split_embedding_df(embeddings_out, subject_codes, df_metadata)

    return df

get_embeddings_per_split

get_embeddings_per_split(
    model: Module,
    dataloader: DataLoader,
    split_dict: dict[str, dict[str, ndarray]],
    model_cfg: DictConfig,
    cfg: DictConfig,
) -> DataFrame

Compute embeddings for all batches in a data split.

PARAMETER DESCRIPTION
model

MOMENT model for embedding extraction.

TYPE: Module

dataloader

PyTorch dataloader for the split.

TYPE: DataLoader

split_dict

Split dictionary with metadata.

TYPE: dict

model_cfg

Model configuration.

TYPE: DictConfig

cfg

Main configuration with DEVICE settings.

TYPE: DictConfig

RETURNS DESCRIPTION
DataFrame

DataFrame with embeddings and metadata.

RAISES DESCRIPTION
AssertionError

If embeddings are None (model initialization issue).

Source code in src/featurization/embedding/moment_embedding.py
def get_embeddings_per_split(
    model: torch.nn.Module,
    dataloader: DataLoader,
    split_dict: dict[str, dict[str, np.ndarray]],
    model_cfg: DictConfig,
    cfg: DictConfig,
) -> pl.DataFrame:
    """Compute embeddings for all batches in a data split.

    Parameters
    ----------
    model : torch.nn.Module
        MOMENT model for embedding extraction.
    dataloader : DataLoader
        PyTorch dataloader for the split.
    split_dict : dict
        Split dictionary with metadata.
    model_cfg : DictConfig
        Model configuration.
    cfg : DictConfig
        Main configuration with DEVICE settings.

    Returns
    -------
    pl.DataFrame
        DataFrame with embeddings and metadata.

    Raises
    ------
    AssertionError
        If embeddings are None (model initialization issue).
    """
    embeddings_out = None
    for i, (batch_x, labels, input_masks) in enumerate(dataloader):
        x = batch_x.unsqueeze(1)
        # x = torch.randn(16, 1, 1981)
        outputs = model(x_enc=x.to(cfg["DEVICE"]["device"]))
        embeddings = outputs.embeddings.detach().cpu().numpy()
        assert embeddings is not None, (
            "Embeddings are None, problem with initializing the model?"
        )
        if embeddings_out is None:
            embeddings_out = embeddings
        else:
            embeddings_out = np.concatenate((embeddings_out, embeddings), axis=0)

    # combine metadata with embeddings, and return a Polars dataframe so that this matches the handcrafted features
    # and you can do downstream classification more easily
    embeddings_df = combine_embeddings_with_metadata_for_df(
        embeddings_out, split_dict, cfg
    )

    return embeddings_df
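The batch accumulation pattern above, in isolation and with hypothetical batch shapes:

```python
import numpy as np

# Hypothetical per-batch embedding outputs, e.g. (batch_size, 1024)
batches = [np.ones((16, 1024)), np.ones((16, 1024)), np.ones((8, 1024))]

embeddings_out = None
for embeddings in batches:
    if embeddings_out is None:
        embeddings_out = embeddings
    else:
        # stack batches along the sample axis
        embeddings_out = np.concatenate((embeddings_out, embeddings), axis=0)
```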

get_embeddings

get_embeddings(
    model: Module,
    dataloaders: dict[str, DataLoader],
    source_data: dict[str, Any],
    model_cfg: DictConfig,
    cfg: DictConfig,
) -> dict[str, Any]

Compute embeddings for all data splits.

PARAMETER DESCRIPTION
model

MOMENT model for embedding extraction.

TYPE: Module

dataloaders

Dictionary of dataloaders keyed by split name.

TYPE: dict

source_data

Source data with 'df' and 'mlflow' keys.

TYPE: dict

model_cfg

Model configuration.

TYPE: DictConfig

cfg

Main configuration.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Dictionary with 'data' (embeddings per split) and 'mlflow_run'.

Source code in src/featurization/embedding/moment_embedding.py
def get_embeddings(
    model: torch.nn.Module,
    dataloaders: dict[str, DataLoader],
    source_data: dict[str, Any],
    model_cfg: DictConfig,
    cfg: DictConfig,
) -> dict[str, Any]:
    """Compute embeddings for all data splits.

    Parameters
    ----------
    model : torch.nn.Module
        MOMENT model for embedding extraction.
    dataloaders : dict
        Dictionary of dataloaders keyed by split name.
    source_data : dict
        Source data with 'df' and 'mlflow' keys.
    model_cfg : DictConfig
        Model configuration.
    cfg : DictConfig
        Main configuration.

    Returns
    -------
    dict
        Dictionary with 'data' (embeddings per split) and 'mlflow_run'.
    """
    embeddings = {}
    for split, dataloader in dataloaders.items():
        logger.info(f"Computing embeddings for split {split}")
        embeddings[split] = get_embeddings_per_split(
            model,
            dataloader,
            split_dict=source_data["df"][split],
            model_cfg=model_cfg,
            cfg=cfg,
        )

    features = {"data": embeddings, "mlflow_run": source_data["mlflow"]}

    return features

import_moment_embedder

import_moment_embedder(
    cfg: DictConfig, model_cfg: DictConfig
) -> Module

Import MOMENT model configured for embedding extraction.

PARAMETER DESCRIPTION
cfg

Main configuration with DEVICE settings.

TYPE: DictConfig

model_cfg

Model configuration for MOMENT.

TYPE: DictConfig

RETURNS DESCRIPTION
Module

MOMENT model ready for embedding extraction.

See Also

https://github.com/moment-timeseries-foundation-model/moment/blob/main/tutorials/representation_learning.ipynb

Source code in src/featurization/embedding/moment_embedding.py
def import_moment_embedder(cfg: DictConfig, model_cfg: DictConfig) -> torch.nn.Module:
    """Import MOMENT model configured for embedding extraction.

    Parameters
    ----------
    cfg : DictConfig
        Main configuration with DEVICE settings.
    model_cfg : DictConfig
        Model configuration for MOMENT.

    Returns
    -------
    torch.nn.Module
        MOMENT model ready for embedding extraction.

    See Also
    --------
    https://github.com/moment-timeseries-foundation-model/moment/blob/main/tutorials/representation_learning.ipynb
    """
    model_kwargs = {"task_name": "embedding"}
    # This is the same loader as for imputation, hence the somewhat confusing task name
    model = import_moment_from_mlflow(
        model_cfg=model_cfg, cfg=cfg, task="embedding", model_kwargs=model_kwargs
    )
    model = model.to(cfg["DEVICE"]["device"]).float()
    return model

moment_embedder

moment_embedder(
    source_data: dict[str, Any],
    source_name: str,
    model_cfg: DictConfig,
    cfg: DictConfig,
    run_name: str,
    model_name: str,
    pre_embedding_cfg: Optional[DictConfig],
) -> None

Extract MOMENT embeddings for a data source with MLflow tracking.

Imports model, computes embeddings, optionally applies PCA, and logs results to MLflow.

PARAMETER DESCRIPTION
source_data

Source data dictionary with 'df' and 'mlflow' keys.

TYPE: dict

source_name

Name of the data source.

TYPE: str

model_cfg

MOMENT model configuration.

TYPE: DictConfig

cfg

Main configuration dictionary.

TYPE: DictConfig

run_name

MLflow run name.

TYPE: str

model_name

Model name for logging.

TYPE: str

pre_embedding_cfg

Pre-embedding configuration (e.g., PCA).

TYPE: DictConfig or None

RAISES DESCRIPTION
ValueError

If pre_embedding_cfg has unknown preprocessing method.

Source code in src/featurization/embedding/moment_embedding.py
def moment_embedder(
    source_data: dict[str, Any],
    source_name: str,
    model_cfg: DictConfig,
    cfg: DictConfig,
    run_name: str,
    # artifacts_dir: str,
    model_name: str,
    pre_embedding_cfg: Optional[DictConfig],
    # experiment_name: str
) -> None:
    """Extract MOMENT embeddings for a data source with MLflow tracking.

    Imports model, computes embeddings, optionally applies PCA, and logs
    results to MLflow.

    Parameters
    ----------
    source_data : dict
        Source data dictionary with 'df' and 'mlflow' keys.
    source_name : str
        Name of the data source.
    model_cfg : DictConfig
        MOMENT model configuration.
    cfg : DictConfig
        Main configuration dictionary.
    run_name : str
        MLflow run name.
    model_name : str
        Model name for logging.
    pre_embedding_cfg : DictConfig or None
        Pre-embedding configuration (e.g., PCA).

    Raises
    ------
    ValueError
        If pre_embedding_cfg has unknown preprocessing method.
    """
    # Get the model
    model = import_moment_embedder(cfg, model_cfg)

    # init stuff
    dataloaders = init_torch_training(
        data_dict=source_data,
        cfg=cfg,
        model_cfg=model_cfg,
        run_name=run_name,
        task="imputation",
        create_outlier_dataloaders=False,
    )

    with mlflow.start_run(run_name=run_name):
        # Log params and metrics to MLflow
        featurization_mlflow_metrics_and_params(
            mlflow_run=source_data["mlflow"], source_name=source_name, cfg=cfg
        )

        mlflow.log_param("model_name", model_name)
        mlflow.log_param(
            "pretrained_model_name_or_path",
            model_cfg["MODEL"]["pretrained_model_name_or_path"],
        )

        # Get the embeddings
        embeddings = get_embeddings(model, dataloaders, source_data, model_cfg, cfg)

        # Optional preprocessing (preprocessing for the classifier, post-processing for the embedding)
        if pre_embedding_cfg is not None:
            if len(pre_embedding_cfg.keys()) == 1:
                if list(pre_embedding_cfg.keys())[0] == "PCA":
                    from src.featurization.embedding.dim_reduction import (
                        apply_PCA_for_embedding,
                    )

                    embeddings = apply_PCA_for_embedding(
                        embeddings, pca_config=pre_embedding_cfg["PCA"]
                    )
                else:
                    logger.error(
                        "Unknown preprocessing method = {}".format(
                            list(pre_embedding_cfg.keys())[0]
                        )
                    )
                    raise ValueError(
                        "Unknown preprocessing method {}".format(
                            list(pre_embedding_cfg.keys())[0]
                        )
                    )
            else:
                logger.error(
                    "Only one key allowed here, pre_embedding_cfg = {}".format(
                        pre_embedding_cfg
                    )
                )
                raise ValueError(
                    "Only one key allowed here, pre_embedding_cfg = {}".format(
                        pre_embedding_cfg
                    )
                )

        # Log to Mlflow as an artifact
        log_embeddings_to_mlflow(embeddings, run_name, model_name, source_name)
        mlflow.end_run()

dim_reduction

cap_dimensionality_of_PCA

cap_dimensionality_of_PCA(
    train_pcs: ndarray, test_pcs: ndarray, max_dim: int = 96
) -> tuple[ndarray, ndarray]

Cap PCA dimensionality to a maximum number of components.

Prevents downstream issues with classifiers that have feature limits (e.g., TabPFN max 100 features).

PARAMETER DESCRIPTION
train_pcs

Training principal components with shape (n_samples, n_components).

TYPE: ndarray

test_pcs

Test principal components with shape (n_samples, n_components).

TYPE: ndarray

max_dim

Maximum number of dimensions to keep, by default 96.

TYPE: int DEFAULT: 96

RETURNS DESCRIPTION
tuple

(train_pcs, test_pcs) with components capped at max_dim.

Source code in src/featurization/embedding/dim_reduction.py
def cap_dimensionality_of_PCA(
    train_pcs: np.ndarray, test_pcs: np.ndarray, max_dim: int = 96
) -> tuple[np.ndarray, np.ndarray]:
    """Cap PCA dimensionality to a maximum number of components.

    Prevents downstream issues with classifiers that have feature limits
    (e.g., TabPFN max 100 features).

    Parameters
    ----------
    train_pcs : np.ndarray
        Training principal components with shape (n_samples, n_components).
    test_pcs : np.ndarray
        Test principal components with shape (n_samples, n_components).
    max_dim : int, optional
        Maximum number of dimensions to keep, by default 96.

    Returns
    -------
    tuple
        (train_pcs, test_pcs) with components capped at max_dim.
    """
    max_dim_in = max(train_pcs.shape[1], test_pcs.shape[1])
    if max_dim_in > max_dim:
        logger.warning(
            f"capping dimensions from max_dim_in {max_dim_in} > max_dim {max_dim}"
        )
        if train_pcs.shape[1] > max_dim:
            train_pcs = train_pcs[:, :max_dim]
        if test_pcs.shape[1] > max_dim:
            test_pcs = test_pcs[:, :max_dim]

    return train_pcs, test_pcs
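As a minimal illustration (hypothetical shapes, not taken from the pipeline), the cap is a plain column slice keeping the leading components:

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical PCA outputs: 10 train samples, 8 test samples, 120 components each
train_pcs = rng.random((10, 120))
test_pcs = rng.random((8, 120))

max_dim = 96
# Keep at most max_dim leading components (columns) per split
train_pcs = train_pcs[:, :max_dim]
test_pcs = test_pcs[:, :max_dim]

assert train_pcs.shape == (10, 96)
assert test_pcs.shape == (8, 96)
```

Slicing the leading columns is safe because PCA orders components by explained variance, so the discarded tail carries the least information.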

apply_PCA_for_embedding

apply_PCA_for_embedding(
    embeddings: dict, pca_config: DictConfig
) -> dict

Apply PCA dimensionality reduction to embeddings.

Standardizes features, fits PCA on training data, transforms both train and test, and logs results to MLflow.

PARAMETER DESCRIPTION
embeddings

Dictionary with 'data' containing train/test DataFrames.

TYPE: dict

pca_config

Configuration with 'explained_variance' and 'max_dim'.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Embeddings with PCA-transformed features.

Source code in src/featurization/embedding/dim_reduction.py
def apply_PCA_for_embedding(embeddings: dict, pca_config: DictConfig) -> dict:
    """Apply PCA dimensionality reduction to embeddings.

    Standardizes features, fits PCA on training data, transforms both
    train and test, and logs results to MLflow.

    Parameters
    ----------
    embeddings : dict
        Dictionary with 'data' containing train/test DataFrames.
    pca_config : DictConfig
        Configuration with 'explained_variance' and 'max_dim'.

    Returns
    -------
    dict
        Embeddings with PCA-transformed features.
    """
    # Get just the features
    train_df: pd.DataFrame = get_df_features(df=embeddings["data"]["train"]).to_pandas()
    test_df: pd.DataFrame = get_df_features(df=embeddings["data"]["test"]).to_pandas()

    # Standardize
    scaler = StandardScaler()
    scaler.fit(train_df)
    train_df_scaled = pd.DataFrame(scaler.transform(train_df))
    test_df_scaled = pd.DataFrame(scaler.transform(test_df))

    # PCA
    pca = PCA(n_components=pca_config["explained_variance"])
    pca.fit(train_df_scaled)  # data: (n_samples, n_features)
    # print(pca.explained_variance_ratio_)
    # print(np.cumsum(pca.explained_variance_ratio_))
    train_pcs = pca.transform(train_df_scaled)
    test_pcs = pca.transform(test_df_scaled)
    assert train_pcs.shape[1] == test_pcs.shape[1], (
        "number of features (PCs) does not match"
    )

    logger.info(
        f"PCA kept {train_pcs.shape[1]} components with explained variance"
        f" = {np.cumsum(pca.explained_variance_ratio_)[-1]:.3f}"
    )

    # pupil-gt__pupil-gt ground truth got down to 21 principal components
    # what if some outlier+imputation combo gives larger dimensionality
    train_pcs, test_pcs = cap_dimensionality_of_PCA(
        train_pcs, test_pcs, max_dim=pca_config["max_dim"]
    )

    for param, value in pca_config.items():
        mlflow.log_param(param, value)
    mlflow.log_param("no_of_PCs", train_pcs.shape[1])

    # assign to the input dataframe with some metadata as well
    embeddings = assign_features_back_to_full_df(embeddings, train_pcs, test_pcs)

    return embeddings
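A self-contained sketch of the fit-on-train, transform-both pattern used above, with synthetic data (the shapes and the 0.95 threshold are assumptions for illustration). Passing a float as `n_components` makes scikit-learn keep the smallest number of components reaching that cumulative explained variance, which is how `explained_variance` from `pca_config` behaves:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
X_train = rng.normal(size=(50, 30))  # hypothetical embedding matrix
X_test = rng.normal(size=(20, 30))

# Standardize using statistics fitted on the training split only
scaler = StandardScaler().fit(X_train)
X_train_s = scaler.transform(X_train)
X_test_s = scaler.transform(X_test)

# A float n_components keeps the smallest number of PCs reaching that
# cumulative explained variance (0.95 here, mirroring 'explained_variance')
pca = PCA(n_components=0.95).fit(X_train_s)
train_pcs = pca.transform(X_train_s)
test_pcs = pca.transform(X_test_s)

assert train_pcs.shape[1] == test_pcs.shape[1]
assert np.cumsum(pca.explained_variance_ratio_)[-1] >= 0.95
```

Fitting both the scaler and the PCA on the training split alone avoids leaking test statistics into the transform.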

assign_features_back_to_full_df

assign_features_back_to_full_df(
    embeddings: dict, train_pcs: ndarray, test_pcs: ndarray
) -> dict

Replace embedding features with PCA-transformed components.

PARAMETER DESCRIPTION
embeddings

Original embeddings dictionary.

TYPE: dict

train_pcs

PCA-transformed training data.

TYPE: ndarray

test_pcs

PCA-transformed test data.

TYPE: ndarray

RETURNS DESCRIPTION
dict

Embeddings with replaced feature columns.

Source code in src/featurization/embedding/dim_reduction.py
def assign_features_back_to_full_df(
    embeddings: dict, train_pcs: np.ndarray, test_pcs: np.ndarray
) -> dict:
    """Replace embedding features with PCA-transformed components.

    Parameters
    ----------
    embeddings : dict
        Original embeddings dictionary.
    train_pcs : np.ndarray
        PCA-transformed training data.
    test_pcs : np.ndarray
        PCA-transformed test data.

    Returns
    -------
    dict
        Embeddings with replaced feature columns.
    """

    def get_metadata_cols(df_out):
        cols = df_out.columns
        cols_to_drop = [i for i in cols if "embedding" in i]
        df = df_out.drop(cols_to_drop)
        return df

    def assign_per_df(pcs, df_out):
        df_meta = get_metadata_cols(df_out).to_pandas()
        for idx in range(pcs.shape[1]):
            df_meta["embedding{}_value".format(idx)] = pcs[:, idx]
            df_meta["embedding{}_std".format(idx)] = 0  # placeholder
        return df_meta

    embeddings["data"]["train"] = assign_per_df(
        pcs=train_pcs, df_out=embeddings["data"]["train"]
    )
    embeddings["data"]["test"] = assign_per_df(
        pcs=test_pcs, df_out=embeddings["data"]["test"]
    )

    return embeddings
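The column-naming convention used when writing components back is `embedding{idx}_value` plus a zeroed `embedding{idx}_std` placeholder. A pandas sketch with hypothetical metadata and shapes:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
pcs = rng.random((5, 3))  # hypothetical: 5 samples, 3 principal components
df_meta = pd.DataFrame({"metadata_class_label": ["control"] * 5})

# Re-attach components under the original embedding column convention
for idx in range(pcs.shape[1]):
    df_meta[f"embedding{idx}_value"] = pcs[:, idx]
    df_meta[f"embedding{idx}_std"] = 0  # placeholder, as in the source

assert df_meta.shape == (5, 7)  # 1 metadata col + 3 * (value, std)
```

Keeping the `_std` placeholders means downstream code that expects paired value/std columns keeps working after PCA.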

apply_dimensionality_reduction_for_feature_sources

apply_dimensionality_reduction_for_feature_sources(
    features: dict, cfg: DictConfig
) -> dict

Apply dimensionality reduction to all embedding feature sources.

PARAMETER DESCRIPTION
features

Features dictionary keyed by source name.

TYPE: dict

cfg

Configuration with DIM_REDUCTION settings.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Features with dimensionality-reduced embeddings.

Source code in src/featurization/embedding/dim_reduction.py
def apply_dimensionality_reduction_for_feature_sources(
    features: dict, cfg: DictConfig
) -> dict:
    """Apply dimensionality reduction to all embedding feature sources.

    Parameters
    ----------
    features : dict
        Features dictionary keyed by source name.
    cfg : DictConfig
        Configuration with DIM_REDUCTION settings.

    Returns
    -------
    dict
        Features with dimensionality-reduced embeddings.
    """
    dim_cfg = cfg["CLASSIFICATION_SETTINGS"]["DIM_REDUCTION"]
    if dim_cfg["enable"]:
        logger.info("Applying dimensionality reduction for embedding feature sources")
        logger.info(dim_cfg)

        start_time = datetime.now()
        n = 0
        for source_name, source_data in tqdm(
            features.items(), desc="Applying dimensionality reduction"
        ):
            if "embedding" in source_name:
                n += 1
                features[source_name]["data"] = embedding_dim_reduction_wrapper(
                    embeddings=source_data["data"],
                    dim_cfg=dim_cfg,
                    source_name=source_name,
                )

        dim_time_per_sample = (datetime.now() - start_time).total_seconds() / max(n, 1)
        logger.info(
            f"Dimensionality reduction done in {dim_time_per_sample:.2f} seconds per source"
        )

    return features

torch_to_numpy

torch_to_numpy(_torch_tensor) -> None

Convert PyTorch tensor to numpy array.

PARAMETER DESCRIPTION
_torch_tensor

Input tensor (currently unused — placeholder).

TYPE: Tensor

RETURNS DESCRIPTION
ndarray

Numpy array.

Notes

Not implemented - placeholder function.

Source code in src/featurization/embedding/dim_reduction.py
def torch_to_numpy(_torch_tensor) -> None:
    """Convert PyTorch tensor to numpy array.

    Parameters
    ----------
    _torch_tensor : torch.Tensor
        Input tensor (currently unused — placeholder).

    Returns
    -------
    np.ndarray
        Numpy array.

    Notes
    -----
    Not implemented - placeholder function.
    """
    print("todo!")

get_df_features

get_df_features(df: DataFrame) -> DataFrame

Extract feature columns (ending in '_value') from DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame with embedding columns.

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

DataFrame with only feature value columns.

Source code in src/featurization/embedding/dim_reduction.py
def get_df_features(df: pl.DataFrame) -> pl.DataFrame:
    """Extract feature columns (ending in '_value') from DataFrame.

    Parameters
    ----------
    df : pl.DataFrame
        Input DataFrame with embedding columns.

    Returns
    -------
    pl.DataFrame
        DataFrame with only feature value columns.
    """
    cols = df.columns
    cols_to_keep = [
        i for i in cols if "_value" in i
    ]  # from MOMENT this should be length of 1024
    df = df.select(cols_to_keep)

    return df
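The column filter is a plain substring match on `_value`; a minimal illustration with hypothetical column names:

```python
# Hypothetical column set mixing feature, std, and metadata columns
cols = [
    "embedding0_value",
    "embedding0_std",
    "metadata_class_label",
    "embedding1_value",
]

# Keep only the feature value columns, as get_df_features does
cols_to_keep = [c for c in cols if "_value" in c]

assert cols_to_keep == ["embedding0_value", "embedding1_value"]
```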

get_feature_embedding_df

get_feature_embedding_df(
    df: DataFrame,
    label_col: str = "metadata_class_label",
    return_classes_as_int: bool = True,
) -> tuple[DataFrame, ndarray, DataFrame]

Extract features, labels, and metadata from embedding DataFrame.

Separates feature columns from metadata and encodes labels.

PARAMETER DESCRIPTION
df

Input DataFrame with embeddings and metadata.

TYPE: DataFrame

label_col

Column name for class labels, by default 'metadata_class_label'.

TYPE: str DEFAULT: 'metadata_class_label'

return_classes_as_int

If True, encode string labels as integers, by default True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
tuple

(features_df, labels_array, metadata_df) where features_df contains only _value columns, labels_array is encoded (0/1), and metadata_df contains remaining columns.

RAISES DESCRIPTION
AssertionError

If labels don't have exactly 2 unique values (binary classification).

Source code in src/featurization/embedding/dim_reduction.py
def get_feature_embedding_df(
    df: pl.DataFrame,
    label_col: str = "metadata_class_label",
    return_classes_as_int: bool = True,
) -> tuple[pl.DataFrame, np.ndarray, pl.DataFrame]:
    """Extract features, labels, and metadata from embedding DataFrame.

    Separates feature columns from metadata and encodes labels.

    Parameters
    ----------
    df : pl.DataFrame
        Input DataFrame with embeddings and metadata.
    label_col : str, optional
        Column name for class labels, by default 'metadata_class_label'.
    return_classes_as_int : bool, optional
        If True, encode string labels as integers, by default True.

    Returns
    -------
    tuple
        (features_df, labels_array, metadata_df) where features_df contains
        only _value columns, labels_array is encoded (0/1), and metadata_df
        contains remaining columns.

    Raises
    ------
    AssertionError
        If labels don't have exactly 2 unique values (binary classification).
    """
    cols = df.columns
    df_in = deepcopy(df)
    labels = df[label_col].to_numpy()
    assert len(np.unique(labels)) == 2, "we are doing a binary classification"
    if return_classes_as_int:
        # convert string labels to integers
        le = preprocessing.LabelEncoder()
        le.fit(labels)
        labels = le.transform(labels)  # 0: control, 1: glaucoma
    cols_to_keep = [
        i for i in cols if "_value" in i
    ]  # from MOMENT this should be length of 1024
    cols_to_drop = [
        i for i in cols if "_std" in i
    ]  # from MOMENT this should be length of 1024
    df = df.select(cols_to_keep)
    df_out = df_in.drop(cols_to_drop + cols_to_keep)  # the metadata and code, etc

    return df, labels, df_out
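The label encoding step can be sketched on its own with hypothetical labels. `LabelEncoder` assigns integers in sorted order, which is why "control" maps to 0 and "glaucoma" to 1:

```python
import numpy as np
from sklearn import preprocessing

labels = np.array(["control", "glaucoma", "control", "glaucoma"])  # hypothetical
assert len(np.unique(labels)) == 2  # binary classification guard, as in the source

le = preprocessing.LabelEncoder()
le.fit(labels)
encoded = le.transform(labels)
# Classes are sorted alphabetically: control -> 0, glaucoma -> 1
assert list(encoded) == [0, 1, 0, 1]
```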

combine_cols_to_out

combine_cols_to_out(
    embedding_train: ndarray,
    embedding_test: ndarray,
    train_df_out: DataFrame,
    test_df_out: DataFrame,
) -> tuple[DataFrame, DataFrame]

Combine reduced embeddings with metadata DataFrames.

PARAMETER DESCRIPTION
embedding_train

Reduced training embeddings.

TYPE: ndarray

embedding_test

Reduced test embeddings.

TYPE: ndarray

train_df_out

Training metadata.

TYPE: DataFrame

test_df_out

Test metadata.

TYPE: DataFrame

RETURNS DESCRIPTION
tuple

(df_train, df_test) combined DataFrames.

Source code in src/featurization/embedding/dim_reduction.py
def combine_cols_to_out(
    embedding_train: np.ndarray,
    embedding_test: np.ndarray,
    train_df_out: pl.DataFrame,
    test_df_out: pl.DataFrame,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Combine reduced embeddings with metadata DataFrames.

    Parameters
    ----------
    embedding_train : np.ndarray
        Reduced training embeddings.
    embedding_test : np.ndarray
        Reduced test embeddings.
    train_df_out : pl.DataFrame
        Training metadata.
    test_df_out : pl.DataFrame
        Test metadata.

    Returns
    -------
    tuple
        (df_train, df_test) combined DataFrames.
    """

    def combine_per_split(embeddings: np.ndarray, df_out: pl.DataFrame):
        assert embeddings.shape[0] == df_out.shape[0]
        df_stdev = create_pseudo_embedding_std(embeddings)
        df_embeddings = create_embeddings_df(embeddings)
        df = pl.concat([df_embeddings, df_stdev, df_out], how="horizontal")
        assert df.shape[0] == df_out.shape[0]
        return df

    df_train = combine_per_split(embeddings=embedding_train, df_out=train_df_out)
    df_test = combine_per_split(embeddings=embedding_test, df_out=test_df_out)

    return df_train, df_test
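The helpers `create_embeddings_df` and `create_pseudo_embedding_std` are not shown here; a pandas sketch of the horizontal concatenation idea (the source uses `pl.concat(..., how="horizontal")`), with hypothetical shapes and column names:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
emb = rng.random((4, 2))  # hypothetical reduced embeddings: 4 samples, 2 dims

df_embeddings = pd.DataFrame(emb, columns=["embedding0_value", "embedding1_value"])
df_stdev = pd.DataFrame(
    np.zeros_like(emb), columns=["embedding0_std", "embedding1_std"]
)
df_out = pd.DataFrame({"metadata_class_label": ["control"] * 4})

# Horizontal concat: rows must align across the three frames
df = pd.concat([df_embeddings, df_stdev, df_out], axis=1)
assert df.shape == (4, 5)
```

The row-count asserts in the source guard exactly this alignment requirement.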

umap_wrapper

umap_wrapper(
    embeddings: dict, dim_cfg: DictConfig, source_name: str
) -> dict

Apply UMAP dimensionality reduction to embeddings.

PARAMETER DESCRIPTION
embeddings

Dictionary with 'train' and 'test' DataFrames.

TYPE: dict

dim_cfg

Configuration with n_neighbors, n_components, random_state, transform_seed, and supervised settings.

TYPE: DictConfig

source_name

Source name for logging (currently unused).

TYPE: str

RETURNS DESCRIPTION
dict

Embeddings with UMAP-reduced features.

Notes

Uses default UMAP parameters - would need HPO for fair assessment.

Source code in src/featurization/embedding/dim_reduction.py
def umap_wrapper(embeddings: dict, dim_cfg: DictConfig, source_name: str) -> dict:
    """Apply UMAP dimensionality reduction to embeddings.

    Parameters
    ----------
    embeddings : dict
        Dictionary with 'train' and 'test' DataFrames.
    dim_cfg : DictConfig
        Configuration with n_neighbors, n_components, random_state,
        transform_seed, and supervised settings.
    source_name : str
        Source name for logging (currently unused).

    Returns
    -------
    dict
        Embeddings with UMAP-reduced features.

    Notes
    -----
    Uses default UMAP parameters - would need HPO for fair assessment.
    """

    train_df, train_labels, train_df_out = get_feature_embedding_df(
        df=embeddings["train"]
    )
    test_df, test_labels, test_df_out = get_feature_embedding_df(df=embeddings["test"])
    assert train_df.shape[1] == test_df.shape[1], (
        "Number of features does not match between train and test"
    )

    # unsupervised baseline
    # https://umap-learn.readthedocs.io/en/latest/supervised.html#umap-on-fashion-mnist
    mapper = umap.UMAP(
        n_neighbors=dim_cfg["n_neighbors"],
        n_components=dim_cfg["n_components"],
        random_state=dim_cfg["random_state"],
        transform_seed=dim_cfg["transform_seed"],
        n_jobs=1,
    )

    if dim_cfg["supervised"]:
        # https://umap-learn.readthedocs.io/en/latest/supervised.html#training-with-labels-and-embedding-unlabelled-test-data-metric-learning-with-umap
        mapper.fit(train_df.to_numpy(), np.array(train_labels))
    else:
        mapper.fit(train_df.to_numpy())

    embedding_train = mapper.transform(
        train_df.to_numpy()
    )  # e.g. (145,1024) -> (145,8)
    embedding_test = mapper.transform(test_df.to_numpy())

    # combine with the metadata and create the pseudostdev again
    embeddings["train"], embeddings["test"] = combine_cols_to_out(
        embedding_train, embedding_test, train_df_out, test_df_out
    )

    return embeddings

embedding_dim_reduction_wrapper

embedding_dim_reduction_wrapper(
    embeddings: dict, dim_cfg: DictConfig, source_name: str
) -> dict

Apply dimensionality reduction to embeddings based on configuration.

Reduces high-dimensional embeddings (e.g., 1024) to lower dimensions for visualization or classification.

PARAMETER DESCRIPTION
embeddings

Dictionary with 'train' and 'test' DataFrames.

TYPE: dict

dim_cfg

Configuration with 'method' and method-specific parameters.

TYPE: DictConfig

source_name

Source name for logging.

TYPE: str

RETURNS DESCRIPTION
dict

Embeddings with reduced dimensionality.

RAISES DESCRIPTION
NotImplementedError

If dim_cfg['method'] is not supported.

Source code in src/featurization/embedding/dim_reduction.py
def embedding_dim_reduction_wrapper(
    embeddings: dict, dim_cfg: DictConfig, source_name: str
) -> dict:
    """Apply dimensionality reduction to embeddings based on configuration.

    Reduces high-dimensional embeddings (e.g., 1024) to lower dimensions
    for visualization or classification.

    Parameters
    ----------
    embeddings : dict
        Dictionary with 'train' and 'test' DataFrames.
    dim_cfg : DictConfig
        Configuration with 'method' and method-specific parameters.
    source_name : str
        Source name for logging.

    Returns
    -------
    dict
        Embeddings with reduced dimensionality.

    Raises
    ------
    NotImplementedError
        If dim_cfg['method'] is not supported.
    """
    if dim_cfg["method"] == "UMAP":
        embeddings = umap_wrapper(embeddings, dim_cfg, source_name)
    else:
        logger.error("Method {} not implemented! Typo?".format(dim_cfg["method"]))
        raise NotImplementedError(
            "Method {} not implemented!".format(dim_cfg["method"])
        )

    return embeddings
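The dispatch is a plain string match on `dim_cfg["method"]`; a minimal hypothetical sketch of the same pattern:

```python
def dispatch(method: str) -> str:
    # Mirrors the wrapper's dispatch: only "UMAP" is implemented
    if method == "UMAP":
        return "umap_wrapper"
    raise NotImplementedError("Method {} not implemented!".format(method))

assert dispatch("UMAP") == "umap_wrapper"

# An unknown method (e.g. a typo) raises NotImplementedError
try:
    dispatch("TSNE")
except NotImplementedError as exc:
    assert "TSNE" in str(exc)
```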

Visualization

visualize_features

visualize_features_of_all_sources

visualize_features_of_all_sources(
    features: dict, mlflow_infos: dict, cfg: DictConfig
)

Generate and export feature visualizations for all data sources.

Creates visualizations combining features from multiple sources and logs them as MLflow artifacts.

PARAMETER DESCRIPTION
features

Dictionary of features keyed by data source.

TYPE: dict

mlflow_infos

MLflow run information for artifact logging.

TYPE: dict

cfg

Configuration with VISUALIZATION settings.

TYPE: DictConfig

Source code in src/featurization/visualize_features.py
def visualize_features_of_all_sources(
    features: dict, mlflow_infos: dict, cfg: DictConfig
):
    """Generate and export feature visualizations for all data sources.

    Creates visualizations combining features from multiple sources and
    logs them as MLflow artifacts.

    Parameters
    ----------
    features : dict
        Dictionary of features keyed by data source.
    mlflow_infos : dict
        MLflow run information for artifact logging.
    cfg : DictConfig
        Configuration with VISUALIZATION settings.
    """
    if cfg["PLR_FEATURIZATION"]["VISUALIZATION"]["visualize_features"]:
        # Visualize features from all the sources in one figure
        fig_paths = visualize_features(features=features, cfg=cfg)

        # Note! Since all the different sources are combined into one figure, we cannot
        # log the image to a specific run; as a compromise, we write the same figure to
        # each of the visualized runs (sources, models)
        export_viz_as_artifacts(
            fig_paths,
            flow_type="featurization",
            cfg=cfg,
            mlflow_infos=mlflow_infos,
        )

        # If you want to loop and visualize source (model) by source
        # for data_source in features.keys():
        #     data_features = features[data_source]["data"]
        #     mlflow_run = features[data_source]["mlflow_run"]

    else:
        logger.info("Skipping the visualization of the features")