Skip to content

log_helpers

Logging, MLflow, and utility functions.

Overview

Centralized utilities for:

  • MLflow experiment tracking
  • Hydra configuration
  • Artifact management
  • System utilities

MLflow Integration

mlflow_utils

init_mlflow

init_mlflow(cfg: DictConfig) -> None

Initialize MLflow tracking URI from configuration.

PARAMETER DESCRIPTION
cfg

Configuration containing SERVICES.mlflow_tracking_uri.

TYPE: DictConfig

Notes

If no URI is specified, MLflow uses a local 'mlruns' directory.

Source code in src/log_helpers/mlflow_utils.py
def init_mlflow(cfg: DictConfig) -> None:
    """
    Point MLflow at the tracking URI given in the configuration.

    Parameters
    ----------
    cfg : DictConfig
        Configuration containing SERVICES.mlflow_tracking_uri.

    Notes
    -----
    When the URI is left unset, MLflow falls back to a local 'mlruns'
    directory (equivalent to exporting MLFLOW_TRACKING_URI yourself).
    """
    tracking_uri = cfg["SERVICES"]["mlflow_tracking_uri"]
    if tracking_uri is None:
        logger.warning(
            "You did not specify any MLflow tracking URI. Using the 'mlruns' dir inside 'src'"
        )
    else:
        mlflow.set_tracking_uri(tracking_uri)
    logger.info(f"{mlflow.get_tracking_uri()}")

init_mlflow_experiment

init_mlflow_experiment(
    mlflow_cfg: Optional[DictConfig] = None,
    experiment_name: str = "PLR_imputation",
    override_default_location: bool = False,
    _permanent_delete: bool = True,
) -> None

Initialize or get an MLflow experiment.

PARAMETER DESCRIPTION
mlflow_cfg

MLflow configuration (currently unused).

TYPE: DictConfig DEFAULT: None

experiment_name

Name of the experiment to create/get.

TYPE: str DEFAULT: "PLR_imputation"

override_default_location

If True, use custom artifact location.

TYPE: bool DEFAULT: False

_permanent_delete

Permanent deletion flag (currently unused).

TYPE: bool DEFAULT: True

RAISES DESCRIPTION
Exception

If experiment creation fails (e.g., permission issues).

Source code in src/log_helpers/mlflow_utils.py
def init_mlflow_experiment(
    mlflow_cfg: Optional[DictConfig] = None,
    experiment_name: str = "PLR_imputation",
    override_default_location: bool = False,
    _permanent_delete: bool = True,
) -> None:
    """
    Create (or re-use) an MLflow experiment by name.

    Parameters
    ----------
    mlflow_cfg : DictConfig, optional
        MLflow configuration (currently unused).
    experiment_name : str, default "PLR_imputation"
        Name of the experiment to create/get.
    override_default_location : bool, default False
        If True, use custom artifact location.
    _permanent_delete : bool, default True
        Permanent deletion flag (currently unused).

    Raises
    ------
    Exception
        If experiment creation fails (e.g., permission issues).
    """
    # https://mlflow.org/docs/latest/getting-started/logging-first-model/step3-create-experiment.html
    if not override_default_location:
        logger.debug("Using default MLflow location")
    else:
        logger.info("Overriding default MLflow location")
        logger.warning(
            'Leads to permission denied error?! Set "override_default_location = True" in the code'
        )
        mlruns_dir = get_artifacts_dir("mlflow", "mlruns")
        mlflow.set_tracking_uri(f"file://{mlruns_dir}")

    try:
        mlflow.set_experiment(experiment_name)
    except Exception as e:
        logger.error(f"Failed to set MLflow experiment: {e}")
        logger.error(
            "Failed to set MLflow experiment, but not auto-deleting the experiment. Solve this manually"
        )
        logger.error("See e.g. https://stackoverflow.com/a/60869104/6412152")
        logger.error("e.g. '' or 'mlflow gc [OPTIONS]'")
        raise e
    logger.info(
        f"MLflow | Initializing MLflow Experiment tracking (Server) at {mlflow.get_tracking_uri()}"
    )
    set_artifact_store_location()

set_artifact_store_location

set_artifact_store_location() -> None

Set MLflow artifact store location.

Currently a placeholder for future remote storage (e.g., S3) configuration.

RETURNS DESCRIPTION
None

No artifact store location is set currently.

Source code in src/log_helpers/mlflow_utils.py
def set_artifact_store_location() -> None:
    """Configure the MLflow artifact store location.

    Placeholder only: a remote store (e.g., S3) may be wired in later, so
    this function currently performs no work.

    Returns
    -------
    None
        Nothing is configured at the moment.
    """
    # https://mlflow.org/docs/latest/tracking/artifacts-stores.html
    # TODO! Some remote, e.g. S3

init_mlflow_run

init_mlflow_run(
    mlflow_cfg: DictConfig,
    run_name: str,
    cfg: DictConfig,
    experiment_name: str,
) -> None

Start a new MLflow run.

PARAMETER DESCRIPTION
mlflow_cfg

MLflow configuration with 'log_system_metrics' flag.

TYPE: DictConfig

run_name

Name for the MLflow run.

TYPE: str

cfg

Full Hydra configuration to log.

TYPE: DictConfig

experiment_name

Name of the MLflow experiment.

TYPE: str

RAISES DESCRIPTION
Exception

If run creation fails.

Source code in src/log_helpers/mlflow_utils.py
def init_mlflow_run(
    mlflow_cfg: DictConfig, run_name: str, cfg: DictConfig, experiment_name: str
) -> None:
    """
    Start a fresh MLflow run and attach the Hydra config to it.

    Parameters
    ----------
    mlflow_cfg : DictConfig
        MLflow configuration with 'log_system_metrics' flag.
    run_name : str
        Name for the MLflow run.
    cfg : DictConfig
        Full Hydra configuration to log.
    experiment_name : str
        Name of the MLflow experiment.

    Raises
    ------
    Exception
        If run creation fails.
    """
    log_system = mlflow_cfg["log_system_metrics"]
    try:
        mlflow.start_run(run_name=run_name, log_system_metrics=log_system)
    except Exception as e:
        logger.error(f"Failed to start MLflow run: {e}")
        # Dump current run/experiment metadata to help diagnose the failure
        logger.error(get_mlflow_info())
        raise e

    logger.info(f"MLflow | Starting MLflow Run with name {run_name}")
    log_hydra_cfg_to_mlflow(cfg)

    # Optional artifact-store smoke test, only for outlier-detection runs
    if "OutlierDetection" in run_name and mlflow_cfg["test_artifact_store"]:
        try:
            test_artifact_write()
            logger.debug("MLflow artifact store test passed")
        except Exception as e:
            logger.error(f"Failed to write MLflow artifact: {e}")
            raise e

log_hydra_cfg_to_mlflow

log_hydra_cfg_to_mlflow(cfg: DictConfig) -> None

Log Hydra configuration to MLflow as a YAML artifact.

PARAMETER DESCRIPTION
cfg

Hydra configuration to log.

TYPE: DictConfig

Source code in src/log_helpers/mlflow_utils.py
def log_hydra_cfg_to_mlflow(cfg: DictConfig) -> None:
    """Persist the Hydra configuration to MLflow as a YAML artifact.

    Parameters
    ----------
    cfg : DictConfig
        Hydra configuration to log.
    """
    logger.info("Logging Hydra config to MLflow")
    # Dump the config as YAML into the Hydra output dir, then attach that file
    output_dir = get_hydra_output_dir()
    yaml_path = save_hydra_cfg_as_yaml(cfg, dir_output=output_dir)
    mlflow.log_artifact(yaml_path, artifact_path="config")

get_mlflow_info

get_mlflow_info() -> Dict[str, Any]

Get current MLflow run information as a dictionary.

Collects tags, run info, and experiment info from the active MLflow run. Useful for storing MLflow metadata alongside model artifacts for later reference when logging metrics or additional artifacts.

RETURNS DESCRIPTION
dict

Dictionary with 'run_tags', 'run_info', and 'experiment' keys.

Source code in src/log_helpers/mlflow_utils.py
def get_mlflow_info() -> Dict[str, Any]:
    """Collect metadata about the active MLflow run.

    Gathers run tags, run info, and the owning experiment's info so this
    metadata can be stored alongside model artifacts and used later to log
    additional metrics/artifacts to the same run.

    Returns
    -------
    dict
        Dictionary with 'run_tags', 'run_info', and 'experiment' keys.
    """
    # ToOptimize: the training module is re-run per hyperparameter set and only
    # the "forward pass" imputation is done here; metric evaluation lives in a
    # separate Prefect task so new metrics can be added without retraining.
    # That separate task needs the experiment_name/run_name of the original
    # runs if it is to keep logging to MLflow — hence this helper.
    active = mlflow.active_run()
    client = MlflowClient()
    return {
        "run_tags": active.data.tags,
        "run_info": dict(active.info),
        "experiment": dict(client.get_experiment(active.info.experiment_id)),
    }

log_metrics_as_mlflow_artifact

log_metrics_as_mlflow_artifact(
    metrics_subjectwise: Dict[str, Any],
    model_name: str,
    model_artifacts: Dict[str, Any],
    cfg: DictConfig,
) -> None

Log subject-wise metrics as a pickled MLflow artifact.

PARAMETER DESCRIPTION
metrics_subjectwise

Dictionary containing per-subject metrics.

TYPE: dict

model_name

Name of the model for filename generation.

TYPE: str

model_artifacts

Model artifacts containing MLflow info.

TYPE: dict

cfg

Configuration object (currently unused).

TYPE: DictConfig

Source code in src/log_helpers/mlflow_utils.py
def log_metrics_as_mlflow_artifact(
    metrics_subjectwise: Dict[str, Any],
    model_name: str,
    model_artifacts: Dict[str, Any],
    cfg: DictConfig,
) -> None:
    """Pickle subject-wise metrics and attach them to the model's MLflow run.

    Parameters
    ----------
    metrics_subjectwise : dict
        Dictionary containing per-subject metrics.
    model_name : str
        Name of the model for filename generation.
    model_artifacts : dict
        Model artifacts containing MLflow info.
    cfg : DictConfig
        Configuration object (currently unused).
    """
    # Local (possibly ephemeral) output location; logging the artifact to
    # MLflow moves it to the non-ephemeral artifact store.
    out_dir, out_fname, metrics_path = define_pypots_outputs(
        model_name=model_name, artifact_type="metrics"
    )

    # Pickle the metrics to disk first
    save_results_dict(metrics_subjectwise, metrics_path)

    # Re-open the run that produced this model and log the pickle to it
    mlflow_info = get_mlflow_info_from_model_dict(model_artifacts)
    experiment_id, run_id = get_mlflow_params(mlflow_info)
    with mlflow.start_run(run_id):
        logger.info("Logging metrics as a pickled artifact to MLflow")
        mlflow.log_artifact(metrics_path, artifact_path="metrics")

mlflow_imputation_metrics_logger

mlflow_imputation_metrics_logger(
    metrics_global: Dict[str, Any], split: str
) -> None

Log global imputation metrics to MLflow.

Handles both scalar metrics and array metrics (e.g., confidence intervals).

PARAMETER DESCRIPTION
metrics_global

Dictionary of metric names to values.

TYPE: dict

split

Data split name for metric naming.

TYPE: str

Source code in src/log_helpers/mlflow_utils.py
def mlflow_imputation_metrics_logger(
    metrics_global: Dict[str, Any], split: str
) -> None:
    """Push global imputation metrics to MLflow.

    Scalar metrics are logged directly; ndarray metrics (e.g. confidence
    intervals) are logged as a "_lo"/"_hi" pair from their first two entries.

    Parameters
    ----------
    metrics_global : dict
        Dictionary of metric names to values.
    split : str
        Data split name for metric naming.
    """
    for metric_key, metric_value in metrics_global.items():
        metric_out = get_mlflow_metric_name(split, metric_key)
        logger.debug(f"Logging metric {metric_out} to MLflow, value {metric_value}")
        if not isinstance(metric_value, np.ndarray):
            mlflow.log_metric(metric_out, metric_value)
        else:
            # Array metric: treat entries [0]/[1] as lower/upper bounds
            mlflow.log_metric(metric_out + "_lo", metric_value[0])
            mlflow.log_metric(metric_out + "_hi", metric_value[1])

log_mlflow_imputation_metrics

log_mlflow_imputation_metrics(
    metrics_global: Dict[str, Any],
    model_name: str,
    split: str,
    model_artifacts: Dict[str, Any],
    cfg: DictConfig,
) -> None

Log imputation metrics and Hydra log to MLflow for an existing run.

PARAMETER DESCRIPTION
metrics_global

Global metrics dictionary.

TYPE: dict

model_name

Name of the imputation model (currently unused).

TYPE: str

split

Data split name.

TYPE: str

model_artifacts

Model artifacts with MLflow info.

TYPE: dict

cfg

Configuration object (currently unused).

TYPE: DictConfig

Source code in src/log_helpers/mlflow_utils.py
def log_mlflow_imputation_metrics(
    metrics_global: Dict[str, Any],
    model_name: str,
    split: str,
    model_artifacts: Dict[str, Any],
    cfg: DictConfig,
) -> None:
    """Attach imputation metrics and the Hydra log to an existing MLflow run.

    Parameters
    ----------
    metrics_global : dict
        Global metrics dictionary.
    model_name : str
        Name of the imputation model (currently unused).
    split : str
        Data split name.
    model_artifacts : dict
        Model artifacts with MLflow info.
    cfg : DictConfig
        Configuration object (currently unused).
    """
    mlflow_info = get_mlflow_info_from_model_dict(model_artifacts)
    experiment_id, run_id = get_mlflow_params(mlflow_info)

    # A run may still be open from an earlier step; close it before re-opening
    # the training run we actually want to log into.
    active = mlflow.active_run()
    if active is not None:
        mlflow.end_run()

    with mlflow.start_run(run_id):
        mlflow_imputation_metrics_logger(metrics_global, split)

        # Intermediate Hydra log with the suffix
        hydra_log = get_intermediate_hydra_log_path()
        log_the_hydra_log_as_mlflow_artifact(
            hydra_log, suffix="_metrics", intermediate=True
        )

log_system_params_to_mlflow

log_system_params_to_mlflow(prefix: str = 'sys/') -> None

Log system parameters (hardware, library versions) to MLflow.

PARAMETER DESCRIPTION
prefix

Prefix for parameter names in MLflow.

TYPE: str DEFAULT: "sys/"

Source code in src/log_helpers/mlflow_utils.py
def log_system_params_to_mlflow(prefix: str = "sys/") -> None:
    """Log system parameters (hardware, library versions) to MLflow.

    Parameters
    ----------
    prefix : str, default "sys/"
        Prefix for parameter names in MLflow.
    """
    # Fix: the local was previously named `dict`, shadowing the builtin.
    system_params = get_system_param_dict()
    logger.info("Logging system parameters to MLflow")
    # The dict is nested: {param_type: {param_name: value}}
    for param_type, params in system_params.items():
        for param_name, param_value in params.items():
            logger.debug(f"Param type = {param_type}, logging {prefix + param_name} to MLflow")
            mlflow.log_param(prefix + param_name, param_value)

log_mlflow_params

log_mlflow_params(
    mlflow_params: Dict[str, Any],
    model_name: Optional[str] = None,
    run_name: Optional[str] = None,
) -> None

Log model parameters and system info to MLflow.

PARAMETER DESCRIPTION
mlflow_params

Dictionary of parameters to log.

TYPE: dict

model_name

Model name to log as 'model' parameter.

TYPE: str DEFAULT: None

run_name

Run name (currently unused).

TYPE: str DEFAULT: None

Source code in src/log_helpers/mlflow_utils.py
def log_mlflow_params(
    mlflow_params: Dict[str, Any],
    model_name: Optional[str] = None,
    run_name: Optional[str] = None,
) -> None:
    """Log model parameters plus system info to MLflow.

    Parameters
    ----------
    mlflow_params : dict
        Dictionary of parameters to log.
    model_name : str, optional
        Model name to log as 'model' parameter.
    run_name : str, optional
        Run name (currently unused).
    """
    logger.info("Logging MLflow parameters")
    # Best-effort: a failure to log the model name should not abort the run
    try:
        mlflow.log_param("model", model_name)
    except Exception as e:
        logger.error(f"Failed to log model name to MLflow: {e}")

    for param_name, param_value in mlflow_params.items():
        mlflow.log_param(param_name, param_value)
    log_system_params_to_mlflow()

save_pypots_model_to_mlflow

save_pypots_model_to_mlflow(
    entry: DirEntry,
    model: Any,
    cfg: DictConfig,
    as_artifact: bool = False,
) -> None

Save PyPOTS model to MLflow as artifact or registered model.

PARAMETER DESCRIPTION
entry

Directory entry for the model file.

TYPE: DirEntry

model

PyPOTS model object.

TYPE: object

cfg

Configuration object.

TYPE: DictConfig

as_artifact

If True, log as simple artifact; if False, use MLflow model logging.

TYPE: bool DEFAULT: False

Source code in src/log_helpers/mlflow_utils.py
def save_pypots_model_to_mlflow(
    entry: os.DirEntry, model: Any, cfg: DictConfig, as_artifact: bool = False
) -> None:
    """Persist a PyPOTS model to MLflow, either as a file or a logged model.

    Parameters
    ----------
    entry : os.DirEntry
        Directory entry for the model file.
    model : object
        PyPOTS model object.
    cfg : DictConfig
        Configuration object.
    as_artifact : bool, default False
        If True, log as simple artifact; if False, use MLflow model logging.
    """
    if not as_artifact:
        # Go through MLflow's PyTorch model-logging path
        mlflow_log_pytorch_model(model, path=entry.path, cfg=cfg)
    else:
        # Plain file copy into the run's 'models' artifact directory
        mlflow.log_artifact(entry.path, artifact_path="models")

mlflow_log_pytorch_model

mlflow_log_pytorch_model(
    model: Any, path: str, cfg: DictConfig
) -> None

Log PyTorch model to MLflow.

PARAMETER DESCRIPTION
model

PyTorch model to log.

TYPE: Module

path

Artifact path for the model.

TYPE: str

cfg

Configuration object (currently unused).

TYPE: DictConfig

Notes

This is a basic implementation without model signature. PyPOTS models may require special handling as they are not standard torch.nn.Module.

Source code in src/log_helpers/mlflow_utils.py
def mlflow_log_pytorch_model(model: Any, path: str, cfg: DictConfig) -> None:
    """Log a PyTorch model to MLflow.

    Parameters
    ----------
    model : torch.nn.Module
        PyTorch model to log.
    path : str
        Artifact path for the model.
    cfg : DictConfig
        Configuration object (currently unused).

    Notes
    -----
    Minimal implementation: no model signature is attached. PyPOTS models may
    need special handling since they are not standard torch.nn.Module objects.
    """
    # https://mlflow.org/docs/latest/python_api/mlflow.pytorch.html#mlflow.pytorch.log_model
    # TODO! implement with a signature once this is used for inference;
    #  for the paper only the imputation results are needed, not the model object
    # TODO! PyPOTS model logging:
    #  TypeError: Argument 'pytorch_model' should be a torch.nn.Module
    mlflow.pytorch.log_model(
        model,
        path,
        conda_env=None,
        code_paths=None,
        registered_model_name=None,
    )

pytpots_artifact_wrapper

pytpots_artifact_wrapper(
    pypots_dir: str,
    model: Any,
    cfg: DictConfig,
    model_ext: str = ".pypots",
    as_artifact: bool = True,
) -> None

Log all PyPOTS artifacts from a directory to MLflow.

Iterates through the PyPOTS output directory and logs directories, model files, and other artifacts appropriately.

PARAMETER DESCRIPTION
pypots_dir

Path to PyPOTS output directory.

TYPE: str

model

PyPOTS model object.

TYPE: object

cfg

Configuration object.

TYPE: DictConfig

model_ext

File extension for model files.

TYPE: str DEFAULT: ".pypots"

as_artifact

If True, log model as artifact; if False, use MLflow model logging.

TYPE: bool DEFAULT: True

RAISES DESCRIPTION
Exception

If artifact logging fails.

Source code in src/log_helpers/mlflow_utils.py
def pytpots_artifact_wrapper(
    pypots_dir: str,
    model: Any,
    cfg: DictConfig,
    model_ext: str = ".pypots",
    as_artifact: bool = True,
) -> None:
    """Log all PyPOTS artifacts from a directory to MLflow.

    Iterates through the PyPOTS output directory and logs directories,
    model files, and other artifacts appropriately.

    Parameters
    ----------
    pypots_dir : str
        Path to PyPOTS output directory.
    model : object
        PyPOTS model object.
    cfg : DictConfig
        Configuration object.
    model_ext : str, default ".pypots"
        File extension for model files.
    as_artifact : bool, default True
        If True, log model as artifact; if False, use MLflow model logging.

    Raises
    ------
    Exception
        If artifact logging fails.
    """
    logger.debug("Logging PyPOTS artifacts")
    try:
        # Context manager ensures the scandir handle is closed (was leaked before)
        with os.scandir(pypots_dir) as entries:
            for entry in entries:
                if entry.is_dir():
                    # Fix: logger calls used print-style args ("dir ", entry.name),
                    # so the entry name was never rendered in the message.
                    logger.debug(f"dir {entry.name}")
                    mlflow.log_artifacts(
                        entry.path, artifact_path=f"pypots/{entry.name}"
                    )
                elif entry.is_file():
                    logger.debug(f"file {entry.name}")
                    _, ext = os.path.splitext(entry.name)
                    if ext == model_ext:
                        save_pypots_model_to_mlflow(
                            entry=entry, as_artifact=as_artifact, model=model, cfg=cfg
                        )
                    else:
                        mlflow.log_artifact(entry.path, artifact_path="pypots")
                else:
                    logger.debug(
                        f"Unknown entry type (not logging as PyPots artifact): {entry.name}"
                    )
    except Exception as e:
        logger.error(f"Failed to log results artifact: {e}")
        raise e

log_mlflow_artifacts_after_pypots_model_train

log_mlflow_artifacts_after_pypots_model_train(
    results_path: str,
    pypots_dir: str,
    model: Any,
    cfg: DictConfig,
) -> None

Log results and PyPOTS artifacts to MLflow after training.

PARAMETER DESCRIPTION
results_path

Path to results pickle file.

TYPE: str

pypots_dir

Path to PyPOTS output directory.

TYPE: str

model

PyPOTS model object.

TYPE: object

cfg

Configuration object.

TYPE: DictConfig

Source code in src/log_helpers/mlflow_utils.py
def log_mlflow_artifacts_after_pypots_model_train(
    results_path: str, pypots_dir: str, model: Any, cfg: DictConfig
) -> None:
    """Log results pickle and PyPOTS artifacts to MLflow after training.

    Parameters
    ----------
    results_path : str
        Path to results pickle file.
    pypots_dir : str
        Path to PyPOTS output directory.
    model : object
        PyPOTS model object.
    cfg : DictConfig
        Configuration object.
    """
    # The results .pickle — deliberately best-effort: failures are logged,
    # not raised, so a broken artifact store does not kill the run.
    try:
        mlflow.log_artifact(results_path, artifact_path="results")
    except Exception as e:
        logger.error(f"Failed to log results artifact: {e}")
        # https://www.restack.io/docs/mlflow-knowledge-mlflow-log-artifact-permission-denied
        # TODO! Inspect why this happens? makedir fails:
        #  PermissionError: [Errno 13] Permission denied: '/petteri'
        # https://github.com/mlflow/mlflow/issues/212#issuecomment-409260757
        # log_metric/log_param only talk to the tracking server, while
        # log_artifact/log_model persist to the (external) artifact store —
        # which is why only the artifact calls fail here.

    # The pypots artifacts
    pytpots_artifact_wrapper(pypots_dir, model, cfg)

log_imputation_db_to_mlflow

log_imputation_db_to_mlflow(
    db_path: str,
    mlflow_cfg: Dict[str, Any],
    model: str,
    cfg: DictConfig,
) -> None

Log imputation DuckDB database to MLflow.

PARAMETER DESCRIPTION
db_path

Path to DuckDB file.

TYPE: str

mlflow_cfg

MLflow configuration with run_info.

TYPE: dict

model

Model name (currently unused).

TYPE: str

cfg

Configuration object (currently unused).

TYPE: DictConfig

Source code in src/log_helpers/mlflow_utils.py
def log_imputation_db_to_mlflow(
    db_path: str, mlflow_cfg: Dict[str, Any], model: str, cfg: DictConfig
) -> None:
    """Attach the imputation DuckDB database to an existing MLflow run.

    Parameters
    ----------
    db_path : str
        Path to DuckDB file.
    mlflow_cfg : dict
        MLflow configuration with run_info.
    model : str
        Model name (currently unused).
    cfg : DictConfig
        Configuration object (currently unused).
    """
    run_id = mlflow_cfg["run_info"]["run_id"]
    with mlflow.start_run(run_id=run_id):
        logger.info("Logging imputation database to MLflow as DuckDB")
        mlflow.log_artifact(db_path, artifact_path="imputation_db")

post_imputation_model_training_mlflow_log

post_imputation_model_training_mlflow_log(
    metrics_model: Dict[str, Any],
    model_artifacts: Dict[str, Any],
    cfg: DictConfig,
) -> None

Check if current model improved over previous best and log accordingly.

Compares current model metrics against the previously logged best model; if improved, it currently only logs a notice that the model could be registered — actual MLflow Model Registry registration is still to be implemented.

PARAMETER DESCRIPTION
metrics_model

Current model metrics.

TYPE: dict

model_artifacts

Model artifacts with MLflow info.

TYPE: dict

cfg

Configuration object.

TYPE: DictConfig

Source code in src/log_helpers/mlflow_utils.py
def post_imputation_model_training_mlflow_log(
    metrics_model: Dict[str, Any], model_artifacts: Dict[str, Any], cfg: DictConfig
) -> None:
    """Compare the current model against the previous best and log the outcome.

    Looks up the best previously-logged model in MLflow and checks whether
    the current metrics beat it; registry promotion itself is still a TODO.

    Parameters
    ----------
    metrics_model : dict
        Current model metrics.
    model_artifacts : dict
        Model artifacts with MLflow info.
    cfg : DictConfig
        Configuration object.
    """
    previous_best = get_best_previous_mlflow_logged_model(
        model_dict=model_artifacts, cfg=cfg
    )
    improved = is_current_better_than_previous(
        metrics_model=metrics_model,
        model_dict=model_artifacts,
        best_previous_run=previous_best,
        cfg=cfg,
    )

    if not improved:
        logger.info(
            "Model did not improve, not registering to MLflow Model Registry "
            "as the best model (Staging) TO-BE-IMPLEMENTED!"
        )
    else:
        # TODO! Implement actually the registering, and the model logging during previous MLflow logging
        logger.warning("Model improved, now possible to register MLflow Model Registry")

check_if_improved_with_direction

check_if_improved_with_direction(
    metric_string: str,
    metric_direction: str,
    current_metric_value: float,
    best_metric_value: float,
) -> bool

Check if current metric is better than previous best based on direction.

PARAMETER DESCRIPTION
metric_string

Name of the metric for logging.

TYPE: str

metric_direction

'ASC' if lower is better, 'DESC' if higher is better.

TYPE: str

current_metric_value

Current model's metric value.

TYPE: float

best_metric_value

Previous best metric value.

TYPE: float

RETURNS DESCRIPTION
bool

True if current is better than previous best.

RAISES DESCRIPTION
ValueError

If metric_direction is not 'ASC' or 'DESC'.

Source code in src/log_helpers/mlflow_utils.py
def check_if_improved_with_direction(
    metric_string: str,
    metric_direction: str,
    current_metric_value: float,
    best_metric_value: float,
) -> bool:
    """Check if current metric is better than previous best based on direction.

    Parameters
    ----------
    metric_string : str
        Name of the metric for logging.
    metric_direction : str
        'ASC' if lower is better, 'DESC' if higher is better.
    current_metric_value : float
        Current model's metric value.
    best_metric_value : float
        Previous best metric value.

    Returns
    -------
    bool
        True if current is better than previous best.

    Raises
    ------
    ValueError
        If metric_direction is not 'ASC' or 'DESC'.
    """
    # Deduplicated: compute the comparison per direction once, log once.
    if metric_direction == "ASC":
        # Lower is better (e.g. error metrics such as MAE)
        is_improved = current_metric_value < best_metric_value
    elif metric_direction == "DESC":
        # Higher is better (e.g. score metrics)
        is_improved = current_metric_value > best_metric_value
    else:
        logger.error(f"Unknown metric direction = {metric_direction}")
        raise ValueError(f"Unknown metric direction = {metric_direction}")

    if is_improved:
        logger.info(
            f"Current metric ({metric_string} = {current_metric_value:.5f}) is better than the previous best"
        )
    else:
        logger.info(
            f"Current metric ({metric_string} = {current_metric_value:.5f}) is worse (or equal) than the "
            f"previous best ({best_metric_value:.5f})"
        )

    return is_improved

is_current_better_than_previous

is_current_better_than_previous(
    metrics_model: Dict[str, Any],
    model_dict: Dict[str, Any],
    best_previous_run: Dict[str, Any],
    cfg: DictConfig,
) -> bool

Determine if current model outperforms the previous best.

PARAMETER DESCRIPTION
metrics_model

Current model metrics.

TYPE: dict

model_dict

Model artifacts with MLflow info.

TYPE: dict

best_previous_run

Previous best run data.

TYPE: dict

cfg

Configuration object.

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if current model is better.

Source code in src/log_helpers/mlflow_utils.py
def is_current_better_than_previous(
    metrics_model: Dict[str, Any],
    model_dict: Dict[str, Any],
    best_previous_run: Dict[str, Any],
    cfg: DictConfig,
) -> bool:
    """Decide whether the current model beats the previous best run.

    Parameters
    ----------
    metrics_model : dict
        Current model metrics.
    model_dict : dict
        Model artifacts with MLflow info.
    best_previous_run : dict
        Previous best run data.
    cfg : DictConfig
        Configuration object.

    Returns
    -------
    bool
        True if current model is better.
    """
    mlflow_info = get_mlflow_info_from_model_dict(model_dict)
    # What metric/split/direction to compare, derived from the run name + cfg
    search_spec = what_to_search_from_mlflow(
        run_name=mlflow_info["run_info"]["run_name"], cfg=cfg
    )
    current_experiment, metric_string, split_key, metric_direction = search_spec

    best_metric_value = best_previous_run[f"metrics.{split_key}/{metric_string}"]
    logger.info(
        f"Best metric ({metric_string} = {best_metric_value}) from the logged MLflow runs"
    )
    current_metric_value = get_best_metric_from_current_run(
        metrics_model=metrics_model, split_key=split_key, metric_string=metric_string
    )

    return check_if_improved_with_direction(
        metric_string, metric_direction, current_metric_value, best_metric_value
    )

mlflow_artifacts

get_mlflow_run_ids_from_imputation_artifacts

get_mlflow_run_ids_from_imputation_artifacts(
    imputation_artifacts: Dict[str, Any],
) -> Dict[str, str]

Extract MLflow run IDs from imputation artifacts dictionary.

PARAMETER DESCRIPTION
imputation_artifacts

Dictionary containing 'artifacts' key with model-specific MLflow info.

TYPE: dict

RETURNS DESCRIPTION
dict

Mapping of model names to their MLflow run IDs.

Source code in src/log_helpers/mlflow_artifacts.py
def get_mlflow_run_ids_from_imputation_artifacts(
    imputation_artifacts: Dict[str, Any],
) -> Dict[str, str]:
    """Collect the MLflow run ID of every imputation model.

    Parameters
    ----------
    imputation_artifacts : dict
        Dictionary whose 'artifacts' entry maps model names to per-model
        data, including an 'mlflow' section with run info.

    Returns
    -------
    dict
        Mapping of model names to their MLflow run IDs.
    """
    per_model = imputation_artifacts["artifacts"]
    return {
        name: data["mlflow"]["run_info"]["run_id"]
        for name, data in per_model.items()
    }

get_mlflow_metric_params

get_mlflow_metric_params(
    metrics: Dict[str, Any],
    cfg: DictConfig,
    splitkey: str = "gt",
    metrictype: str = "global",
    metricname: str = "mae",
) -> Dict[str, Any]

Extract specific metric parameters from nested metrics dictionary for MLflow logging.

Filters metrics by split key, metric type, and metric name to keep the MLflow dashboard clean while still allowing programmatic access to all metrics.

PARAMETER DESCRIPTION
metrics

Nested metrics dictionary with structure: {model_name: {split: {split_key: {metric_type: {metric: value}}}}}.

TYPE: dict

cfg

Configuration object (currently unused).

TYPE: DictConfig

splitkey

Split key to filter (e.g., 'gt' for ground truth).

TYPE: str DEFAULT: "gt"

metrictype

Metric type to filter (e.g., 'global', 'per_subject').

TYPE: str DEFAULT: "global"

metricname

Specific metric name to extract.

TYPE: str DEFAULT: "mae"

RETURNS DESCRIPTION
dict

Dictionary with model name and filtered metrics suitable for MLflow logging.

RAISES DESCRIPTION
ValueError

If more than one model is found in the metrics dictionary.

Source code in src/log_helpers/mlflow_artifacts.py
def get_mlflow_metric_params(
    metrics: Dict[str, Any],
    cfg: DictConfig,
    splitkey: str = "gt",
    metrictype: str = "global",
    metricname: str = "mae",
) -> Dict[str, Any]:
    """Extract one metric per split from a nested metrics dictionary for MLflow.

    Only the (splitkey, metrictype, metricname) combination is kept so the
    MLflow dashboard stays clean; all other metrics remain accessible
    programmatically via the MLflow API.

    Parameters
    ----------
    metrics : dict
        Nested metrics dictionary with structure:
        {model_name: {split: {split_key: {metric_type: {metric: value}}}}}.
    cfg : DictConfig
        Configuration object (currently unused).
    splitkey : str, default "gt"
        Split key to filter (e.g., 'gt' for ground truth).
    metrictype : str, default "global"
        Metric type to filter (e.g., 'global', 'per_subject').
    metricname : str, default "mae"
        Specific metric name to extract.

    Returns
    -------
    dict
        Dictionary with the model name and the filtered per-split metrics,
        keyed as 'imp_{split}/{metricname}'.

    Raises
    ------
    ValueError
        If the metrics dictionary is empty or contains more than one model.
    """
    # Fix: the previous version crashed with an UnboundLocalError when the
    # metrics dict was empty (metric_params was never bound); fail fast with
    # an explicit error instead.
    if not metrics:
        raise ValueError("Empty metrics dictionary, expected exactly one model")
    if len(metrics) > 1:
        # Fix: added the missing space between the two message fragments.
        logger.error(
            "More than one model found, this should not happen now, as all the subflows should "
            "operate independently, and you should only have one model in the metrics dict"
        )
        raise ValueError("Too many models in the metrics dictionary")

    model_name = next(iter(metrics))
    metric_params: Dict[str, Any] = {"model": model_name}
    for split, split_data in metrics[model_name].items():
        # Keep only the requested split_key/metric_type/metric combination;
        # everything else stays out of the dashboard.
        by_type = split_data.get(splitkey)
        if by_type is None:
            continue
        by_metric = by_type.get(metrictype)
        if by_metric is None:
            continue
        if metricname in by_metric:
            metric_params[f"imp_{split}/{metricname}"] = by_metric[metricname]

    return metric_params

get_mlflow_params

get_mlflow_params(
    mlflow_info: Dict[str, Any],
) -> Tuple[str, str]

Extract and set MLflow experiment and run ID from info dictionary.

PARAMETER DESCRIPTION
mlflow_info

Dictionary containing 'experiment' and 'run_info' keys with MLflow metadata.

TYPE: dict

RETURNS DESCRIPTION
tuple of str

Tuple of (experiment_id, run_id).

Notes

Also sets the MLflow experiment as a side effect.

Source code in src/log_helpers/mlflow_artifacts.py
def get_mlflow_params(mlflow_info: Dict[str, Any]) -> Tuple[str, str]:
    """Read the experiment name and run ID recorded during training.

    Parameters
    ----------
    mlflow_info : dict
        Dictionary with 'experiment' and 'run_info' entries of MLflow metadata.

    Returns
    -------
    tuple of str
        The (experiment_id, run_id) pair.

    Notes
    -----
    Calls ``mlflow.set_experiment`` as a side effect so subsequent MLflow
    calls target the training experiment.
    """
    run_id = mlflow_info["run_info"]["run_id"]
    experiment_id = mlflow_info["experiment"]["name"]
    # Point the MLflow client at the experiment used during training.
    mlflow.set_experiment(experiment_id)
    return experiment_id, run_id

get_mlflow_info_from_model_dict

get_mlflow_info_from_model_dict(
    model_dict: Dict[str, Any],
) -> Dict[str, Any]

Extract MLflow info dictionary from model artifacts dictionary.

PARAMETER DESCRIPTION
model_dict

Model artifacts dictionary containing 'mlflow' key with run/experiment info.

TYPE: dict

RETURNS DESCRIPTION
dict

MLflow info dictionary with run_info, experiment, and artifact_uri.

RAISES DESCRIPTION
Exception

If 'mlflow' key is missing from model_dict.

Source code in src/log_helpers/mlflow_artifacts.py
def get_mlflow_info_from_model_dict(model_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the MLflow info dictionary from a model artifacts dictionary.

    Parameters
    ----------
    model_dict : dict
        Model artifacts dictionary containing an 'mlflow' key with
        run/experiment info.

    Returns
    -------
    dict
        MLflow info dictionary with run_info, experiment, and artifact_uri.

    Raises
    ------
    KeyError
        If the 'mlflow' key is missing from model_dict.
    """
    # If everything went ok, the MLflow run/experiment/artifact_uri/etc. info
    # was saved under the 'mlflow' key during training.
    try:
        # Narrowed from a broad `except Exception`: a missing key is the only
        # failure mode this single lookup is meant to guard against.
        return model_dict["mlflow"]
    except KeyError as e:
        logger.error(f"Failed to get the MLflow info: {e}")
        logger.error(
            "How come did this happen, and you never saved the 'mlflow' key in the model_dict?"
        )
        raise

get_duckdb_from_mlflow

get_duckdb_from_mlflow(
    artifact_uri: str,
    dir_name: str = "data",
    wildcard: str = ".db",
) -> str

Download and locate DuckDB file from MLflow artifacts.

PARAMETER DESCRIPTION
artifact_uri

MLflow artifact URI to search.

TYPE: str

dir_name

Directory name within artifacts containing the database.

TYPE: str DEFAULT: "data"

wildcard

File extension to match.

TYPE: str DEFAULT: ".db"

RETURNS DESCRIPTION
str

Local path to downloaded DuckDB file.

RAISES DESCRIPTION
FileNotFoundError

If no DuckDB artifact is found.

Source code in src/log_helpers/mlflow_artifacts.py
def get_duckdb_from_mlflow(
    artifact_uri: str, dir_name: str = "data", wildcard: str = ".db"
) -> str:
    """Fetch a DuckDB database file from an MLflow artifact store.

    Parameters
    ----------
    artifact_uri : str
        MLflow artifact URI to search.
    dir_name : str, default "data"
        Artifact subdirectory expected to contain the database.
    wildcard : str, default ".db"
        Substring to match against file names.

    Returns
    -------
    str
        Local path of the downloaded DuckDB file.

    Raises
    ------
    FileNotFoundError
        If the run has no artifacts or no matching database file.
    """
    artifacts = mlflow.artifacts.list_artifacts(artifact_uri=artifact_uri)
    if not artifacts:
        msg = "No DuckDB artifact found from the MLflow run, artifact_uri = {}".format(
            artifact_uri
        )
        logger.error(msg)
        raise FileNotFoundError(msg)

    db_path = None
    for artifact in artifacts:
        if dir_name not in artifact.path:
            continue
        # Pull the whole subdirectory locally, then scan it for the database.
        local_dir = mlflow.artifacts.download_artifacts(
            artifact_uri=f"{artifact_uri}/{dir_name}"
        )
        for root, _dirs, files in Path(local_dir).walk():
            for candidate in files:
                if wildcard in candidate:
                    db_path = str(root / candidate)

    if db_path is None:
        msg = "Could not find the DuckDB file from the MLflow artifacts"
        logger.error(msg)
        raise FileNotFoundError(msg)
    return db_path

write_new_col_to_mlflow

write_new_col_to_mlflow(
    model_best_runs: DataFrame,
    col_name: str,
    col_name_init: str,
) -> None

Write a new metric column to MLflow runs.

Used for harmonizing column names by writing values under a new metric name.

PARAMETER DESCRIPTION
model_best_runs

DataFrame containing run_id and the column to write.

TYPE: DataFrame

col_name

Source column name in the DataFrame.

TYPE: str

col_name_init

Target metric name for MLflow (will have 'metrics.' prefix stripped).

TYPE: str

Source code in src/log_helpers/mlflow_artifacts.py
def write_new_col_to_mlflow(
    model_best_runs: pd.DataFrame, col_name: str, col_name_init: str
) -> None:
    """Write a metric column to the corresponding MLflow runs.

    Used for harmonizing column names by re-logging existing values under a
    new metric name.

    Parameters
    ----------
    model_best_runs : pd.DataFrame
        DataFrame containing 'run_id' and the column to write.
    col_name : str
        Source column name in the DataFrame.
    col_name_init : str
        Target metric name for MLflow (the 'metrics.' prefix is stripped).
    """
    # Hoisted: the target metric name does not depend on the row.
    col_name_out = col_name_init.replace("metrics.", "")
    for i in range(model_best_runs.shape[0]):
        row = model_best_runs.iloc[i]
        # Fix: `with mlflow.start_run(...)` already ends the run on context
        # exit; the explicit mlflow.end_run() inside the block was redundant
        # and has been removed.
        with mlflow.start_run(run_id=row["run_id"]):
            value = row[col_name]
            logger.info(f"Writing the new column {col_name_out} with value {value}")
            mlflow.log_metric(col_name_out, value)

get_col_for_for_best_anomaly_detection_metric

get_col_for_for_best_anomaly_detection_metric(
    best_metric_cfg: DictConfig, task: str
) -> str

Get DataFrame column name for best metric based on task type.

PARAMETER DESCRIPTION
best_metric_cfg

Configuration with 'string' (metric name) and 'split' keys.

TYPE: DictConfig

task

Task type: 'anomaly_detection', 'outlier_detection', or 'imputation'.

TYPE: str

RETURNS DESCRIPTION
str

Column name in format 'metrics.{split}/{metric}' or direct string.

RAISES DESCRIPTION
ValueError

If task type is not recognized.

Source code in src/log_helpers/mlflow_artifacts.py
def get_col_for_for_best_anomaly_detection_metric(
    best_metric_cfg: DictConfig, task: str
) -> str:
    """Get the DataFrame column name of the best metric for a given task.

    Parameters
    ----------
    best_metric_cfg : DictConfig
        Configuration with 'string' (metric name) and 'split' keys.
    task : str
        Task type: 'anomaly_detection', 'outlier_detection', or 'imputation'.

    Returns
    -------
    str
        Column name in the format 'metrics.{split}/{metric}' for detection
        tasks, or the configured string directly for imputation.

    Raises
    ------
    ValueError
        If the task type is not recognized.
    """
    if task in ("anomaly_detection", "outlier_detection"):
        # use only one name eventually
        best_metric_name = best_metric_cfg["string"]
        split = best_metric_cfg["split"]
        col_name = f"metrics.{split}/{best_metric_name}"
    elif task == "imputation":
        # TODO! This is a bit hacky, but the best metric is always the same
        #  for imputation, as this is directly the col_name of anomaly detection
        col_name = best_metric_cfg["string"]
    else:
        # Fix: corrected the "Unknon" typo in the log/error message.
        logger.error("Unknown task = {}".format(task))
        raise ValueError("Unknown task = {}".format(task))
    return col_name

harmonize_anomaly_col_name

harmonize_anomaly_col_name(
    col_name: str,
    model_best_runs: DataFrame,
    best_metric_cfg: DictConfig,
    model: str,
) -> str

Harmonize metric column name if not found in DataFrame.

Falls back to 'test' split if the specified column is missing, and writes the harmonized values back to MLflow.

PARAMETER DESCRIPTION
col_name

Expected column name.

TYPE: str

model_best_runs

DataFrame with MLflow run data.

TYPE: DataFrame

best_metric_cfg

Best metric configuration.

TYPE: DictConfig

model

Model name for logging.

TYPE: str

RETURNS DESCRIPTION
str

Harmonized column name that exists in the DataFrame.

RAISES DESCRIPTION
ValueError

If harmonized column contains only NaN values.

Source code in src/log_helpers/mlflow_artifacts.py
def harmonize_anomaly_col_name(
    col_name: str,
    model_best_runs: pd.DataFrame,
    best_metric_cfg: DictConfig,
    model: str,
) -> str:
    """Resolve the metric column name, falling back to the 'test' split.

    When the requested column is absent from the DataFrame, the metric is
    looked up under the 'test' split instead, and the values are re-logged
    to MLflow under the originally requested name.

    Parameters
    ----------
    col_name : str
        Expected column name.
    model_best_runs : pd.DataFrame
        DataFrame with MLflow run data.
    best_metric_cfg : DictConfig
        Best metric configuration.
    model : str
        Model name (kept for logging context).

    Returns
    -------
    str
        A column name that exists in the DataFrame.

    Raises
    ------
    ValueError
        If the fallback column holds only NaN values.
    """
    if col_name in model_best_runs.columns:
        return col_name

    requested_name = col_name
    col_name = f"metrics.test/{best_metric_cfg['string']}"
    fallback_values = model_best_runs[col_name].to_numpy()
    if np.all(np.isnan(fallback_values)):
        logger.error(
            f"Could not find the column {col_name} in the model_best_runs dataframe"
        )
        raise ValueError(
            f"Could not find the column {col_name} in the model_best_runs dataframe"
        )

    # Fallback column has real values: re-log it under the requested name.
    logger.info("Harmonizing the column name to test")
    write_new_col_to_mlflow(model_best_runs, col_name, requested_name)
    return col_name

threshold_filter_run

threshold_filter_run(
    best_run: Union[Series, DataFrame],
    col_name: str,
    best_metric_cfg: DictConfig,
) -> Optional[Union[Series, DataFrame]]

Filter run based on ensemble quality threshold.

Returns None if the run's metric does not meet the threshold requirement.

PARAMETER DESCRIPTION
best_run

Run data to filter.

TYPE: Series or DataFrame

col_name

Column name containing the metric to check.

TYPE: str

best_metric_cfg

Configuration with 'ensemble_quality_threshold' and 'direction' keys.

TYPE: DictConfig

RETURNS DESCRIPTION
pd.Series, pd.DataFrame, or None

Original run data if threshold is met, None otherwise.

Source code in src/log_helpers/mlflow_artifacts.py
def threshold_filter_run(
    best_run: Union[pd.Series, pd.DataFrame], col_name: str, best_metric_cfg: DictConfig
) -> Optional[Union[pd.Series, pd.DataFrame]]:
    """Drop a run that fails the ensemble quality threshold.

    Parameters
    ----------
    best_run : pd.Series or pd.DataFrame
        Run data to check; a DataFrame is reduced to its first row.
    col_name : str
        Column holding the metric to compare against the threshold.
    best_metric_cfg : DictConfig
        Configuration with 'ensemble_quality_threshold' and 'direction' keys.

    Returns
    -------
    pd.Series, pd.DataFrame, or None
        The input run (in its original shape) when it meets the threshold,
        otherwise None.
    """
    was_dataframe = isinstance(best_run, pd.DataFrame)
    if was_dataframe:
        # Work on the first row as a Series; restore the shape at the end.
        best_run = pd.Series(best_run.iloc[0])

    threshold = best_metric_cfg["ensemble_quality_threshold"]
    if threshold is not None:
        direction = best_metric_cfg["direction"]
        if direction == "ASC":
            # Lower is better: reject values above the threshold.
            if best_run[col_name] > threshold:
                return None
        elif direction == "DESC":
            # Higher is better: reject values below the threshold.
            if best_run[col_name] < threshold:
                return None
        else:
            logger.error("The direction of the best metric is not recognized")
            raise ValueError("The direction of the best metric is not recognized")

    return pd.DataFrame(best_run).T if was_dataframe else best_run

get_best_run_of_pd_dataframe

get_best_run_of_pd_dataframe(
    model_best_runs: DataFrame,
    cfg: DictConfig,
    best_metric_cfg: DictConfig,
    task: str,
    model: str,
    include_all_variants: bool = False,
) -> Tuple[
    Optional[Union[Series, DataFrame]], Optional[float]
]

Find the best MLflow run from a DataFrame based on metric configuration.

PARAMETER DESCRIPTION
model_best_runs

DataFrame containing MLflow runs for the model.

TYPE: DataFrame

cfg

Full configuration object.

TYPE: DictConfig

best_metric_cfg

Configuration specifying best metric, direction, and threshold.

TYPE: DictConfig

task

Task type for determining column name format.

TYPE: str

model

Model name for logging.

TYPE: str

include_all_variants

If True, return all runs sorted; if False, return only the best run.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple

Tuple of (best_run, best_metric) where best_run is a Series/DataFrame and best_metric is the metric value (or None if all variants returned).

Source code in src/log_helpers/mlflow_artifacts.py
def get_best_run_of_pd_dataframe(
    model_best_runs: pd.DataFrame,
    cfg: DictConfig,
    best_metric_cfg: DictConfig,
    task: str,
    model: str,
    include_all_variants: bool = False,
) -> Tuple[Optional[Union[pd.Series, pd.DataFrame]], Optional[float]]:
    """Pick the best MLflow run (or all runs, sorted) for a model.

    Parameters
    ----------
    model_best_runs : pd.DataFrame
        MLflow runs logged for the model.
    cfg : DictConfig
        Full configuration object.
    best_metric_cfg : DictConfig
        Best-metric name, direction, and quality threshold.
    task : str
        Task type, used to derive the metric column name.
    model : str
        Model name, used for logging.
    include_all_variants : bool, default False
        When True, return every run sorted by the metric instead of only
        the single best one.

    Returns
    -------
    tuple
        (best_run, best_metric); best_metric is None when all variants are
        returned or the best run fails the quality threshold.
    """
    metric_col = get_col_for_for_best_anomaly_detection_metric(best_metric_cfg, task)
    metric_col = harmonize_anomaly_col_name(
        metric_col, model_best_runs, best_metric_cfg, model
    )

    try:
        direction = best_metric_cfg["direction"]
        if direction == "ASC":
            sorted_runs = model_best_runs.sort_values(by=metric_col, ascending=True)
        elif direction == "DESC":
            sorted_runs = model_best_runs.sort_values(by=metric_col, ascending=False)
        else:
            logger.error("The direction of the best metric is not recognized")
            raise ValueError("The direction of the best metric is not recognized")
    except Exception as e:
        logger.error(f"Failed to sort the runs based on the best metric: {e}")
        raise e

    if include_all_variants:
        # Caller only wants to recompute metrics over every variant.
        return sorted_runs, None

    top_run = sorted_runs.iloc[0]
    logger.info(
        f"{model}: The best {best_metric_cfg['string']} is {top_run[metric_col]:.3f}"
    )
    top_run = threshold_filter_run(top_run, metric_col, best_metric_cfg)
    best_metric = top_run[metric_col] if top_run is not None else None
    return top_run, best_metric

get_imputation_results_from_mlflow

get_imputation_results_from_mlflow(
    mlflow_run: Series,
    model_name: str,
    cfg: DictConfig,
    dir_name: str = "imputation",
) -> Dict[str, Any]

Download imputation results from MLflow artifact store.

PARAMETER DESCRIPTION
mlflow_run

MLflow run data containing run_id and tags.

TYPE: Series

model_name

Name of the imputation model.

TYPE: str

cfg

Configuration object (currently unused).

TYPE: DictConfig

dir_name

Artifact subdirectory name.

TYPE: str DEFAULT: "imputation"

RETURNS DESCRIPTION
dict

Loaded imputation results dictionary with 'mlflow_run' key added.

RAISES DESCRIPTION
FileNotFoundError

If imputation results cannot be found or downloaded.

Source code in src/log_helpers/mlflow_artifacts.py
def get_imputation_results_from_mlflow(
    mlflow_run: pd.Series,
    model_name: str,
    cfg: DictConfig,
    dir_name: str = "imputation",
) -> Dict[str, Any]:
    """Download imputation results from MLflow artifact store.

    Parameters
    ----------
    mlflow_run : pd.Series
        MLflow run data containing 'run_id' and tags
        (including 'tags.mlflow.runName').
    model_name : str
        Name of the imputation model.
    cfg : DictConfig
        Configuration object (currently unused).
    dir_name : str, default "imputation"
        Artifact subdirectory name.

    Returns
    -------
    dict
        Loaded imputation results dictionary with 'mlflow_run' key added.

    Raises
    ------
    FileNotFoundError
        If imputation results cannot be found or downloaded.
    """
    # Ensemble runs store their results under a different pickle name than
    # single-model runs, so pick the filename based on the run name tag.
    if "ensemble" in mlflow_run["tags.mlflow.runName"]:
        fname = get_ensemble_pickle_name(ensemble_name=model_name)
        logger.debug(f"Ensemble model found, loading the ensemble pickle: {fname}")
    else:
        fname = get_imputation_pickle_name(model_name)

    # e.g. "runs:/<run_id>/imputation/<fname>"
    artifact_uri = "runs:/{}/{}/{}".format(mlflow_run["run_id"], dir_name, fname)
    try:
        path_dir = mlflow.artifacts.download_artifacts(artifact_uri)
    except Exception as e:
        logger.error(f"Could not download the imputation results from MLflow: {e}")
        logger.info("mlflow_run: {}".format(mlflow_run))
        raise e

    if path_dir is not None:
        logger.info(
            f"Imputation results downloaded from MLflow, artifact_uri = {artifact_uri}"
        )
        dict_out = load_results_dict(path_dir)
    else:
        # NOTE(review): download_artifacts is not expected to return None in
        # current MLflow versions, so this branch looks defensive — confirm.
        logger.error(
            f"Could not find imputation results for model = {model_name}, artifact_uri: {artifact_uri}"
        )
        raise FileNotFoundError(
            f"Could not find imputation results for model = {model_name}, artifact_uri: {artifact_uri}"
        )

    # Attach the originating run so downstream code can trace the results.
    dict_out["mlflow_run"] = mlflow_run

    return dict_out

get_mlflow_artifact_uri_from_run

get_mlflow_artifact_uri_from_run(
    best_run: Union[Dict[str, Any], Series],
) -> str

Get artifact URI from MLflow run.

PARAMETER DESCRIPTION
best_run

Run data containing 'run_id'.

TYPE: dict or Series

RETURNS DESCRIPTION
str

Artifact URI for the run.

Source code in src/log_helpers/mlflow_artifacts.py
def get_mlflow_artifact_uri_from_run(best_run: Union[Dict[str, Any], pd.Series]) -> str:
    """Look up the artifact URI of an MLflow run.

    Parameters
    ----------
    best_run : dict or pd.Series
        Run data with a 'run_id' entry.

    Returns
    -------
    str
        Artifact URI of that run.
    """
    run = mlflow.get_run(best_run["run_id"])
    return run.info.artifact_uri

get_best_metric_from_current_run

get_best_metric_from_current_run(
    metrics_model: dict, split_key: str, metric_string: str
) -> float

Extract specific metric value from current run's metrics dictionary.

PARAMETER DESCRIPTION
metrics_model

Metrics dictionary with structure {split_key: {global: {metric: value}}}.

TYPE: dict

split_key

Data split key (e.g., 'test', 'val').

TYPE: str

metric_string

Name of the metric to extract.

TYPE: str

RETURNS DESCRIPTION
float

The metric value.

Source code in src/log_helpers/mlflow_artifacts.py
def get_best_metric_from_current_run(
    metrics_model: dict, split_key: str, metric_string: str
) -> float:
    """Read one metric value from the current run's metrics dictionary.

    Parameters
    ----------
    metrics_model : dict
        Metrics with structure {split_key: {'global': {metric: value}}}.
    split_key : str
        Data split key (e.g., 'test', 'val').
    metric_string : str
        Name of the metric to read.

    Returns
    -------
    float
        The metric value.
    """
    logger.info(
        f"Getting the best metric from the current run, metric = {metric_string}, "
        f"split = {split_key}"
    )
    # Only the 'global' metric type is relevant for run-to-run comparison.
    split_metrics = metrics_model[split_key]
    return split_metrics["global"][metric_string]

get_best_previous_mlflow_logged_model

get_best_previous_mlflow_logged_model(
    model_dict: Dict[str, Any], cfg: DictConfig
) -> Optional[Dict[str, Any]]

Find the best previously logged MLflow model matching current configuration.

PARAMETER DESCRIPTION
model_dict

Model artifacts dictionary containing MLflow info.

TYPE: dict

cfg

Configuration for determining search parameters.

TYPE: DictConfig

RETURNS DESCRIPTION
dict

Best previous run data, or None if no matching runs found.

Source code in src/log_helpers/mlflow_artifacts.py
def get_best_previous_mlflow_logged_model(
    model_dict: Dict[str, Any], cfg: DictConfig
) -> Optional[Dict[str, Any]]:
    """Look up the best previously logged MLflow run for this model.

    Parameters
    ----------
    model_dict : dict
        Model artifacts dictionary containing MLflow info.
    cfg : DictConfig
        Configuration used to derive the search parameters.

    Returns
    -------
    dict or None
        The best previous run, or None when no matching run exists.
    """
    mlflow_info = get_mlflow_info_from_model_dict(model_dict)
    # Side effect: points MLflow at the experiment used during training.
    _experiment_id, _run_id = get_mlflow_params(mlflow_info)

    run_name = mlflow_info["run_info"]["run_name"]
    current_experiment, metric_string, split_key, metric_direction = (
        what_to_search_from_mlflow(run_name=run_name, cfg=cfg)
    )

    return return_best_mlflow_run(
        current_experiment,
        metric_string,
        split_key,
        metric_direction,
        run_name=run_name,
    )

iterate_through_mlflow_run_artifacts

iterate_through_mlflow_run_artifacts(
    run_artifacts: List[FileInfo],
    fname: str,
    run_id: str,
    dir_download: str,
    artifacts_string: str = "imputation",
) -> Optional[Dict[str, Any]]

Iterate through MLflow artifacts to find and download a specific file.

PARAMETER DESCRIPTION
run_artifacts

List of MLflow artifact objects.

TYPE: list

fname

Filename to find and download.

TYPE: str

run_id

MLflow run ID.

TYPE: str

dir_download

Local directory for downloads (currently unused).

TYPE: str

artifacts_string

Artifact path to match.

TYPE: str DEFAULT: "imputation"

RETURNS DESCRIPTION
dict or None

Loaded results dictionary, or None if not found.

RAISES DESCRIPTION
FileNotFoundError

If the specified artifact cannot be found.

Source code in src/log_helpers/mlflow_artifacts.py
def iterate_through_mlflow_run_artifacts(
    run_artifacts: List[FileInfo],
    fname: str,
    run_id: str,
    dir_download: str,
    artifacts_string: str = "imputation",
) -> Optional[Dict[str, Any]]:
    """Scan MLflow run artifacts for a directory and download a file from it.

    Parameters
    ----------
    run_artifacts : list
        MLflow artifact objects of the run.
    fname : str
        Filename to download from the matching artifact directory.
    run_id : str
        MLflow run ID.
    dir_download : str
        Local download directory (currently unused).
    artifacts_string : str, default "imputation"
        Artifact path to match exactly.

    Returns
    -------
    dict or None
        The loaded results dictionary, or None when no artifact path matched.

    Raises
    ------
    FileNotFoundError
        If a matching artifact path exists but the file cannot be downloaded.
    """
    dict_out = None
    for artifact in run_artifacts:
        if artifact.path != artifacts_string:
            continue
        artifact_uri = "runs:/{}/{}/{}".format(run_id, artifact.path, fname)
        path_dir = mlflow.artifacts.download_artifacts(artifact_uri)
        if path_dir is None:
            logger.warning("MLFLOW | Could not find the artifact: {}".format(fname))
            raise FileNotFoundError(
                "MLFLOW | Could not find the artifact: {}".format(fname)
            )
        dict_out = load_results_dict(path_dir)

    return dict_out

download_mlflow_artifacts

download_mlflow_artifacts(
    run_id: str, fname: str, run_artifacts: List[FileInfo]
) -> Optional[Dict[str, Any]]

Download MLflow artifacts for a specific run.

PARAMETER DESCRIPTION
run_id

MLflow run ID.

TYPE: str

fname

Filename to download.

TYPE: str

run_artifacts

List of available artifacts.

TYPE: list

RETURNS DESCRIPTION
dict

Loaded artifacts dictionary.

Source code in src/log_helpers/mlflow_artifacts.py
def download_mlflow_artifacts(
    run_id: str, fname: str, run_artifacts: List[FileInfo]
) -> Optional[Dict[str, Any]]:
    """Download a named artifact file from an MLflow run.

    Parameters
    ----------
    run_id : str
        MLflow run ID.
    fname : str
        Filename to download.
    run_artifacts : list
        Artifacts available in the run.

    Returns
    -------
    dict or None
        Loaded artifacts dictionary, or None if the file was not found.
    """
    download_dir = get_artifacts_dir("mlflow")
    download_dir.mkdir(parents=True, exist_ok=True)
    return iterate_through_mlflow_run_artifacts(
        run_artifacts, fname, run_id, str(download_dir)
    )

retrieve_mlflow_artifacts_from_best_run

retrieve_mlflow_artifacts_from_best_run(
    best_run: Dict[str, Any],
    cfg: DictConfig,
    model_name: str,
) -> Tuple[Dict[str, Any], List[FileInfo]]

Retrieve imputation artifacts from the best MLflow run.

PARAMETER DESCRIPTION
best_run

Best run data containing 'run_id'.

TYPE: dict

cfg

Configuration object (currently unused).

TYPE: DictConfig

model_name

Name of the model for filename generation.

TYPE: str

RETURNS DESCRIPTION
tuple

Tuple of (imputer_artifacts, run_artifacts).

RAISES DESCRIPTION
FileNotFoundError

If no results are found in the best run.

Source code in src/log_helpers/mlflow_artifacts.py
def retrieve_mlflow_artifacts_from_best_run(
    best_run: Dict[str, Any], cfg: DictConfig, model_name: str
) -> Tuple[Dict[str, Any], List[FileInfo]]:
    """Retrieve imputation artifacts from the best MLflow run.

    Parameters
    ----------
    best_run : dict
        Best run data containing 'run_id'.
    cfg : DictConfig
        Configuration object (currently unused).
    model_name : str
        Name of the model for filename generation.

    Returns
    -------
    tuple
        Tuple of (imputer_artifacts, run_artifacts).

    Raises
    ------
    FileNotFoundError
        If no results (or no imputation results) are found in the best run.
    """
    fnames = {"imputation": get_imputation_pickle_name(model_name)}
    # NOT DONE ATM 'model': f"model_{model_name}.pickle"}

    run_id = best_run["run_id"]
    run_artifacts = mlflow.artifacts.list_artifacts(run_id=run_id)

    if run_artifacts is None:
        # We currently assume that "results" were always saved; you may later
        # want to relax this if you have some "mixed runs" without them.
        logger.error("MLflow | No results found from the best run")
        raise FileNotFoundError("No results found from the best run")

    imputer_artifacts = download_mlflow_artifacts(
        run_id, fname=fnames["imputation"], run_artifacts=run_artifacts
    )
    if imputer_artifacts is None:
        logger.error("MLflow | No imputation results found from the best run")
        raise FileNotFoundError("No imputation results found from the best run")

    return imputer_artifacts, run_artifacts

get_mlflow_artifact_from_run_name

get_mlflow_artifact_from_run_name(
    run_name: str, filter_for_finished: bool = True
) -> Optional[Dict[str, str]]

Find MLflow artifact info by run name across all experiments.

PARAMETER DESCRIPTION
run_name

Name of the run to find.

TYPE: str

filter_for_finished

If True, only search finished runs.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
dict or None

Dictionary with run_id, experiment_id, and artifact_uri if found.

Source code in src/log_helpers/mlflow_artifacts.py
def get_mlflow_artifact_from_run_name(
    run_name: str, filter_for_finished: bool = True
) -> Optional[Dict[str, str]]:
    """Find MLflow artifact info by run name across all experiments.

    Parameters
    ----------
    run_name : str
        Name of the run to find.
    filter_for_finished : bool, default True
        If True, only search runs with status "FINISHED".

    Returns
    -------
    dict or None
        Dictionary with run_id, experiment_id and artifact_uri of the first
        matching run, or None when no (matching) runs exist.
    """
    runs_df = mlflow.search_runs(search_all_experiments=True)
    if filter_for_finished:
        # Keep only runs that completed successfully
        runs_df: pd.DataFrame = runs_df[runs_df["status"] == "FINISHED"]

    if runs_df.shape[0] == 0:
        logger.debug("No runs found")
        return None

    # Exact name match (e.g. training was already done with this run name)
    matches: pd.DataFrame = runs_df[runs_df["tags.mlflow.runName"] == run_name]
    if matches.shape[0] == 0:
        logger.debug("No runs found with the run_name = {}".format(run_name))
        return None

    top = matches.iloc[0]
    return {
        "run_id": top["run_id"],
        "experiment_id": top["experiment_id"],
        "artifact_uri": top["artifact_uri"],
    }

return_best_mlflow_run

return_best_mlflow_run(
    current_experiment: Dict[str, Any],
    metric_string: str,
    split_key: str,
    metric_direction: str,
    run_name: str,
) -> Optional[Dict[str, Any]]

Find the best MLflow run matching the given criteria.

Searches for runs with the specified name, filters out NaN metrics, and returns the best run based on metric direction.

PARAMETER DESCRIPTION
current_experiment

Experiment dictionary with 'experiment_id'.

TYPE: dict

metric_string

Metric name to optimize.

TYPE: str

split_key

Data split for the metric.

TYPE: str

metric_direction

'ASC' for minimization, 'DESC' for maximization.

TYPE: str

run_name

Exact run name to match.

TYPE: str

RETURNS DESCRIPTION
dict or None

Best run as dictionary, or None if no valid runs found.

Source code in src/log_helpers/mlflow_artifacts.py
def return_best_mlflow_run(
    current_experiment: Dict[str, Any],
    metric_string: str,
    split_key: str,
    metric_direction: str,
    run_name: str,
) -> Optional[Dict[str, Any]]:
    """Find the best MLflow run matching the given criteria.

    Searches for runs with the specified name, filters out NaN metrics,
    and returns the best run based on metric direction.

    Parameters
    ----------
    current_experiment : dict
        Experiment dictionary with 'experiment_id'.
    metric_string : str
        Metric name to optimize.
    split_key : str
        Data split for the metric.
    metric_direction : str
        'ASC' for minimization, 'DESC' for maximization.
    run_name : str
        Exact run name to match.

    Returns
    -------
    dict or None
        Best run as dictionary, or None if no valid runs found.

    Raises
    ------
    ValueError
        If metric_direction is neither 'ASC' nor 'DESC'.
    """

    def drop_nan_rows(df_runs: pd.DataFrame, metric_col: str) -> Optional[pd.DataFrame]:
        # Unfinished/crashed runs never logged the metric -> NaN in this column
        if metric_col in df_runs.columns:
            try:
                return df_runs.dropna(subset=[metric_col])
            except Exception as e:
                logger.error("MLflow | Failed to drop NaN rows, e = {}".format(e))
                raise e
        else:
            logger.error(
                "MLflow | Could not find the metric column = {} in the dataframe".format(
                    metric_col
                )
            )
            logger.error(
                "Cannot pick the best model without the metric column, so returning an empty dictionary"
            )
            logger.error(
                "Handle better the runs that did not finish, so this metric easily might be missing!"
            )
            logger.error("Re-computing this part now!")
            logger.error(f"columns = {df_runs.columns}")
            return None

    def sort_runs_based_on_metric(
        df_runs: pd.DataFrame, metric_col: str, metric_direction: str
    ) -> Optional[Dict[str, Any]]:
        # Sort locally too (don't rely solely on the server-side order_by);
        # FIX: use the metric_col parameter instead of the closure variable.
        if metric_direction == "ASC":
            df_runs = df_runs.sort_values(by=[metric_col], ascending=True)
        elif metric_direction == "DESC":
            df_runs = df_runs.sort_values(by=[metric_col], ascending=False)
        else:
            logger.error(
                "MLflow | Unknown metric direction = {}".format(metric_direction)
            )
            raise ValueError("Unknown metric direction = {}".format(metric_direction))

        if df_runs.shape[0] == 0:
            logger.warning(
                "MLflow | No runs found with the run_name = {}".format(run_name)
            )
            return None
        else:
            # first row is the best one, and we can convert it to a dictionary
            best_run_dict = df_runs.iloc[0].to_dict()
            logger.info(
                "MLflow | Found previous best run | Best run id = {}, best {} = {:.3f}".format(
                    best_run_dict["run_id"],
                    metric_col,
                    best_run_dict[metric_col],
                )
            )
            return best_run_dict

    # Metric column name as stored by MLflow, e.g. "metrics.<split>/<metric>"
    best_metric_col = f"metrics.{split_key}/{metric_string}"
    # BUGFIX: the order-by clause was previously passed as a *second
    # experiment id* (the positional arg of search_runs is experiment_ids);
    # it must be given via the `order_by` keyword instead.
    df: pd.DataFrame = mlflow.search_runs(
        [current_experiment["experiment_id"]],
        order_by=[f"{best_metric_col} {metric_direction}"],
    )
    logger.debug("MLflow | Found {} runs".format(len(df)))

    # Check for exact match of the run name
    df_runs = df[df["tags.mlflow.runName"] == run_name]
    logger.debug(
        "MLflow | Number of runs per this config version = {} (run_name = {})".format(
            df_runs.shape[0], run_name
        )
    )

    # Drop NaN rows (as in the best metric column, if you had unfinished runs)
    df_runs = drop_nan_rows(df_runs, best_metric_col)

    # Sort the runs based on the metric
    if df_runs is not None:
        df_runs = sort_runs_based_on_metric(df_runs, best_metric_col, metric_direction)

    return df_runs

what_to_search_from_mlflow

what_to_search_from_mlflow(
    run_name: str,
    cfg: DictConfig,
    model_type: Optional[str] = None,
) -> Tuple[
    Optional[Dict[str, Any]],
    Optional[str],
    Optional[str],
    Optional[str],
]

Determine MLflow search parameters from run name and configuration.

PARAMETER DESCRIPTION
run_name

Name of the MLflow run.

TYPE: str

cfg

Configuration containing IMPUTATION_METRICS settings.

TYPE: DictConfig

model_type

Model type (currently unused).

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
tuple

Tuple of (current_experiment, metric_string, split_key, metric_direction), or (None, None, None, None) if run not found.

Source code in src/log_helpers/mlflow_artifacts.py
def what_to_search_from_mlflow(
    run_name: str, cfg: DictConfig, model_type: Optional[str] = None
) -> Tuple[Optional[Dict[str, Any]], Optional[str], Optional[str], Optional[str]]:
    """Determine MLflow search parameters from run name and configuration.

    Parameters
    ----------
    run_name : str
        Name of the MLflow run.
    cfg : DictConfig
        Configuration containing IMPUTATION_METRICS settings.
    model_type : str, optional
        Model type (currently unused).

    Returns
    -------
    tuple
        Tuple of (current_experiment, metric_string, split_key,
        metric_direction), or (None, None, None, None) if run not found.
    """
    mlflow_artifacts = get_mlflow_artifact_from_run_name(run_name=run_name)
    if mlflow_artifacts is None:
        return None, None, None, None

    # Resolve the experiment that the found run belongs to
    client = MlflowClient()
    experiment_name = client.get_experiment(mlflow_artifacts["experiment_id"]).name
    current_experiment = dict(mlflow.get_experiment_by_name(experiment_name))

    # Which metric (and on which split) defines the "best" run
    best_metrics = cfg["IMPUTATION_METRICS"]["best_metric"]
    split_key = f"{best_metrics['split']}"
    # best_metric = list(best_metrics.keys())[0]
    metric_string = best_metrics["string"]
    metric_direction = best_metrics["direction"]

    return current_experiment, metric_string, split_key, metric_direction

check_if_run_exists

check_if_run_exists(
    experiment_name: str, run_name: str
) -> bool

Check if an MLflow run with the given name exists in the experiment.

PARAMETER DESCRIPTION
experiment_name

Name of the MLflow experiment.

TYPE: str

run_name

Run name to search for.

TYPE: str

RETURNS DESCRIPTION
bool

True if run exists, False otherwise.

Source code in src/log_helpers/mlflow_artifacts.py
def check_if_run_exists(experiment_name: str, run_name: str) -> bool:
    """Check if an MLflow run with the given name exists in the experiment.

    Parameters
    ----------
    experiment_name : str
        Name of the MLflow experiment.
    run_name : str
        Run name to search for.

    Returns
    -------
    bool
        True if run exists, False otherwise.
    """
    runs = mlflow.search_runs(experiment_names=[experiment_name])
    if runs.shape[0] == 0:
        logger.info(f"No runs found for experiment: {experiment_name}")
        return False

    found = run_name in runs["tags.mlflow.runName"].values
    if found:
        logger.info(f"Run with the name {run_name} already exists")
    else:
        logger.info(f"Run with the name {run_name} does not exist")
    return found

Logging

log_utils

define_run_name

define_run_name(cfg) -> str

Define run name from configuration name and version.

PARAMETER DESCRIPTION
cfg

Configuration with 'NAME' and 'VERSION' keys.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Run name in format '{name}_v{version}'.

Source code in src/log_helpers/log_utils.py
def define_run_name(cfg) -> str:
    """Compose the run name from the configuration name and version.

    Parameters
    ----------
    cfg : DictConfig
        Configuration with 'NAME' and 'VERSION' keys.

    Returns
    -------
    str
        Run name in format '{name}_v{version}'.
    """
    return f"{cfg['NAME']}_v{cfg['VERSION']}"

define_suffix_to_run_name

define_suffix_to_run_name(model_name) -> str

Generate suffix for run name based on model name.

PARAMETER DESCRIPTION
model_name

Name of the model.

TYPE: str

RETURNS DESCRIPTION
str

Suffix in format '_{model_name}_ph1'.

Notes

This is a placeholder implementation.

Source code in src/log_helpers/log_utils.py
def define_suffix_to_run_name(model_name) -> str:
    """Generate the run-name suffix for a given model.

    Parameters
    ----------
    model_name : str
        Name of the model.

    Returns
    -------
    str
        Suffix in format '_{model_name}_ph1'.

    Notes
    -----
    This is a placeholder implementation.
    """
    # Placeholder atm
    return "_" + model_name + "_ph1"

update_run_name

update_run_name(run_name, base_run_name) -> str

Append base run name to existing run name.

PARAMETER DESCRIPTION
run_name

Existing run name.

TYPE: str

base_run_name

Base name to append.

TYPE: str

RETURNS DESCRIPTION
str

Combined run name with underscore separator.

Source code in src/log_helpers/log_utils.py
def update_run_name(run_name, base_run_name) -> str:
    """Append the base run name to an existing run name.

    Parameters
    ----------
    run_name : str
        Existing run name.
    base_run_name : str
        Base name to append.

    Returns
    -------
    str
        Combined run name with an underscore separator.
    """
    return f"{run_name}_{base_run_name}"

setup_loguru

setup_loguru() -> str

Configure loguru logger for console and file output.

Sets up logging to stderr with color and to a file in the artifacts directory. Removes any existing log file before starting.

RETURNS DESCRIPTION
str

Path to the log file.

Source code in src/log_helpers/log_utils.py
def setup_loguru() -> str:
    """Configure loguru for console (stderr) and file logging.

    Output goes colorized to stderr and plain to 'pipeline_PLR.log' in the
    artifacts directory; a pre-existing log file is removed first.

    Returns
    -------
    str
        Path to the log file.
    """
    min_level = "INFO"

    def level_filter(record):
        # Emit only records at or above the configured minimum level
        return record["level"].no >= logger.level(min_level).no

    logger.remove()
    # https://stackoverflow.com/a/76583603/6412152
    # harmonize naming maybe later? as this is not a Hydra log per se
    log_dir = get_artifacts_dir(service_name="hydra")
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file_path = log_dir / "pipeline_PLR.log"
    if log_file_path.exists():
        # Start each run with a clean log file
        log_file_path.unlink()
    logger.add(
        sys.stderr, filter=level_filter, colorize=True, backtrace=True, diagnose=True
    )
    logger.add(
        str(log_file_path),
        level=min_level,
        colorize=False,
        backtrace=True,
        diagnose=True,
    )

    return str(log_file_path)

log_loguru_log_to_prefect

log_loguru_log_to_prefect(
    filepath: str, description: str
) -> None

Log contents of loguru log file as Prefect markdown artifact.

PARAMETER DESCRIPTION
filepath

Path to the log file.

TYPE: str

description

Description for the Prefect artifact.

TYPE: str

Source code in src/log_helpers/log_utils.py
def log_loguru_log_to_prefect(filepath: str, description: str) -> None:
    """Log contents of loguru log file as Prefect markdown artifact.

    Best-effort: failures to read the file or to create the artifact are
    logged and swallowed so they never break the pipeline.

    Parameters
    ----------
    filepath : str
        Path to the log file.
    description : str
        Description for the Prefect artifact.
    """
    # https://docs.prefect.io/3.0/develop/artifacts#create-markdown-artifacts
    # Hacky solution to get the final log without any nice formatting
    try:
        with open(filepath, "r") as f:
            log_content = f.read()
    # BUGFIX: was `except exception` (lowercase), which is a NameError the
    # moment an error actually occurs; the builtin is `Exception`.
    except Exception as e:
        logger.error(f"Failed to read the log file: {e}")
        return
    try:
        create_markdown_artifact(
            key="loguru-log",
            markdown=log_content,
            description=description,
        )
    except Exception as e:
        logger.error(f"Failed to log the loguru-log as markdown to Prefect: {e}")
        return

get_datetime_as_string

get_datetime_as_string(use_gmt_time=False) -> str

Get current datetime as formatted string.

PARAMETER DESCRIPTION
use_gmt_time

If True, use UTC time; otherwise use local time.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

Datetime string in format 'YYYYMMDD-HHMMSS'.

Source code in src/log_helpers/log_utils.py
def get_datetime_as_string(use_gmt_time=False) -> str:
    """Return the current datetime formatted as 'YYYYMMDD-HHMMSS'.

    Parameters
    ----------
    use_gmt_time : bool, default False
        If True, use UTC time; otherwise use local time.

    Returns
    -------
    str
        Datetime string, e.g. '20240131-235959'.
    """
    # now(None) is the same as now(): naive local time
    tz = datetime.timezone.utc if use_gmt_time else None
    return datetime.datetime.now(tz).strftime("%Y%m%d-%H%M%S")

Hydra Utilities

hydra_utils

update_hydra_ouput_dir

update_hydra_ouput_dir(use_gmt_time: bool = False)

Generate Hydra CLI argument for custom output directory.

Creates a timestamped output directory path for Hydra runs.

PARAMETER DESCRIPTION
use_gmt_time

If True, use GMT time for timestamp (currently unused).

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

Hydra CLI argument string in format 'hydra.run.dir={path}'.

Source code in src/log_helpers/hydra_utils.py
def update_hydra_ouput_dir(use_gmt_time: bool = False):
    """Build the Hydra CLI override for a timestamped output directory.

    Parameters
    ----------
    use_gmt_time : bool, default False
        If True, use GMT time for timestamp (currently unused).

    Returns
    -------
    str
        Hydra CLI argument string in format 'hydra.run.dir={path}'.
    """
    # Fake the CLI argument (update if there is more elegant method
    # TODO! This works obviously for local repo, but it does not scale to
    #  defining the artifacts directory in the config file
    # https://stackoverflow.com/a/67720433/6412152
    # Extra background
    # https://hydra.cc/docs/tutorials/basic/running_your_app/working_directory/
    # https://github.com/facebookresearch/hydra/discussions/2819#discussioncomment-7899912
    # https://stackoverflow.com/a/70777327/6412152
    output_dir = get_artifacts_dir(service_name="hydra") / get_datetime_as_string()
    return f"hydra.run.dir={output_dir}"

get_hydra_output_dir

get_hydra_output_dir()

Get Hydra output directory from runtime config or fallback.

RETURNS DESCRIPTION
str

Path to Hydra output directory.

Notes

Falls back to default artifacts directory if Hydra runtime config is not available (e.g., when using Hydra Compose API).

Source code in src/log_helpers/hydra_utils.py
def get_hydra_output_dir():
    """Return the Hydra output directory, with a Compose-API fallback.

    Returns
    -------
    str
        Path to Hydra output directory.

    Notes
    -----
    Falls back to the default artifacts directory if Hydra runtime config
    is not available (e.g. when using the Hydra Compose API).
    """
    try:
        return hydra.core.hydra_config.HydraConfig.get().runtime.output_dir
    except Exception:
        fallback_dir = get_artifacts_dir(service_name="hydra")
        logger.debug(
            f"Failed to get the hydra output directory (you used Compose?), using now: {fallback_dir}"
        )
        return fallback_dir

get_intermediate_hydra_log_path

get_intermediate_hydra_log_path()

Get path to intermediate Hydra log file.

RETURNS DESCRIPTION
str or None

Path to the log file, or None if not found.

RAISES DESCRIPTION
NotImplementedError

If multiple log files are found in the output directory.

Source code in src/log_helpers/hydra_utils.py
def get_intermediate_hydra_log_path():
    """Locate the single intermediate Hydra log file in the output dir.

    Returns
    -------
    str or None
        Path to the log file, or None if not found.

    Raises
    ------
    NotImplementedError
        If multiple log files are found in the output directory.
    """
    log_files = glob.glob(f"{get_hydra_output_dir()}/*.log")
    if not log_files:
        logger.warning("No Hydra log files found in the output directory")
        return None
    if len(log_files) > 1:
        # TODO! Pick the latest log file
        logger.error(
            "Multiple log files found in the output directory? {}".format(log_files)
        )
        raise NotImplementedError(
            "Multiple log files found in the output directory? {}".format(log_files)
        )
    return log_files[0]

save_hydra_cfg_as_yaml

save_hydra_cfg_as_yaml(cfg, dir_output)

Save Hydra configuration as YAML file.

PARAMETER DESCRIPTION
cfg

Hydra configuration to save.

TYPE: DictConfig

dir_output

Output directory path.

TYPE: str

RETURNS DESCRIPTION
str

Path to saved YAML file.

Source code in src/log_helpers/hydra_utils.py
def save_hydra_cfg_as_yaml(cfg, dir_output):
    """Write the Hydra configuration to 'hydra_cfg.yaml' in dir_output.

    Parameters
    ----------
    cfg : DictConfig
        Hydra configuration to save.
    dir_output : str
        Output directory path.

    Returns
    -------
    str
        Path to saved YAML file.
    """
    # yaml_data: str = OmegaConf.to_yaml(cfg)
    cfg_path = Path(dir_output) / "hydra_cfg.yaml"
    with open(cfg_path, "w") as yaml_file:
        OmegaConf.save(cfg, yaml_file)
    logger.info(f"Hydra config saved as {cfg_path}")
    return str(cfg_path)

get_cfg_HydraCompose

get_cfg_HydraCompose(args, config_dir: str = 'configs')

Load Hydra configuration using Compose API.

Uses the Hydra Compose API instead of the decorator-based approach for more flexible configuration loading.

PARAMETER DESCRIPTION
args

Arguments with 'config_file' attribute.

TYPE: Namespace

config_dir

Directory containing configuration files.

TYPE: str DEFAULT: "configs"

RETURNS DESCRIPTION
DictConfig

Loaded Hydra configuration.

RAISES DESCRIPTION
FileNotFoundError

If the configuration file does not exist.

Source code in src/log_helpers/hydra_utils.py
def get_cfg_HydraCompose(args, config_dir: str = "configs"):
    """Load Hydra configuration using Compose API.

    Uses the Hydra Compose API instead of the decorator-based approach
    for more flexible configuration loading.

    Parameters
    ----------
    args : argparse.Namespace
        Arguments with 'config_file' attribute.
    config_dir : str, default "configs"
        Directory containing configuration files.

    Returns
    -------
    DictConfig
        Loaded Hydra configuration.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    """
    # https://stackoverflow.com/a/61169706/6412152
    # The not recommended route by Hydra, but in the end not using many of the Hydra's "automatic" features
    # TO-OPTIMIZE! Re-assess this decision maybe later?
    # https://hydra.cc/docs/advanced/compose_api/
    repo_root = get_repo_root()
    abs_config_path = repo_root / config_dir
    yaml_path = abs_config_path / f"{args.config_file}.yaml"
    if not yaml_path.exists():
        # BUGFIX: report the missing *file*, not just its parent directory
        logger.error(f"Config file not found: {yaml_path}")
        raise FileNotFoundError(f"Config file not found: {yaml_path}")
    else:
        logger.info(f"Using Hydra config file: {yaml_path}")

    # initialize() expects a path relative to this module's directory
    rel_config_path = Path("..") / ".." / config_dir  # from "hydra_utils.py" directory
    with initialize(version_base=None, config_path=str(rel_config_path)):
        cfg = compose(config_name=args.config_file)

    return cfg

add_hydra_cli_args

add_hydra_cli_args(args)

Add Hydra CLI arguments to sys.argv.

Appends config path, config name, and custom output directory arguments to sys.argv for Hydra decorator-based initialization.

PARAMETER DESCRIPTION
args

Arguments with 'config_path' and 'config_name' attributes.

TYPE: Namespace

Source code in src/log_helpers/hydra_utils.py
def add_hydra_cli_args(args):
    """Append Hydra CLI arguments (output dir, config path/name) to sys.argv.

    Parameters
    ----------
    args : argparse.Namespace
        Arguments with 'config_path' and 'config_name' attributes.
    """
    # e.g. ['pipeline_PLR.py', '--config-path', '../configs', '--config-name', 'defaults.yaml']
    logger.info('Hydra config path: "{}"'.format(args.config_path))
    logger.info('Hydra config name: "{}"'.format(args.config_name))
    extra_argv = [
        # Hack to change the Hydra output directory
        update_hydra_ouput_dir(),
        # https://github.com/facebookresearch/hydra/issues/386
        "--config-path",
        f"{args.config_path}",
        # https://github.com/facebookresearch/hydra/issues/874
        "--config-name",
        f"{args.config_name}",
    ]
    sys.argv.extend(extra_argv)
    logger.debug(sys.argv)

log_the_hydra_log_as_mlflow_artifact

log_the_hydra_log_as_mlflow_artifact(
    hydra_log,
    suffix: str = "_train",
    intermediate: bool = False,
)

Log Hydra log file as MLflow artifact with optional suffix.

Creates a copy of the log file with a suffix and logs it to MLflow. The copy is removed after logging.

PARAMETER DESCRIPTION
hydra_log

Path to Hydra log file.

TYPE: str or None

suffix

Suffix to append to log filename.

TYPE: str DEFAULT: "_train"

intermediate

If True, log to 'hydra_logs/intermediate' path.

TYPE: bool DEFAULT: False

Source code in src/log_helpers/hydra_utils.py
def log_the_hydra_log_as_mlflow_artifact(
    hydra_log, suffix: str = "_train", intermediate: bool = False
):
    """Upload the Hydra log to MLflow under a suffixed filename.

    A suffixed copy of the log file is created, logged to MLflow, and
    then removed again.

    Parameters
    ----------
    hydra_log : str or None
        Path to Hydra log file; None is tolerated (warning only).
    suffix : str, default "_train"
        Suffix to append to log filename.
    intermediate : bool, default False
        If True, log to 'hydra_logs/intermediate' path.
    """
    if hydra_log is None:
        logger.warning(
            "No hydra log found to log as an artifact (normal if you use Hydra Compose)"
        )
        return

    src_path = Path(hydra_log)
    copy_path = src_path.parent / (src_path.stem + f"{suffix}" + src_path.suffix)
    try:
        shutil.copy(hydra_log, copy_path)
    except Exception as e:
        logger.error(
            "Fail to make a local copy of the hydra log (cannot log as an artifact): {}".format(
                e
            )
        )
        return None

    artifact_path = "hydra_logs/intermediate" if intermediate else "hydra_logs"
    try:
        mlflow.log_artifact(str(copy_path), artifact_path=artifact_path)
    except Exception as e:
        logger.error(f"Failed to log hydra log artifact: {e}")

    # remove the temp file after it has been registered as an artifact
    try:
        copy_path.unlink()
    except Exception as e:
        logger.error(f"Failed to remove the local copy of the hydra log: {e}")

log_hydra_artifacts_to_mlflow

log_hydra_artifacts_to_mlflow(
    artifacts_dir, model_name, cfg, run_name
)

Log Hydra artifacts to MLflow for imputation runs.

PARAMETER DESCRIPTION
artifacts_dir

Artifacts directory path (currently unused).

TYPE: str

model_name

Model name (currently unused).

TYPE: str

cfg

Configuration object (currently unused).

TYPE: DictConfig

run_name

Run name (currently unused).

TYPE: str

Source code in src/log_helpers/hydra_utils.py
def log_hydra_artifacts_to_mlflow(artifacts_dir, model_name, cfg, run_name):
    """Log the intermediate Hydra log to MLflow for imputation runs.

    Parameters
    ----------
    artifacts_dir : str
        Artifacts directory path (currently unused).
    model_name : str
        Model name (currently unused).
    cfg : DictConfig
        Configuration object (currently unused).
    run_name : str
        Run name (currently unused).
    """
    logger.debug("Logging the Hydra log to MLflow")
    # Upload the Hydra log with the "_imputation" suffix
    log_the_hydra_log_as_mlflow_artifact(
        get_intermediate_hydra_log_path(), suffix="_imputation", intermediate=True
    )

Local Artifacts

local_artifacts

if_dicts_match

if_dicts_match(_dict1, _dict2) -> bool

Check if two dictionaries match (placeholder implementation).

PARAMETER DESCRIPTION
_dict1

First dictionary (unused in placeholder).

TYPE: dict

_dict2

Second dictionary (unused in placeholder).

TYPE: dict

RETURNS DESCRIPTION
bool

Always returns True (placeholder - TODO: implement actual comparison).

Source code in src/log_helpers/local_artifacts.py
def if_dicts_match(_dict1, _dict2) -> bool:
    """Check whether two dictionaries match (placeholder implementation).

    Parameters
    ----------
    _dict1 : dict
        First dictionary (ignored by the placeholder).
    _dict2 : dict
        Second dictionary (ignored by the placeholder).

    Returns
    -------
    bool
        Always True; TODO: implement an actual comparison.
    """
    return True

pickle_save

pickle_save(results, results_path, debug_load=True) -> None

Save results to pickle file with optional verification.

PARAMETER DESCRIPTION
results

Data to save.

TYPE: object

results_path

Path to save the pickle file.

TYPE: str

debug_load

If True, reload and verify the saved file.

TYPE: bool DEFAULT: True

Source code in src/log_helpers/local_artifacts.py
def pickle_save(results, results_path, debug_load=True) -> None:
    """Pickle `results` to `results_path`, optionally verifying by reload.

    Parameters
    ----------
    results : object
        Data to save.
    results_path : str
        Path to save the pickle file.
    debug_load : bool, default True
        If True, reload the saved file and compare against the original.
    """
    with open(results_path, "wb") as out_file:
        pickle.dump(results, out_file, protocol=pickle.HIGHEST_PROTOCOL)
    if not debug_load:
        return
    # Round-trip sanity check of what was just written
    reloaded = load_results_dict(results_path)
    if_dicts_match(results, reloaded)

save_results_dict

save_results_dict(
    results_dict: dict,
    results_path: str,
    name: str = None,
    debug_load: bool = True,
) -> None

Save results dictionary to pickle file.

Removes existing file if present before saving.

PARAMETER DESCRIPTION
results_dict

Dictionary to save.

TYPE: dict

results_path

Path for the pickle file (must have .pickle extension).

TYPE: str

name

Name for logging purposes.

TYPE: str DEFAULT: None

debug_load

If True, verify saved file by reloading.

TYPE: bool DEFAULT: True

RAISES DESCRIPTION
NotImplementedError

If results_path does not have .pickle extension.

Source code in src/log_helpers/local_artifacts.py
def save_results_dict(
    results_dict: dict,
    results_path: str,
    name: str = None,
    debug_load: bool = True,
) -> None:
    """Save results dictionary to pickle file.

    Removes existing file if present before saving.

    Parameters
    ----------
    results_dict : dict
        Dictionary to save.
    results_path : str
        Path for the pickle file (must have .pickle extension).
    name : str, optional
        Name for logging purposes.
    debug_load : bool, default True
        If True, verify saved file by reloading.

    Raises
    ------
    NotImplementedError
        If results_path does not have .pickle extension.
    """
    if os.path.exists(results_path):
        logger.info(
            "Removing the existing results dictionary at {}".format(results_path)
        )
        os.remove(results_path)

    logger.info("Saving the {} dictionary to {}".format(name, results_path))
    if ".pickle" in results_path:
        pickle_save(results_dict, results_path, debug_load=debug_load)
    else:
        # BUG FIX: the message previously interpolated the *builtin* ``format``
        # function (rendering "<built-in function format>"); report the
        # offending file extension instead.
        ext = os.path.splitext(results_path)[1]
        raise NotImplementedError(
            "Only pickle format is supported at the moment, not {}".format(ext)
        )

pickle_load

pickle_load(results_path) -> object

Load data from pickle file.

PARAMETER DESCRIPTION
results_path

Path to pickle file.

TYPE: str

RETURNS DESCRIPTION
object

Loaded data.

RAISES DESCRIPTION
Exception

If loading fails, often due to NumPy version mismatch.

Source code in src/log_helpers/local_artifacts.py
def pickle_load(results_path) -> object:
    """Deserialize and return the object stored in a pickle file.

    Parameters
    ----------
    results_path : str
        Path to pickle file.

    Returns
    -------
    object
        The unpickled data.

    Raises
    ------
    Exception
        Re-raised from ``pickle.load``; a frequent cause is a NumPy version
        mismatch between the writer and the reader of the pickle.
    """
    with open(results_path, "rb") as fh:
        try:
            return pickle.load(fh)
        except Exception as e:
            # Emit debugging hints before re-raising: arrays pickled under a
            # different NumPy version are a common failure mode here.
            logger.error(
                "Could not load the results dictionary from pickle: {}".format(e)
            )
            import numpy

            logger.error("Numpy version: {}".format(numpy.__version__))
            logger.error(
                "If you get 'No module named 'numpy._core'' it might be an issue with Numpy versions?"
            )
            logger.error(
                "You saved with another Numpy version that you are trying to read them?"
            )
            logger.error(
                "TODO! Try to switch to something more platform-independent way of saving data"
            )
            logger.error("JSON? for the nested dictionaries?")
            raise e

load_results_dict

load_results_dict(results_path) -> dict

Load results dictionary from file.

PARAMETER DESCRIPTION
results_path

Path to results file (must be .pickle).

TYPE: str

RETURNS DESCRIPTION
dict

Loaded results dictionary.

RAISES DESCRIPTION
NotImplementedError

If file is not a pickle file.

Source code in src/log_helpers/local_artifacts.py
def load_results_dict(results_path) -> dict:
    """Load results dictionary from file.

    Parameters
    ----------
    results_path : str
        Path to results file (must be .pickle).

    Returns
    -------
    dict
        Loaded results dictionary.

    Raises
    ------
    NotImplementedError
        If file is not a pickle file.
    """
    if ".pickle" in results_path:
        return pickle_load(results_path)
    # BUG FIX: the old message formatted the *builtin* ``format`` function,
    # printing "<built-in function format>"; report the actual extension.
    ext = os.path.splitext(results_path)[1]
    raise NotImplementedError(
        "Only pickle format is supported at the moment, not {}".format(ext)
    )

save_object_to_pickle

save_object_to_pickle(obj, path) -> None

Save any object to pickle file.

PARAMETER DESCRIPTION
obj

Object to save.

TYPE: object

path

Output file path.

TYPE: str

Source code in src/log_helpers/local_artifacts.py
def save_object_to_pickle(obj, path) -> None:
    """Pickle ``obj`` to ``path`` using the highest available protocol.

    Parameters
    ----------
    obj : object
        Object to serialize.
    path : str
        Destination file path.
    """
    with open(path, "wb") as out_file:
        pickle.dump(obj, out_file, protocol=pickle.HIGHEST_PROTOCOL)

save_array_as_csv

save_array_as_csv(array: ndarray, path: str) -> None

Save NumPy array as CSV file.

PARAMETER DESCRIPTION
array

Array to save.

TYPE: ndarray

path

Output CSV file path.

TYPE: str

Source code in src/log_helpers/local_artifacts.py
def save_array_as_csv(array: np.ndarray, path: str) -> None:
    """Write a NumPy array to disk as CSV (integer column headers, no index).

    Parameters
    ----------
    array : np.ndarray
        Array to export.
    path : str
        Output CSV file path.
    """
    pd.DataFrame(array).to_csv(path, index=False)

Naming and URIs

log_naming_uris_and_dirs

get_feature_pickle_artifact_uri

get_feature_pickle_artifact_uri(
    run: Dict[str, Any],
    source: str,
    cfg: DictConfig,
    subdir: str = "features",
) -> str

Construct MLflow artifact URI for feature pickle files.

PARAMETER DESCRIPTION
run

MLflow run dictionary containing 'run_id'.

TYPE: dict

source

Data source name used for filename generation.

TYPE: str

cfg

Configuration object (currently unused but kept for API consistency).

TYPE: DictConfig

subdir

Subdirectory within the MLflow artifact store.

TYPE: str DEFAULT: "features"

RETURNS DESCRIPTION
str

MLflow artifact URI in format 'runs:/{run_id}/{subdir}/{source}.pickle'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_feature_pickle_artifact_uri(
    run: Dict[str, Any], source: str, cfg: DictConfig, subdir: str = "features"
) -> str:
    """Build the MLflow artifact URI pointing at a feature pickle.

    Parameters
    ----------
    run : dict
        MLflow run dictionary containing 'run_id'.
    source : str
        Data source name; becomes the pickle base filename.
    cfg : DictConfig
        Unused; retained for API consistency.
    subdir : str, default "features"
        Subdirectory within the MLflow artifact store.

    Returns
    -------
    str
        URI of the form 'runs:/{run_id}/{subdir}/{source}.pickle'.
    """
    base_fname = get_feature_pickle_base(source)
    return "runs:/{}/{}/{}".format(run["run_id"], subdir, base_fname)

get_feature_pickle_base

get_feature_pickle_base(run_name: str) -> str

Generate base filename for feature pickle files.

PARAMETER DESCRIPTION
run_name

Name of the run to use as the base filename.

TYPE: str

RETURNS DESCRIPTION
str

Filename with .pickle extension.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_feature_pickle_base(run_name: str) -> str:
    """Return ``run_name`` with the '.pickle' extension appended.

    Parameters
    ----------
    run_name : str
        Name of the run to use as the base filename.

    Returns
    -------
    str
        Filename with .pickle extension.
    """
    return run_name + ".pickle"

get_features_pickle_fname

get_features_pickle_fname(data_source: str) -> str

Generate pickle filename for feature data.

PARAMETER DESCRIPTION
data_source

Name of the data source.

TYPE: str

RETURNS DESCRIPTION
str

Filename with .pickle extension.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_features_pickle_fname(data_source: str) -> str:
    """Return the pickle filename for a data source's feature file.

    Parameters
    ----------
    data_source : str
        Name of the data source.

    Returns
    -------
    str
        Filename with .pickle extension.
    """
    return f"{data_source}.pickle"

get_baseline_names

get_baseline_names() -> List[str]

Get list of baseline method names for PLR preprocessing.

RETURNS DESCRIPTION
list of str

Baseline method names: denoised ground truth and outlier-removed raw.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_baseline_names() -> List[str]:
    """Return the baseline method names for PLR preprocessing.

    Returns
    -------
    list of str
        Denoised ground-truth and outlier-removed-raw baseline names.
    """
    baselines = [
        "BASELINE_DenoisedGT",
        "BASELINE_OutlierRemovedRaw",
    ]
    return baselines

get_feature_name_from_cfg

get_feature_name_from_cfg(cfg: DictConfig) -> str

Extract feature name and version from configuration.

PARAMETER DESCRIPTION
cfg

Configuration containing PLR_FEATURIZATION.FEATURES_METADATA with 'name' and 'version' keys.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Combined feature name and version string.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_feature_name_from_cfg(cfg: DictConfig) -> str:
    """Concatenate the configured feature name and version into one string.

    Parameters
    ----------
    cfg : DictConfig
        Configuration containing PLR_FEATURIZATION.FEATURES_METADATA with
        'name' and 'version' keys.

    Returns
    -------
    str
        Combined feature name and version string.
    """
    metadata = cfg["PLR_FEATURIZATION"]["FEATURES_METADATA"]
    return "{}{}".format(metadata["name"], metadata["version"])

define_featurization_run_name_from_base

define_featurization_run_name_from_base(
    base_name: str, cfg: DictConfig
) -> str

Construct featurization run name from base name and configuration.

PARAMETER DESCRIPTION
base_name

Base name to append to the run name.

TYPE: str

cfg

Configuration containing feature metadata.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Run name in format 'features-{feature_name}{version}_{base_name}'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def define_featurization_run_name_from_base(base_name: str, cfg: DictConfig) -> str:
    """Compose a featurization run name from a base name and the config.

    Parameters
    ----------
    base_name : str
        Base name to append to the run name.
    cfg : DictConfig
        Configuration containing feature metadata.

    Returns
    -------
    str
        Run name in format 'features-{feature_name}{version}_{base_name}'.
    """
    feature_id = get_feature_name_from_cfg(cfg)
    return "features-" + feature_id + "_" + base_name

xgboost_variant_run_name

xgboost_variant_run_name(
    run_name: str,
    xgboost_cfg: DictConfig,
    model_name: str = "XGBOOST",
) -> str

Modify run name to include XGBoost variant suffix.

PARAMETER DESCRIPTION
run_name

Original run name containing the model name.

TYPE: str

xgboost_cfg

XGBoost configuration containing 'variant_name'.

TYPE: DictConfig

model_name

Model name string to find and replace in run_name.

TYPE: str DEFAULT: "XGBOOST"

RETURNS DESCRIPTION
str

Modified run name with variant suffix, or original if no variant.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def xgboost_variant_run_name(
    run_name: str, xgboost_cfg: DictConfig, model_name: str = "XGBOOST"
) -> str:
    """Modify run name to include XGBoost variant suffix.

    Parameters
    ----------
    run_name : str
        Original run name containing the model name.
    xgboost_cfg : DictConfig
        XGBoost configuration containing 'variant_name'.
    model_name : str, default "XGBOOST"
        Model name string to find and replace in run_name.

    Returns
    -------
    str
        Modified run name with variant suffix, or original if no variant.
    """
    variant_name = xgboost_cfg["variant_name"]
    # ROBUSTNESS: accept a null/empty variant_name. The previous
    # ``len(variant_name) > 0`` check raised TypeError when the config
    # left variant_name as None; treat None the same as "" (no variant).
    if variant_name:
        return run_name.replace(model_name, f"{model_name}_{variant_name}")
    return run_name

get_pypots_model_path

get_pypots_model_path(
    results_path: str, ext_out: str = ".pypots"
) -> str

Convert results path to PyPOTS model path.

PARAMETER DESCRIPTION
results_path

Path to results file.

TYPE: str

ext_out

Extension for the output model file.

TYPE: str DEFAULT: ".pypots"

RETURNS DESCRIPTION
str

Path to PyPOTS model file with 'results' replaced by 'model'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_pypots_model_path(results_path: str, ext_out: str = ".pypots") -> str:
    """Derive the PyPOTS model path from a results path.

    Parameters
    ----------
    results_path : str
        Path to results file.
    ext_out : str, default ".pypots"
        Extension for the output model file.

    Returns
    -------
    str
        Sibling path with 'results' replaced by 'model' in the stem and
        the extension swapped to ``ext_out``.
    """
    src = Path(results_path)
    model_stem = src.stem.replace("results", "model")
    return str(src.with_name(model_stem + ext_out))

get_mlflow_metric_name

get_mlflow_metric_name(split: str, metric_key: str) -> str

Construct MLflow metric name from split and metric key.

PARAMETER DESCRIPTION
split

Data split name (e.g., 'train', 'test', 'val').

TYPE: str

metric_key

Metric identifier (e.g., 'auroc', 'mae').

TYPE: str

RETURNS DESCRIPTION
str

MLflow metric name in format '{split}/{metric_key}'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_mlflow_metric_name(split: str, metric_key: str) -> str:
    """Join a data split and metric key into an MLflow metric name.

    Parameters
    ----------
    split : str
        Data split name (e.g., 'train', 'test', 'val').
    metric_key : str
        Metric identifier (e.g., 'auroc', 'mae').

    Returns
    -------
    str
        MLflow metric name in format '{split}/{metric_key}'.
    """
    return "/".join((split, metric_key))

get_outlier_pickle_name

get_outlier_pickle_name(model_name: str) -> str

Generate pickle filename for outlier detection results.

PARAMETER DESCRIPTION
model_name

Name of the outlier detection model.

TYPE: str

RETURNS DESCRIPTION
str

Filename in format 'outlierDetection_{model_name}.pickle'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_outlier_pickle_name(model_name: str) -> str:
    """Build the pickle filename for outlier-detection results.

    Parameters
    ----------
    model_name : str
        Name of the outlier detection model.

    Returns
    -------
    str
        Filename in format 'outlierDetection_{model_name}.pickle'.
    """
    return "outlierDetection_" + model_name + ".pickle"

get_outlier_csv_name

get_outlier_csv_name(
    model_name: str, split: str, key: str
) -> str

Generate CSV filename for outlier detection data export.

PARAMETER DESCRIPTION
model_name

Name of the outlier detection model.

TYPE: str

split

Data split name (e.g., 'train', 'test').

TYPE: str

key

Data key identifier.

TYPE: str

RETURNS DESCRIPTION
str

Filename in format 'outlierDetection_{model_name}_{split}_{key}.csv'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_outlier_csv_name(model_name: str, split: str, key: str) -> str:
    """Build the CSV filename for an outlier-detection data export.

    Parameters
    ----------
    model_name : str
        Name of the outlier detection model.
    split : str
        Data split name (e.g., 'train', 'test').
    key : str
        Data key identifier.

    Returns
    -------
    str
        Filename in format 'outlierDetection_{model_name}_{split}_{key}.csv'.
    """
    # Same stem as the pickle artifact, with the extension stripped.
    stem = f"outlierDetection_{model_name}.pickle".replace(".pickle", "")
    return f"{stem}_{split}_{key}.csv"

get_duckdb_file

get_duckdb_file(
    data_cfg: DictConfig,
    use_demo_data: bool = False,
    demo_db_file: str = "PLR_demo_data.db",
    use_synthetic_data: bool = False,
) -> str

Get path to DuckDB database file.

PARAMETER DESCRIPTION
data_cfg

Data configuration containing 'data_path' and 'filename_DuckDB'.

TYPE: DictConfig

use_demo_data

If True, use demo database for testing.

TYPE: bool DEFAULT: False

demo_db_file

Filename of demo database.

TYPE: str DEFAULT: 'PLR_demo_data.db'

use_synthetic_data

If True, use synthetic database (SYNTH_PLR_DEMO.db) for CI/testing. This takes precedence over use_demo_data.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

Absolute path to the DuckDB file.

RAISES DESCRIPTION
FileNotFoundError

If the database file does not exist.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_duckdb_file(
    data_cfg: DictConfig,
    use_demo_data: bool = False,
    demo_db_file: str = "PLR_demo_data.db",
    use_synthetic_data: bool = False,
) -> str:
    """Get path to DuckDB database file.

    Parameters
    ----------
    data_cfg : DictConfig
        Data configuration containing 'data_path' and 'filename_DuckDB'.
    use_demo_data : bool, default False
        If True, use demo database for testing.
    demo_db_file : str, default 'PLR_demo_data.db'
        Filename of demo database.
    use_synthetic_data : bool, default False
        If True, use synthetic database (SYNTH_PLR_DEMO.db) for CI/testing.
        This takes precedence over use_demo_data.

    Returns
    -------
    str
        Absolute path to the DuckDB file.

    Raises
    ------
    FileNotFoundError
        If the database file does not exist.

    Notes
    -----
    Resolution order: synthetic flag > demo flag > configured database.
    A configured 'data_path' containing the substring "synthetic" is
    resolved relative to PROJECT_ROOT instead of the regular data dir.
    """
    # Check for synthetic data (highest priority - for CI/testing)
    if use_synthetic_data:
        # Local import keeps src.utils.paths out of module import time.
        from src.utils.paths import get_synthetic_db_path

        db_path = get_synthetic_db_path()
        logger.info(f"Using SYNTHETIC data for testing: {db_path}")
        if not db_path.is_file():
            logger.error(f"Synthetic database not found: {db_path}")
            logger.error("Run: python -m src.synthetic.demo_dataset to generate it")
            raise FileNotFoundError(str(db_path))
        return str(db_path)

    # Check for demo data
    if use_demo_data:
        # NOTE(review): get_data_dir presumably resolves data_path to an
        # absolute directory -- confirm against its definition.
        data_dir = get_data_dir(data_path=data_cfg["data_path"])
        logger.warning(f"Using the demo data ({demo_db_file}) for testing the pipeline")
        db_path = data_dir / demo_db_file
        if not db_path.is_file():
            logger.error(f"File {db_path} does not exist")
            raise FileNotFoundError(str(db_path))
        return str(db_path)

    # Default: use configured database
    # Check if it's a synthetic path (data/synthetic/...)
    # (substring match on the configured path, independent of the flags above)
    if "synthetic" in data_cfg.get("data_path", ""):
        from src.utils.paths import PROJECT_ROOT

        db_path = PROJECT_ROOT / data_cfg["data_path"] / data_cfg["filename_DuckDB"]
        logger.info(f"Using synthetic database: {db_path}")
    else:
        data_dir = get_data_dir(data_path=data_cfg["data_path"])
        db_path = data_dir / data_cfg["filename_DuckDB"]

    if not db_path.is_file():
        logger.error(f"File {db_path} does not exist")
        raise FileNotFoundError(str(db_path))

    return str(db_path)

update_outlier_detection_run_name

update_outlier_detection_run_name(cfg: DictConfig) -> str

Generate descriptive run name for outlier detection based on configuration.

Creates a run name that encodes the model type, detection method, variant, and training data source. For MOMENT models, includes finetune/zeroshot mode, model size (large/base/small), and training data type.

PARAMETER DESCRIPTION
cfg

Configuration containing OUTLIER_MODELS with model-specific settings.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Descriptive run name encoding model configuration.

RAISES DESCRIPTION
ValueError

If more than one model is specified in OUTLIER_MODELS.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def update_outlier_detection_run_name(cfg: DictConfig) -> str:
    """Build a descriptive MLflow run name for outlier detection.

    Encodes the model type and, for foundation models, the detection mode
    (finetune/zeroshot), the model variant (large/base/small) and the
    training data source into the run name.

    Parameters
    ----------
    cfg : DictConfig
        Configuration containing OUTLIER_MODELS with model-specific settings.

    Returns
    -------
    str
        Descriptive run name encoding model configuration.

    Raises
    ------
    ValueError
        If more than one model is specified in OUTLIER_MODELS.
    """
    model_names = list(cfg["OUTLIER_MODELS"].keys())
    if len(model_names) != 1:
        logger.error("Only one model should be used for outlier detection")
        raise ValueError("Only one model should be used for outlier detection")
    model_name = model_names[0]

    if model_name == "MOMENT":
        model_cfg = cfg["OUTLIER_MODELS"][model_name]["MODEL"]
        # e.g. "fine-tune" -> "finetune", "zero-shot" -> "zeroshot"
        detection_type = model_cfg["detection_type"].replace("-", "")
        # keep only the final path component, e.g. "MOMENT-1-large"
        model_variant = model_cfg["pretrained_model_name_or_path"].split("/")[-1]
        # trained on denoised gt, or the noisier pupil_raw_imputed signal
        train_on = model_cfg["train_on"]
        if train_on == "gt":
            suffix = ""
        elif train_on == "pupil_raw_imputed":
            suffix = "_raw"  # shorter alias
        else:
            suffix = "_" + train_on
        return f"{model_name}_{detection_type}_{model_variant}{suffix}"

    if model_name == "NuwaTS":
        model_cfg = cfg["OUTLIER_MODELS"][model_name]["MODEL"]
        detection_type = model_cfg["detection_type"].replace("-", "")
        return f"{model_name}_{detection_type}"

    logger.warning("No fancy run name for the model = {}".format(model_name))
    run_name = model_name
    logger.warning("Using the model name as the run name: {}".format(run_name))
    return run_name

update_imputation_run_name

update_imputation_run_name(cfg: DictConfig) -> str

Generate descriptive run name for imputation based on configuration.

Creates a run name that encodes the model type, detection method, variant, and training data source. For MOMENT models, includes finetune/zeroshot mode, model size (large/base/small), and training data type.

PARAMETER DESCRIPTION
cfg

Configuration containing MODELS with model-specific settings.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Descriptive run name encoding model configuration.

RAISES DESCRIPTION
ValueError

If more than one model is specified in MODELS.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def update_imputation_run_name(cfg: DictConfig) -> str:
    """Generate descriptive run name for imputation based on configuration.

    Creates a run name that encodes the model type, detection method, variant,
    and training data source. For MOMENT models, includes finetune/zeroshot mode,
    model size (large/base/small), and training data type.

    Parameters
    ----------
    cfg : DictConfig
        Configuration containing MODELS with model-specific settings.

    Returns
    -------
    str
        Descriptive run name encoding model configuration.

    Raises
    ------
    ValueError
        If more than one model is specified in MODELS.
    """
    if len(cfg["MODELS"].keys()) == 1:
        model_name = list(cfg["MODELS"].keys())[0]
    else:
        # BUG FIX: the message previously said "outlier detection" (copy-paste
        # from update_outlier_detection_run_name); this is the imputation path.
        logger.error("Only one model should be used for imputation")
        raise ValueError("Only one model should be used for imputation")
    if model_name == "MOMENT":
        # finetune or zeroshot
        detection_type = cfg["MODELS"][model_name]["MODEL"]["detection_type"].replace(
            "-", ""
        )
        # large, base, or small
        model_variant = cfg["MODELS"][model_name]["MODEL"][
            "pretrained_model_name_or_path"
        ]
        model_variant = model_variant.split("/")[-1]
        # train on denoised gt, or noisier pupil_raw_imputed
        train_on = cfg["MODELS"][model_name]["MODEL"]["train_on"]
        if train_on != "gt":
            if train_on == "pupil_raw_imputed":
                # shorter name
                suffix = "_raw"
            else:
                suffix = "_" + train_on
        else:
            suffix = ""

        run_name = f"{model_name}_{detection_type}_{model_variant}{suffix}"
    else:
        logger.warning("No fancy run name for the model = {}".format(model_name))
        run_name = model_name
        logger.warning("Using the model name as the run name: {}".format(run_name))

    return run_name

get_torch_model_name

get_torch_model_name(run_name: str) -> str

Generate PyTorch model filename from run name.

PARAMETER DESCRIPTION
run_name

Name of the training run.

TYPE: str

RETURNS DESCRIPTION
str

Model filename with .pth extension (e.g., 'MOMENT_finetune_large_model.pth').

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_torch_model_name(run_name: str) -> str:
    """Derive the PyTorch checkpoint filename for a training run.

    Parameters
    ----------
    run_name : str
        Name of the training run.

    Returns
    -------
    str
        Model filename with .pth extension,
        e.g. 'MOMENT_finetune_MOMENT-1-large_pupil_gt_model.pth'.
    """
    return run_name + "_model.pth"

get_debug_string_to_add

get_debug_string_to_add() -> str

Get prefix string for debug experiment names.

RETURNS DESCRIPTION
str

Debug prefix '__DEBUG_'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_debug_string_to_add() -> str:
    """Return the prefix used to mark debug experiment names.

    Returns
    -------
    str
        Debug prefix '__DEBUG_'.
    """
    debug_prefix = "__DEBUG_"
    return debug_prefix

get_demo_string_to_add

get_demo_string_to_add() -> str

Get prefix string for demo data experiment names.

RETURNS DESCRIPTION
str

Demo data prefix '__DEMODATA_'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_demo_string_to_add() -> str:
    """Return the prefix used to mark demo-data experiment names.

    Returns
    -------
    str
        Demo data prefix '__DEMODATA_'.
    """
    demo_prefix = "__DEMODATA_"
    return demo_prefix

get_synthetic_string_to_add

get_synthetic_string_to_add() -> str

Get prefix string for synthetic data experiment names.

Part of the 4-gate isolation architecture. See src/utils/data_mode.py.

RETURNS DESCRIPTION
str

Synthetic data prefix 'synth_'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_synthetic_string_to_add() -> str:
    """Return the prefix used to mark synthetic-data experiment names.

    Part of the 4-gate isolation architecture. See src/utils/data_mode.py.

    Returns
    -------
    str
        Synthetic data prefix 'synth_'.
    """
    from src.utils.data_mode import SYNTHETIC_EXPERIMENT_PREFIX as synth_prefix

    return synth_prefix

if_runname_is_debug

if_runname_is_debug(run_name: str) -> bool

Check if run name indicates a debug run.

PARAMETER DESCRIPTION
run_name

Name of the run to check.

TYPE: str

RETURNS DESCRIPTION
bool

True if run name contains the debug prefix.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def if_runname_is_debug(run_name: str) -> bool:
    """Tell whether a run name is marked as a debug run.

    Parameters
    ----------
    run_name : str
        Name of the run to check.

    Returns
    -------
    bool
        True if run name contains the '__DEBUG_' prefix.
    """
    return "__DEBUG_" in run_name

experiment_name_wrapper

experiment_name_wrapper(
    experiment_name: str, cfg: DictConfig
) -> str

Add prefixes to experiment name based on configuration flags.

Prepends demo data, debug, and/or synthetic prefixes to the experiment name if the corresponding configuration flags are set.

Part of the 4-gate isolation architecture. See src/utils/data_mode.py.

Priority order (applied in reverse so first prefix appears first): 1. synthetic (synth_) - from EXPERIMENT.is_synthetic or data_mode detection 2. demo data (__DEMODATA_) - from EXPERIMENT.use_demo_data 3. debug (__DEBUG_) - from EXPERIMENT.debug

PARAMETER DESCRIPTION
experiment_name

Base experiment name.

TYPE: str

cfg

Configuration with EXPERIMENT.use_demo_data, EXPERIMENT.debug, and EXPERIMENT.is_synthetic flags.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Experiment name with appropriate prefixes.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def experiment_name_wrapper(experiment_name: str, cfg: DictConfig) -> str:
    """Prefix an experiment name according to configuration flags.

    Part of the 4-gate isolation architecture. See src/utils/data_mode.py.
    The result is '{synth_}{__DEBUG_}{__DEMODATA_}{experiment_name}', where
    each prefix is present only when its corresponding flag is active.

    Parameters
    ----------
    experiment_name : str
        Base experiment name.
    cfg : DictConfig
        Configuration with EXPERIMENT.use_demo_data, EXPERIMENT.debug,
        and EXPERIMENT.is_synthetic flags.

    Returns
    -------
    str
        Experiment name with appropriate prefixes.
    """
    from src.utils.data_mode import is_synthetic_from_config

    use_demo = cfg["EXPERIMENT"]["use_demo_data"]
    use_debug = cfg["EXPERIMENT"]["debug"]

    prefixes = []
    # Synthetic detection covers EXPERIMENT.is_synthetic=true,
    # experiment_prefix="synth_", or DATA.data_path containing "synthetic".
    if is_synthetic_from_config(cfg):
        prefixes.append(get_synthetic_string_to_add())
    if use_debug:
        prefixes.append(get_debug_string_to_add())
    if use_demo:
        prefixes.append(get_demo_string_to_add())

    return "".join(prefixes) + experiment_name

get_outlier_detection_experiment_name

get_outlier_detection_experiment_name(
    cfg: DictConfig,
) -> str

Get experiment name for outlier detection from configuration.

PARAMETER DESCRIPTION
cfg

Configuration containing PREFECT.FLOW_NAMES.OUTLIER_DETECTION.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Experiment name with appropriate prefixes applied.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_outlier_detection_experiment_name(cfg: DictConfig) -> str:
    """Resolve the (prefixed) experiment name for outlier detection.

    Parameters
    ----------
    cfg : DictConfig
        Configuration containing PREFECT.FLOW_NAMES.OUTLIER_DETECTION.

    Returns
    -------
    str
        Experiment name with appropriate prefixes applied.
    """
    flow_name = cfg["PREFECT"]["FLOW_NAMES"]["OUTLIER_DETECTION"]
    return experiment_name_wrapper(experiment_name=flow_name, cfg=cfg)

get_model_name_from_run_name

get_model_name_from_run_name(
    run_name: str, task: str
) -> Tuple[str, str]

Extract model name and key from run name.

For MOMENT models, strips version and size information to create a normalized key. For other models, the key equals the model name.

PARAMETER DESCRIPTION
run_name

Full run name containing model information.

TYPE: str

task

Task type (currently unused, reserved for future use).

TYPE: str

RETURNS DESCRIPTION
tuple of str

Tuple of (model_name, model_key) where model_key is normalized.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_model_name_from_run_name(run_name: str, task: str) -> Tuple[str, str]:
    """Extract the model name and a normalized key from a run name.

    For MOMENT models the key strips the version ('MOMENT-1'), the size
    ('-large'/'-base'/'-small') and the 'pupil' fragment. For every other
    model the key equals the model name.

    Parameters
    ----------
    run_name : str
        Full run name containing model information.
    task : str
        Task type (currently unused, reserved for future use).

    Returns
    -------
    tuple of str
        Tuple of (model_name, model_key) where model_key is normalized.
    """
    model_name = run_name.split("_")[0]
    if "MOMENT" not in run_name:
        return model_name, model_name
    model_key = run_name
    for fragment in ("MOMENT-1", "-large", "-base", "-small", "pupil"):
        model_key = model_key.replace(fragment, "")
    return model_name, model_key

get_foundation_model_names

get_foundation_model_names() -> List[str]

Get list of supported foundation model names.

RETURNS DESCRIPTION
list of str

Names of foundation models: MOMENT and UniTS.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_foundation_model_names() -> List[str]:
    """Return the names of the supported foundation models.

    Returns
    -------
    list of str
        Foundation model names: MOMENT and UniTS.
    """
    foundation_models = ["MOMENT", "UniTS"]
    return foundation_models

get_simple_outlier_detectors

get_simple_outlier_detectors() -> List[str]

Get list of traditional outlier detection method names.

RETURNS DESCRIPTION
list of str

Names of simple outlier detectors: LOF, OneClassSVM, PROPHET.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_simple_outlier_detectors() -> List[str]:
    """Return the names of the traditional outlier detection methods.

    Returns
    -------
    list of str
        Simple outlier detector names: LOF, OneClassSVM, PROPHET.
    """
    simple_detectors = ["LOF", "OneClassSVM", "PROPHET"]
    return simple_detectors

get_eval_metric_name

get_eval_metric_name(
    cls_model_name: str, cfg: DictConfig
) -> str

Extract evaluation metric name from classifier configuration.

Looks for metric_val in HYPERPARAMS (XGBoost, CatBoost, TabM) or fit_params.scoring (Logistic Regression).

PARAMETER DESCRIPTION
cls_model_name

Name of the classifier model.

TYPE: str

cfg

Configuration containing CLS_HYPERPARAMS for the model.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Name of the evaluation metric.

RAISES DESCRIPTION
ValueError

If eval_metric cannot be found in the configuration.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_eval_metric_name(cls_model_name: str, cfg: DictConfig) -> str:
    """Resolve the evaluation metric name for a given classifier.

    The metric lives either in ``HYPERPARAMS.metric_val`` (XGBoost,
    CatBoost, TabM) or in ``HYPERPARAMS.fit_params.scoring``
    (Logistic Regression).

    Parameters
    ----------
    cls_model_name : str
        Name of the classifier model.
    cfg : DictConfig
        Configuration containing CLS_HYPERPARAMS for the model.

    Returns
    -------
    str
        Name of the evaluation metric.

    Raises
    ------
    ValueError
        If the eval metric cannot be located in the configuration.
    """
    hyperparams = cfg["CLS_HYPERPARAMS"][cls_model_name]["HYPERPARAMS"]
    if "metric_val" in hyperparams:
        # XGBoost, CatBoost, TabM
        return hyperparams["metric_val"]
    if "fit_params" in hyperparams:
        # Logistic regression
        return hyperparams["fit_params"]["scoring"]
    message = "Where is your eval_metric defined? ({})".format(cls_model_name)
    logger.error(message)
    raise ValueError(message)

get_train_loss_name

get_train_loss_name(cfg: DictConfig) -> str

Get training loss function name from configuration.

PARAMETER DESCRIPTION
cfg

Configuration containing CLASSIFICATION_SETTINGS.loss.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Name of the loss function.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_train_loss_name(cfg: DictConfig) -> str:
    """Fetch the configured training loss function name.

    Parameters
    ----------
    cfg : DictConfig
        Configuration containing CLASSIFICATION_SETTINGS.loss.

    Returns
    -------
    str
        Name of the loss function.
    """
    loss_name = cfg["CLASSIFICATION_SETTINGS"]["loss"]
    return loss_name

update_cls_run_name

update_cls_run_name(
    cls_model_name: str,
    source_name: str,
    model_cfg: DictConfig,
    hparam_cfg: DictConfig,
    cfg: DictConfig,
) -> str

Construct classification run name from model and source information.

PARAMETER DESCRIPTION
cls_model_name

Name of the classifier model.

TYPE: str

source_name

Name of the data source/preprocessing pipeline.

TYPE: str

model_cfg

Model configuration (currently unused).

TYPE: DictConfig

hparam_cfg

Hyperparameter configuration (currently unused).

TYPE: DictConfig

cfg

Full configuration for extracting eval metric.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Run name in format '{model}_eval-{metric}__{source}'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def update_cls_run_name(
    cls_model_name: str,
    source_name: str,
    model_cfg: DictConfig,
    hparam_cfg: DictConfig,
    cfg: DictConfig,
) -> str:
    """Build the MLflow run name for a classification run.

    Parameters
    ----------
    cls_model_name : str
        Name of the classifier model.
    source_name : str
        Name of the data source/preprocessing pipeline.
    model_cfg : DictConfig
        Model configuration (currently unused).
    hparam_cfg : DictConfig
        Hyperparameter configuration (currently unused).
    cfg : DictConfig
        Full configuration, used to resolve the eval metric.

    Returns
    -------
    str
        Run name in format '{model}_eval-{metric}__{source}'.
    """
    # train_loss = get_train_loss_name(cfg)
    eval_metric = get_eval_metric_name(cls_model_name, cfg)
    run_name = "{}_eval-{}__{}".format(cls_model_name, eval_metric, source_name)
    return run_name

get_embedding_npy_fname

get_embedding_npy_fname(model_name: str, split: str) -> str

Generate filename for embedding numpy array.

PARAMETER DESCRIPTION
model_name

Name of the model that generated embeddings.

TYPE: str

split

Data split name (e.g., 'train', 'test').

TYPE: str

RETURNS DESCRIPTION
str

Filename in format '{model_name}_embedding_{split}.npy'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_embedding_npy_fname(model_name: str, split: str) -> str:
    """Build the filename for an embedding numpy array.

    Parameters
    ----------
    model_name : str
        Name of the model that generated the embeddings.
    split : str
        Data split name (e.g., 'train', 'test').

    Returns
    -------
    str
        Filename in format '{model_name}_embedding_{split}.npy'.
    """
    return "{}_embedding_{}.npy".format(model_name, split)

get_moment_cls_run_name

get_moment_cls_run_name(
    cls_model_name: str, cls_model_cfg: DictConfig
) -> str

Generate classification run name for MOMENT model.

Encodes model variant, detection type, and loss weighting in the name.

PARAMETER DESCRIPTION
cls_model_name

Base classifier model name.

TYPE: str

cls_model_cfg

MOMENT model configuration with MODEL settings.

TYPE: DictConfig

RETURNS DESCRIPTION
str

Run name in format '{model}-{variant}_{detection_type}[_w]'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_moment_cls_run_name(cls_model_name: str, cls_model_cfg: DictConfig) -> str:
    """Build the classification run name for a MOMENT model.

    Encodes model variant, detection type, and loss weighting in the name.

    Parameters
    ----------
    cls_model_name : str
        Base classifier model name.
    cls_model_cfg : DictConfig
        MOMENT model configuration with MODEL settings.

    Returns
    -------
    str
        Run name in format '{model}-{variant}_{detection_type}[_w]'.
    """
    model_settings = cls_model_cfg["MODEL"]
    # Variant = last dash-separated token of the checkpoint basename (e.g. "large")
    checkpoint = model_settings["pretrained_model_name_or_path"]
    model_variant = checkpoint.split("/")[-1].split("-")[-1]
    detection_type = model_settings["detection_type"]
    if model_settings["use_weighed_loss"]:
        weighing_string = "_w"
    else:
        weighing_string = ""
    return f"{cls_model_name}-{model_variant}_{detection_type}{weighing_string}"

get_imputation_pickle_name

get_imputation_pickle_name(model_name: str) -> str

Generate pickle filename for imputation results.

PARAMETER DESCRIPTION
model_name

Name of the imputation model.

TYPE: str

RETURNS DESCRIPTION
str

Filename in format 'imputation_{model_name}.pickle'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_imputation_pickle_name(model_name: str) -> str:
    """Build the pickle filename for imputation results.

    Parameters
    ----------
    model_name : str
        Name of the imputation model.

    Returns
    -------
    str
        Filename in format 'imputation_{model_name}.pickle'.
    """
    return "imputation_{}.pickle".format(model_name)

get_summary_fname

get_summary_fname(experiment_name: str) -> str

Generate summary database filename from experiment name.

PARAMETER DESCRIPTION
experiment_name

Name of the experiment.

TYPE: str

RETURNS DESCRIPTION
str

Filename with 'PLR_' prefix removed and .db extension.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_summary_fname(experiment_name: str) -> str:
    """Build the summary database filename from an experiment name.

    Parameters
    ----------
    experiment_name : str
        Name of the experiment.

    Returns
    -------
    str
        Filename with the 'PLR_' prefix removed and a .db extension.
    """
    stripped_name = experiment_name.replace("PLR_", "")
    return "summary_{}.db".format(stripped_name)

get_summary_fpath

get_summary_fpath(experiment_name: str) -> str

Get full path for summary database, removing existing file if present.

PARAMETER DESCRIPTION
experiment_name

Name of the experiment.

TYPE: str

RETURNS DESCRIPTION
str

Full path to summary database file.

Notes

Deletes existing file at the path before returning.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_summary_fpath(experiment_name: str) -> str:
    """Resolve the path for the summary database, clearing any old file.

    Parameters
    ----------
    experiment_name : str
        Name of the experiment.

    Returns
    -------
    str
        Full path to the summary database file.

    Notes
    -----
    Any existing file at the path is deleted before returning.
    """
    out_dir = get_artifacts_dir("dataframes")
    summary_path = out_dir / get_summary_fname(experiment_name)
    # Start fresh: drop any stale database from an earlier run
    if summary_path.exists():
        summary_path.unlink()
    return str(summary_path)

get_summary_artifacts_fname

get_summary_artifacts_fname(experiment_name: str) -> str

Generate summary artifacts pickle filename from experiment name.

PARAMETER DESCRIPTION
experiment_name

Name of the experiment.

TYPE: str

RETURNS DESCRIPTION
str

Filename with 'PLR_' prefix removed and .pickle extension.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_summary_artifacts_fname(experiment_name: str) -> str:
    """Build the summary artifacts pickle filename from an experiment name.

    Parameters
    ----------
    experiment_name : str
        Name of the experiment.

    Returns
    -------
    str
        Filename with the 'PLR_' prefix removed and a .pickle extension.
    """
    stripped_name = experiment_name.replace("PLR_", "")
    return "summary_artifacts_{}.pickle".format(stripped_name)

get_summary_artifacts_fpath

get_summary_artifacts_fpath(experiment_name: str) -> str

Get full path for summary artifacts pickle, removing existing file if present.

PARAMETER DESCRIPTION
experiment_name

Name of the experiment.

TYPE: str

RETURNS DESCRIPTION
str

Full path to summary artifacts pickle file.

Notes

Deletes existing file at the path before returning.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def get_summary_artifacts_fpath(experiment_name: str) -> str:
    """Resolve the path for the summary artifacts pickle, clearing any old file.

    Parameters
    ----------
    experiment_name : str
        Name of the experiment.

    Returns
    -------
    str
        Full path to the summary artifacts pickle file.

    Notes
    -----
    Any existing file at the path is deleted before returning.
    """
    artifacts_dir = get_artifacts_dir("artifacts")
    artifacts_path = artifacts_dir / get_summary_artifacts_fname(experiment_name)
    # Start fresh: drop any stale pickle from an earlier run
    if artifacts_path.exists():
        artifacts_path.unlink()
    return str(artifacts_path)

parse_task_from_exp_name

parse_task_from_exp_name(experiment_name: str) -> str

Parse task type from experiment name string.

PARAMETER DESCRIPTION
experiment_name

Name of the experiment containing task identifier.

TYPE: str

RETURNS DESCRIPTION
str

Task type: 'outlier_detection', 'imputation', 'classification', or 'featurization'.

Source code in src/log_helpers/log_naming_uris_and_dirs.py
def parse_task_from_exp_name(experiment_name: str) -> str:
    """Parse task type from experiment name string.

    Parameters
    ----------
    experiment_name : str
        Name of the experiment containing a task identifier substring.

    Returns
    -------
    str
        Task type: 'outlier_detection', 'imputation', 'classification',
        or 'featurization'.

    Raises
    ------
    ValueError
        If the experiment name contains no known task identifier.
    """
    # You could as well use the cfg hard-coded names?
    # Substring -> task mapping; checked in insertion order, first match wins.
    task_markers = {
        "OutlierDetection": "outlier_detection",
        "Imputation": "imputation",
        "Classification": "classification",
        "Featurization": "featurization",
    }
    for marker, task in task_markers.items():
        if marker in experiment_name:
            return task
    # Bug fix: previously fell through with `task` unbound (UnboundLocalError);
    # fail with an explicit, actionable error instead.
    raise ValueError(
        "Could not parse task from experiment name '{}'".format(experiment_name)
    )

Model Retraining

retrain_or_not

check_if_imputation_model_trained_already_from_mlflow

check_if_imputation_model_trained_already_from_mlflow(
    cfg: DictConfig, run_name: str, model_type: str
) -> dict | None

Check if an imputation model with matching configuration exists in MLflow.

PARAMETER DESCRIPTION
cfg

Configuration for determining search parameters.

TYPE: DictConfig

run_name

Name of the run to search for.

TYPE: str

model_type

Type of model to search for.

TYPE: str

RETURNS DESCRIPTION
dict or None

Best matching run data if found, None otherwise.

Source code in src/log_helpers/retrain_or_not.py
def check_if_imputation_model_trained_already_from_mlflow(
    cfg: DictConfig,
    run_name: str,
    model_type: str,
) -> dict | None:
    """Look up MLflow for an already-trained imputation model matching this config.

    Parameters
    ----------
    cfg : DictConfig
        Configuration for determining search parameters.
    run_name : str
        Name of the run to search for.
    model_type : str
        Type of model to search for.

    Returns
    -------
    dict or None
        Best matching run data if found, None otherwise.
    """
    current_experiment, metric_string, split_key, metric_direction = (
        what_to_search_from_mlflow(run_name=run_name, cfg=cfg, model_type=model_type)
    )

    if current_experiment is None:
        # No experiment to search against -> caller must retrain
        logger.debug(
            "No previous (best) runs found from MLflow, need to re-train the model"
        )
        return None

    logger.info(
        "MLflow | Searching for the best model (metric = {}, split_key = {}, "
        "direction = {})".format(metric_string, split_key, metric_direction)
    )
    return return_best_mlflow_run(
        current_experiment,
        metric_string,
        split_key,
        metric_direction,
        run_name=run_name,
    )

if_retrain_the_imputation_model

if_retrain_the_imputation_model(
    cfg: DictConfig,
    run_name: str | None = None,
    model_type: str = "imputation",
) -> tuple[bool, dict]

Determine whether to retrain an imputation model.

Checks configuration flag and MLflow history to decide if retraining is needed.

PARAMETER DESCRIPTION
cfg

Configuration with IMPUTATION_TRAINING.retrain_models flag.

TYPE: DictConfig

run_name

Name of the run to check.

TYPE: str DEFAULT: None

model_type

Type of model.

TYPE: str DEFAULT: "imputation"

RETURNS DESCRIPTION
tuple

Tuple of (should_retrain: bool, best_run: dict).

Source code in src/log_helpers/retrain_or_not.py
def if_retrain_the_imputation_model(
    cfg: DictConfig,
    run_name: str | None = None,
    model_type: str = "imputation",
) -> tuple[bool, dict]:
    """Decide whether an imputation model must be (re)trained.

    A config flag forces retraining unconditionally; otherwise MLflow
    history is consulted for a previously trained matching model.

    Parameters
    ----------
    cfg : DictConfig
        Configuration with IMPUTATION_TRAINING.retrain_models flag.
    run_name : str, optional
        Name of the run to check.
    model_type : str, default "imputation"
        Type of model.

    Returns
    -------
    tuple
        Tuple of (should_retrain: bool, best_run: dict).
    """
    if cfg["IMPUTATION_TRAINING"]["retrain_models"]:
        # Config forces retraining regardless of MLflow history
        logger.debug("You had retraining model set to True, so retraining the model")
        return True, {}

    # Consult MLflow history for a previously trained matching model
    best_run = check_if_imputation_model_trained_already_from_mlflow(
        cfg=cfg,
        run_name=run_name,
        model_type=model_type,
    )
    if best_run is None:
        logger.debug("No previous runs found from MLflow, so training the model")
        return True, {}
    logger.debug("Found previous runs from MLflow, so skipping the retraining")
    return False, best_run

check_if_imputation_source_featurized_already_from_mlflow

check_if_imputation_source_featurized_already_from_mlflow(
    cfg: DictConfig, experiment_name: str, run_name: str
) -> bool

Check if features have already been extracted for an imputation source.

PARAMETER DESCRIPTION
cfg

Configuration object (currently unused).

TYPE: DictConfig

experiment_name

MLflow experiment name.

TYPE: str

run_name

Run name to search for.

TYPE: str

RETURNS DESCRIPTION
bool

True if featurization run exists, False otherwise.

Source code in src/log_helpers/retrain_or_not.py
def check_if_imputation_source_featurized_already_from_mlflow(
    cfg: DictConfig,
    experiment_name: str,
    run_name: str,
) -> bool:
    """Check if features have already been extracted for an imputation source.

    Parameters
    ----------
    cfg : DictConfig
        Configuration object (currently unused).
    experiment_name : str
        MLflow experiment name.
    run_name : str
        Run name to search for.

    Returns
    -------
    bool
        True if a featurization run named ``run_name`` exists, False otherwise.
    """
    # NOTE(review): mlflow.get_experiment_by_name returns None for an unknown
    # experiment, which would make dict(None) raise here — assumes the
    # experiment exists; confirm upstream guarantees this.
    current_experiment = dict(mlflow.get_experiment_by_name(experiment_name))
    df: pd.DataFrame = mlflow.search_runs([current_experiment["experiment_id"]])

    if df.shape[0] == 0:
        logger.debug("No previous runs found from MLflow, need to re-featurize")
        return False

    if run_name in df["tags.mlflow.runName"].values:
        logger.debug(
            f"Found previous runs (n={df.shape[0]}) from MLflow, "
            f"so skipping the refeaturization for '{run_name}'"
        )
        return True

    # Bug fix: previously fell through and implicitly returned None when runs
    # existed but none matched run_name; return an explicit False so the
    # declared `-> bool` contract holds and callers re-featurize.
    return False

if_refeaturize_from_imputation

if_refeaturize_from_imputation(
    run_name: str, experiment_name: str, cfg: DictConfig
) -> bool

Determine whether to re-extract features from imputation results.

PARAMETER DESCRIPTION
run_name

Run name to check.

TYPE: str

experiment_name

MLflow experiment name.

TYPE: str

cfg

Configuration with PLR_FEATURIZATION.re_featurize flag.

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if re-featurization is needed.

Source code in src/log_helpers/retrain_or_not.py
def if_refeaturize_from_imputation(
    run_name: str, experiment_name: str, cfg: DictConfig
) -> bool:
    """Decide whether features must be re-extracted from imputation results.

    Parameters
    ----------
    run_name : str
        Run name to check.
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Configuration with PLR_FEATURIZATION.re_featurize flag.

    Returns
    -------
    bool
        True if re-featurization is needed.
    """
    if cfg["PLR_FEATURIZATION"]["re_featurize"]:
        # Config forces re-featurization regardless of MLflow history
        logger.debug("You had re_featurize set to True, so re_featurizing the data")
        return True

    # Consult MLflow history for an existing featurization run
    already_featurized = check_if_imputation_source_featurized_already_from_mlflow(
        cfg=cfg,
        experiment_name=experiment_name,
        run_name=run_name,
    )
    if already_featurized:
        logger.info("MLflow found -> Skipping the refeaturization for the sources")
        return False
    logger.info("MLflow not found -> Refeaturizing all the sources")
    return True

if_recompute_and_viz_imputation_metrics

if_recompute_and_viz_imputation_metrics(
    _recompute: bool = True,
) -> bool

Determine whether to recompute and visualize imputation metrics.

PARAMETER DESCRIPTION
_recompute

Input flag (currently unused — placeholder implementation).

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
bool

Always returns True in current implementation.

Notes

This is a placeholder function. Future implementation should check for previously computed metrics to avoid redundant computation.

Source code in src/log_helpers/retrain_or_not.py
def if_recompute_and_viz_imputation_metrics(_recompute: bool = True) -> bool:
    """Decide whether imputation metrics should be recomputed and visualized.

    Parameters
    ----------
    _recompute : bool, default True
        Input flag (currently unused — placeholder implementation).

    Returns
    -------
    bool
        Always True in the current placeholder implementation.

    Notes
    -----
    Placeholder: a real implementation should check for previously
    computed metrics to avoid redundant computation.
    """
    decision = True
    # TODO! implement this at some point, if you have this False, and you don't check
    #  for previously computed metrics, your downstream code will crash while you still have the imputation done,
    #  but not the metrics
    logger.warning(
        "Placeholder for metric recomputation decision, returning now = {}".format(
            decision
        )
    )
    return decision

if_recreate_ensemble

if_recreate_ensemble(
    ensemble_name: str,
    experiment_name: str,
    cfg: DictConfig,
) -> bool

Determine whether to recreate an ensemble model.

PARAMETER DESCRIPTION
ensemble_name

Name of the ensemble.

TYPE: str

experiment_name

MLflow experiment name.

TYPE: str

cfg

Configuration object (currently unused).

TYPE: DictConfig

RETURNS DESCRIPTION
bool

True if no previous runs found, False otherwise.

Source code in src/log_helpers/retrain_or_not.py
def if_recreate_ensemble(
    ensemble_name: str, experiment_name: str, cfg: DictConfig
) -> bool:
    """Decide whether an ensemble model must be recreated.

    Parameters
    ----------
    ensemble_name : str
        Name of the ensemble.
    experiment_name : str
        MLflow experiment name.
    cfg : DictConfig
        Configuration object (currently unused).

    Returns
    -------
    bool
        True when MLflow holds no previous runs, False otherwise.
    """
    experiment_info = dict(mlflow.get_experiment_by_name(experiment_name))
    df: pd.DataFrame = mlflow.search_runs([experiment_info["experiment_id"]])

    if df.shape[0] == 0:
        logger.warning("No previous runs found from MLflow, need to re-ensemble")
        return True

    logger.warning(
        f"Found previous runs (n={df.shape[0]}) from MLflow, "
        f"so skipping the re-ensembling for '{ensemble_name}'"
    )
    return False

System Utilities

system_utils

get_commit_id

get_commit_id(return_short: bool = True) -> str

Get current git commit ID.

PARAMETER DESCRIPTION
return_short

If True, return short hash; otherwise return full hash.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
str

Git commit hash, or np.nan if git is not available.

Source code in src/log_helpers/system_utils.py
def get_commit_id(return_short: bool = True) -> str:
    """Return the current git commit hash of the working tree.

    Parameters
    ----------
    return_short : bool, default True
        If True, return the abbreviated hash; otherwise the full hash.

    Returns
    -------
    str
        Git commit hash, or np.nan if git is not available.
    """

    def _rev_parse(*flags: str) -> str:
        # Delegate to git; raises if git or the repository is unavailable
        output = subprocess.check_output(["git", "rev-parse", *flags, "HEAD"])
        return output.decode("ascii").strip()

    # Get the current git commit id (short first, matching original order)
    try:
        short_hash = _rev_parse("--short")
        full_hash = _rev_parse()
    except Exception as e:
        logger.warning("Failed to get the git hash, e = {}".format(e))
        short_hash, full_hash = np.nan, np.nan

    return short_hash if return_short else full_hash

get_processor_info

get_processor_info()

Get CPU model name from system.

RETURNS DESCRIPTION
str or np.nan

CPU model name, or np.nan if detection fails.

Notes

Currently only fully implemented for Linux. Windows and macOS have placeholder implementations.

Source code in src/log_helpers/system_utils.py
def get_processor_info():
    """Return the CPU model name reported by the operating system.

    Returns
    -------
    str or np.nan
        CPU model name, or np.nan if detection fails.

    Notes
    -----
    Fully implemented on Linux only; the Windows and macOS branches are
    placeholders that merely warn.
    """
    model_name = np.nan
    os_name = system()

    if os_name == "Linux":
        command = "cat /proc/cpuinfo"
        cpuinfo = subprocess.check_output(command, shell=True).decode().strip()
        for line in cpuinfo.split("\n"):
            if "model name" in line:
                # Keep whatever follows the colon; last matching line wins
                model_name = re.sub(".*model name.*:", "", line, count=1)

    elif os_name == "Windows":
        all_info = processor()
        # cpuinfo better? https://stackoverflow.com/a/62888665
        logger.warning("You need to add to Windows parsing for your CPU name")

    elif os_name == "Darwin":
        all_info = subprocess.check_output(
            ["/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"]
        ).strip()
        logger.warning("You need to add to Mac parsing for your CPU name")

    else:
        logger.warning("Unknown OS = {}, cannot get the CPU name".format(os_name))

    return model_name

get_system_params

get_system_params()

Get system hardware parameters.

RETURNS DESCRIPTION
dict

Dictionary with 'CPU' (model name) and 'RAM_GB' (total RAM in GB).

Source code in src/log_helpers/system_utils.py
def get_system_params():
    """Get system hardware parameters.

    Returns
    -------
    dict
        Dictionary with 'CPU' (model name) and 'RAM_GB' (total RAM in GB).
    """
    # CPU/Mem
    # Renamed the local from `dict` to avoid shadowing the builtin.
    params = {
        "CPU": get_processor_info(),
        "RAM_GB": str(round(psutil.virtual_memory().total / (1024**3), 1)),
    }
    return params

get_library_versions

get_library_versions() -> dict

Get versions of key Python libraries.

RETURNS DESCRIPTION
dict

Dictionary with version strings for Python, NumPy, Polars, OS, PyTorch, CUDA, and cuDNN.

Source code in src/log_helpers/system_utils.py
def get_library_versions() -> dict:
    """Collect version strings of Python, key libraries, and the OS.

    Returns
    -------
    dict
        Version strings for Python, NumPy, Polars, OS, PyTorch,
        CUDA, and cuDNN.
    """
    versions = {}
    # Assigned one key at a time so a mid-way failure keeps earlier entries
    try:
        versions["v_Python"] = python_version()
        versions["v_Numpy"] = np.__version__
        versions["v_Polars"] = pl.__version__
        versions["v_OS"] = system()
        versions["v_OS_kernel"] = release()  # in Linux systems
        versions["v_Torch"] = str(torch.__version__)
        # https://www.thepythoncode.com/article/get-hardware-system-information-python
    except Exception as err:
        logger.warning("Problem getting library versions, error = {}".format(err))

    # CUDA/cuDNN handled separately: absent on CPU-only installs
    try:
        versions["v_CUDA"] = torch.version.cuda
        versions["v_CuDNN"] = torch.backends.cudnn.version()
    except Exception as err:
        logger.warning("Problem getting CUDA library versions, error = {}".format(err))

    return versions

get_system_param_dict

get_system_param_dict() -> dict

Get comprehensive system parameters dictionary.

Collects hardware info, library versions, and git commit for reproducibility logging.

RETURNS DESCRIPTION
dict

Dictionary with 'system', 'libraries', and 'git_commit' keys.

Source code in src/log_helpers/system_utils.py
def get_system_param_dict() -> dict:
    """Get comprehensive system parameters dictionary.

    Collects hardware info, library versions, and git commit for
    reproducibility logging.

    Returns
    -------
    dict
        Dictionary with 'system', 'libraries', and 'git_commit' keys.
    """
    # In a way, might as well log everything, but at some point you just clutter the MLflow UI
    # You could dump this dict to a file as well and log it as an artifact?
    # Renamed the local from `dict` to avoid shadowing the builtin.
    params = {
        "system": get_system_params(),
        "libraries": get_library_versions(),
        "git_commit": {"git": get_commit_id()},
        # DVC commit?
    }

    return params

Visualization Logging

viz_log_utils

get_run_ids_from_infos

get_run_ids_from_infos(mlflow_infos)

Extract run IDs from MLflow info dictionaries.

PARAMETER DESCRIPTION
mlflow_infos

Dictionary mapping names to MLflow info with 'run_info' containing 'run_id'.

TYPE: dict

RETURNS DESCRIPTION
dict

Mapping of names to run IDs.

Source code in src/log_helpers/viz_log_utils.py
def get_run_ids_from_infos(mlflow_infos):
    """Map each entry name to its MLflow run ID.

    Parameters
    ----------
    mlflow_infos : dict
        Mapping of names to MLflow info dicts holding
        ``['run_info']['run_id']``.

    Returns
    -------
    dict
        Mapping of names to run IDs.
    """
    return {
        name: info["run_info"]["run_id"] for name, info in mlflow_infos.items()
    }

export_viz_as_artifacts

export_viz_as_artifacts(
    fig_paths: dict,
    flow_type: str,
    cfg: DictConfig,
    mlflow_run_ids: dict = None,
    mlflow_infos: dict = None,
)

Export visualization files as MLflow artifacts.

Logs figure files to all relevant MLflow runs. Useful for aggregated visualizations that span multiple model runs.

PARAMETER DESCRIPTION
fig_paths

Dictionary mapping figure names to file paths.

TYPE: dict

flow_type

Type of flow for logging context.

TYPE: str

cfg

Configuration object (currently unused).

TYPE: DictConfig

mlflow_run_ids

Pre-computed mapping of model names to run IDs.

TYPE: dict DEFAULT: None

mlflow_infos

MLflow info dictionaries to extract run IDs from.

TYPE: dict DEFAULT: None

RAISES DESCRIPTION
ValueError

If neither mlflow_run_ids nor mlflow_infos is provided.

Source code in src/log_helpers/viz_log_utils.py
def export_viz_as_artifacts(
    fig_paths: dict,
    flow_type: str,
    cfg: DictConfig,
    mlflow_run_ids: dict = None,
    mlflow_infos: dict = None,
):
    """Export visualization files as MLflow artifacts.

    Logs each figure path to every relevant MLflow run. Useful for
    aggregated visualizations that span multiple model runs.

    Parameters
    ----------
    fig_paths : dict
        Dictionary mapping figure names to file paths.
    flow_type : str
        Type of flow for logging context.
    cfg : DictConfig
        Configuration object (currently unused).
    mlflow_run_ids : dict, optional
        Pre-computed mapping of model names to run IDs. Takes precedence
        over `mlflow_infos` when given.
    mlflow_infos : dict, optional
        MLflow info dictionaries to extract run IDs from when
        `mlflow_run_ids` is not supplied.

    Raises
    ------
    ValueError
        If neither mlflow_run_ids nor mlflow_infos is provided.
    """
    logger.info(f"Logging the {flow_type} visualizations as artifacts")

    # Resolve run IDs: use the precomputed mapping when available,
    # otherwise derive it from the info dicts; fail fast when neither exists.
    if mlflow_run_ids is None:
        if mlflow_infos is None:
            logger.error("Need some information about the MLflow run")
            raise ValueError("Need some information about the MLflow run")
        mlflow_run_ids = get_run_ids_from_infos(mlflow_infos)

    for name, fig_dir in fig_paths.items():
        logger.debug(f"Logging the {name} as artifact from {fig_dir}")
        # Note! This is not run-specific plots as it aggregates all the models (i.e. various MLflow runs)
        # Logging now to every run separately, PNGs are not that massive in the end
        for model_name, run_id in mlflow_run_ids.items():
            try:
                with mlflow.start_run(run_id):
                    logger.debug(
                        f"MLFLOW Artifact Log | model_name = {model_name}, run_id = {run_id}"
                    )
                    mlflow.log_artifact(fig_dir, "figures")
            except Exception as e:
                logger.error(
                    f"Could not save the {flow_type} visualization to MLflow: {e}"
                )

Polars Utilities

polars_utils

cast_numeric_polars_cols

cast_numeric_polars_cols(
    df: DataFrame, cast_to: str = "Float64"
)

Cast all numeric columns in Polars DataFrame to specified type.

Useful for avoiding schema errors when combining DataFrames with different numeric precision.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

cast_to

Target numeric type.

TYPE: str DEFAULT: "Float64"

RETURNS DESCRIPTION
DataFrame

DataFrame with numeric columns cast to specified type.

RAISES DESCRIPTION
NotImplementedError

If cast_to is not "Float64".

Source code in src/log_helpers/polars_utils.py
def cast_numeric_polars_cols(df: pl.DataFrame, cast_to: str = "Float64"):
    """Cast all numeric columns in Polars DataFrame to specified type.

    Useful for avoiding schema errors when combining DataFrames with
    different numeric precision.

    Parameters
    ----------
    df : pl.DataFrame
        Input DataFrame.
    cast_to : str, default "Float64"
        Target numeric type.

    Returns
    -------
    pl.DataFrame
        DataFrame with numeric columns cast to specified type.

    Raises
    ------
    NotImplementedError
        If cast_to is not "Float64".
    """
    # Validate the target type once, up front. The original checked inside
    # the per-column loop, so a bad `cast_to` raised only after some columns
    # had already been cast (and never raised at all for a DataFrame with no
    # numeric columns, contradicting the documented Raises contract).
    if cast_to != "Float64":
        logger.error(f"Unknown cast_to type: {cast_to}")
        raise NotImplementedError(f"Unknown cast_to type: {cast_to}")

    # To avoid this:
    # polars.exceptions.SchemaError: type Float32 is incompatible with expected type Float64
    for col in df.columns:
        if df[col].dtype.is_numeric():
            try:
                df = df.with_columns(pl.col(col).cast(pl.Float64))
            except Exception as e:
                # Best-effort: keep the original column on cast failure.
                logger.error(f"Error in casting the column {col} to Float64: {e}")

    return df