
Attribution Engine

Confidence aggregation with conformal calibration: ResolvedEntity[] → AttributionRecord.

Aggregator

aggregator

Multi-source credit aggregation for attribution.

Core component of Pipeline 3 (Attribution Engine). Aggregates credits from multiple ResolvedEntity objects into a single AttributionRecord. Handles disagreements between sources using weighted voting based on source reliability.

Default source reliability weights (from constants.SOURCE_RELIABILITY_WEIGHTS):

  • MusicBrainz -- highest weight (community-curated, structured data)
  • Discogs -- medium weight (community-curated, less structured)
  • AcoustID -- medium weight (acoustic fingerprint-based)
  • File metadata -- lowest weight (often incomplete or incorrect)
  • Artist input -- high weight (authoritative but potentially biased)

Notes

Implements the multi-source aggregation described in Teikari (2026), Section 5.1. The weighted voting approach ensures that higher-quality sources have more influence on the final attribution, while still incorporating evidence from all available sources.
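The weighting scheme can be sketched in plain Python. The weight values and source names below are illustrative stand-ins, not the actual contents of `constants.SOURCE_RELIABILITY_WEIGHTS`:

```python
# Illustrative weights only; the real values live in
# constants.SOURCE_RELIABILITY_WEIGHTS and use SourceEnum keys.
EXAMPLE_WEIGHTS = {
    "musicbrainz": 0.9,
    "discogs": 0.6,
    "acoustid": 0.6,
    "file_metadata": 0.3,
    "artist_input": 0.8,
}


def weighted_confidence(votes: dict[str, float]) -> float:
    """Combine per-source confidence scores into one weighted score.

    ``votes`` maps a source name to that source's confidence in the
    candidate credit. Each vote is scaled by source reliability and
    normalized by the total weight of the sources that voted, so a
    high-reliability source pulls the result harder than a low one.
    """
    total_weight = sum(EXAMPLE_WEIGHTS[s] for s in votes)
    if total_weight == 0.0:
        return 0.0
    return sum(EXAMPLE_WEIGHTS[s] * c for s, c in votes.items()) / total_weight


# Two high-reliability sources agree; file metadata disagrees but
# carries little weight, so the aggregate stays high.
score = weighted_confidence(
    {"musicbrainz": 0.95, "artist_input": 0.9, "file_metadata": 0.2}
)
```

The low weight on file metadata means its dissenting vote only nudges the result, which is exactly the "more influence for higher-quality sources" behaviour described above.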

See Also

music_attribution.attribution.conformal : Calibration of aggregated scores.
music_attribution.attribution.persistence : Storage of attribution records.
music_attribution.constants : Source reliability weight definitions.

CreditAggregator

CreditAggregator(
    source_weights: dict[SourceEnum, float] | None = None,
)

Aggregate credits from resolved entities into attribution records.

Uses weighted voting based on source reliability to handle disagreements between sources. Produces a complete AttributionRecord with credits, confidence, assurance level, and provenance chain.

PARAMETER DESCRIPTION
source_weights

Per-source reliability weight overrides. If None, uses SOURCE_RELIABILITY_WEIGHTS from constants.

TYPE: dict[SourceEnum, float] | None DEFAULT: None

ATTRIBUTE DESCRIPTION
_source_weights

Active source reliability weights.

TYPE: dict[SourceEnum, float]

See Also

music_attribution.attribution.conformal.ConformalScorer : Calibrate the aggregated score.
music_attribution.attribution.priority_queue.ReviewPriorityQueue : Prioritize records for review.

Source code in src/music_attribution/attribution/aggregator.py
def __init__(self, source_weights: dict[SourceEnum, float] | None = None) -> None:
    self._source_weights = source_weights if source_weights is not None else SOURCE_RELIABILITY_WEIGHTS

aggregate async

aggregate(
    work_entity: ResolvedEntity,
    contributor_entities: list[ResolvedEntity],
    roles: dict[UUID, CreditRoleEnum],
) -> AttributionRecord

Aggregate resolved entities into a single AttributionRecord.

Builds per-credit confidence scores from source reliability weights, computes overall attribution confidence and source agreement, and initializes the provenance chain with a SCORE event.

Records with confidence below REVIEW_THRESHOLD are automatically flagged for human review with priority = 1.0 - confidence.

PARAMETER DESCRIPTION
work_entity

The work or recording entity being attributed.

TYPE: ResolvedEntity

contributor_entities

Resolved contributor entities (artists, producers, etc.).

TYPE: list[ResolvedEntity]

roles

Mapping of contributor entity_id to their credit role (e.g., PERFORMER, COMPOSER, PRODUCER).

TYPE: dict[UUID, CreditRoleEnum]

RETURNS DESCRIPTION
AttributionRecord

Complete attribution record with credits, confidence, assurance level, conformal set placeholder, and provenance.

See Also

music_attribution.attribution.conformal.ConformalScorer.score : Apply conformal calibration to the aggregated record.

Source code in src/music_attribution/attribution/aggregator.py
async def aggregate(
    self,
    work_entity: ResolvedEntity,
    contributor_entities: list[ResolvedEntity],
    roles: dict[uuid.UUID, CreditRoleEnum],
) -> AttributionRecord:
    """Aggregate resolved entities into a single AttributionRecord.

    Builds per-credit confidence scores from source reliability weights,
    computes overall attribution confidence and source agreement, and
    initializes the provenance chain with a SCORE event.

    Records with confidence below ``REVIEW_THRESHOLD`` are automatically
    flagged for human review with priority = ``1.0 - confidence``.

    Parameters
    ----------
    work_entity : ResolvedEntity
        The work or recording entity being attributed.
    contributor_entities : list[ResolvedEntity]
        Resolved contributor entities (artists, producers, etc.).
    roles : dict[uuid.UUID, CreditRoleEnum]
        Mapping of contributor ``entity_id`` to their credit role
        (e.g., PERFORMER, COMPOSER, PRODUCER).

    Returns
    -------
    AttributionRecord
        Complete attribution record with credits, confidence,
        assurance level, conformal set placeholder, and provenance.

    See Also
    --------
    music_attribution.attribution.conformal.ConformalScorer.score :
        Apply conformal calibration to the aggregated record.
    """
    now = datetime.now(UTC)

    # Build credits
    credits = self._build_credits(contributor_entities, roles)

    # Compute overall confidence and agreement
    confidence = self._compute_confidence(credits)
    agreement = self._compute_source_agreement(contributor_entities)

    # Determine assurance level (min of all contributors)
    assurance = self._compute_assurance(contributor_entities)

    # Build provenance chain
    provenance = [
        ProvenanceEvent(
            event_type=ProvenanceEventTypeEnum.SCORE,
            timestamp=now,
            agent="credit-aggregator",
            details=ScoreEventDetails(
                new_confidence=confidence,
                scoring_method="weighted_source_aggregation",
            ),
        ),
    ]

    # Build placeholder conformal set
    conformal = ConformalSet(
        coverage_level=0.9,
        marginal_coverage=0.9,
        calibration_error=0.0,
        calibration_method="placeholder",
        calibration_set_size=0,
    )

    return AttributionRecord(
        work_entity_id=work_entity.entity_id,
        credits=credits,
        assurance_level=assurance,
        confidence_score=confidence,
        conformal_set=conformal,
        source_agreement=agreement,
        provenance_chain=provenance,
        needs_review=confidence < REVIEW_THRESHOLD,
        review_priority=1.0 - confidence,
        created_at=now,
        updated_at=now,
        version=1,
    )
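The review-flagging rule at the end of `aggregate` can be illustrated in isolation. The threshold value below is made up for the example; the real value is `REVIEW_THRESHOLD` from the project's constants:

```python
# Hypothetical threshold for illustration only; the actual value is
# constants.REVIEW_THRESHOLD.
EXAMPLE_REVIEW_THRESHOLD = 0.85


def review_flags(confidence: float) -> tuple[bool, float]:
    """Return (needs_review, review_priority) for an aggregated score.

    Confidence below the threshold triggers review, and priority
    rises as confidence falls, so the least certain records surface
    first in the review queue.
    """
    return confidence < EXAMPLE_REVIEW_THRESHOLD, 1.0 - confidence


needs, priority = review_flags(0.6)   # flagged, high priority
clear, _ = review_flags(0.95)         # not flagged
```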

Conformal Prediction

conformal

Conformal prediction confidence scoring for attribution.

Wraps attribution confidence in conformal prediction sets to ensure calibrated uncertainty quantification. When the system says "90% confident", this module guarantees that the prediction set covers the true label at least 90% of the time.

Implements the Adaptive Prediction Sets (APS) method:

  1. Sort candidate labels by decreasing confidence.
  2. Include labels in the prediction set until cumulative confidence reaches the target coverage level.
  3. The resulting set size reflects true uncertainty: larger sets mean more ambiguity.
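The three steps above can be sketched directly. Role names here are plain strings rather than `CreditRoleEnum` members, and the function is a standalone illustration rather than the module's `score` method:

```python
def aps_prediction_set(
    predictions: list[tuple[str, float]], coverage: float = 0.9
) -> list[str]:
    """Smallest label set whose cumulative confidence reaches coverage."""
    # Step 1: rank candidate labels by decreasing confidence.
    ranked = sorted(predictions, key=lambda p: p[1], reverse=True)
    # Steps 2-3: accumulate until the target coverage is reached;
    # the size of the resulting set is itself an uncertainty signal.
    chosen: list[str] = []
    cumulative = 0.0
    for label, conf in ranked:
        chosen.append(label)
        cumulative += conf
        if cumulative >= coverage:
            break
    return chosen


# Confident case: one label already covers 90%.
confident = aps_prediction_set(
    [("PERFORMER", 0.93), ("COMPOSER", 0.05), ("PRODUCER", 0.02)]
)
# Ambiguous case: all three labels are needed to reach 90%.
ambiguous = aps_prediction_set(
    [("PERFORMER", 0.50), ("COMPOSER", 0.30), ("PRODUCER", 0.20)]
)
```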

The CalibrationReport tracks Expected Calibration Error (ECE) using equal-width binning to detect systematic over- or under-confidence.

Notes

Implements the conformal prediction framework described in Teikari (2026), Section 5.2. Based on the theoretical foundations of Vovk et al. (2005), "Algorithmic Learning in a Random World."

References

.. [1] Vovk, V., Gammerman, A., & Shafer, G. (2005). "Algorithmic Learning in a Random World." Springer.
.. [2] Romano, Y., Sesia, M., & Candes, E. (2020). "Classification with Valid and Adaptive Coverage." NeurIPS.

See Also

music_attribution.attribution.aggregator : Upstream confidence aggregation.
music_attribution.schemas.attribution.ConformalSet : Pydantic model for prediction sets.

CalibrationReport

Bases: BaseModel

Report on calibration quality using equal-width binning.

Tracks Expected Calibration Error (ECE) and per-bin accuracy vs. confidence, enabling detection of systematic over- or under-confidence in the scoring model.

ATTRIBUTE DESCRIPTION
ece

Expected Calibration Error -- weighted average of per-bin |accuracy - confidence|. Lower is better; 0.0 means perfectly calibrated. Must be >= 0.0.

TYPE: float

marginal_coverage

Achieved coverage (fraction of correct predictions) in [0, 1].

TYPE: float

target_coverage

Target coverage level (e.g., 0.9 for 90% coverage).

TYPE: float

calibration_method

Name of the calibration method (e.g., "APS").

TYPE: str

calibration_set_size

Number of samples used for calibration.

TYPE: int

bin_accuracies

Per-bin accuracy values (10 bins by default).

TYPE: list[float]

bin_confidences

Per-bin mean confidence values (10 bins by default).

TYPE: list[float]

timestamp

When the calibration was computed (UTC).

TYPE: datetime

ConformalScorer

Conformal prediction scorer for attribution confidence.

Uses the Adaptive Prediction Sets (APS) method to produce calibrated prediction sets at specified coverage levels. The prediction set includes the minimum number of candidate labels needed to achieve the target coverage.

Notes

A well-calibrated model produces small prediction sets (often just 1 label) for high-confidence predictions and larger sets for ambiguous cases. The set size itself is a useful uncertainty signal.

See Also

music_attribution.attribution.aggregator.CreditAggregator : Produces the raw confidence scores that are calibrated here.

score

score(
    predictions: list[tuple[CreditRoleEnum, float]],
    coverage: float = 0.9,
) -> ConformalSet

Produce a conformal prediction set at the specified coverage level.

Sorts candidate roles by confidence descending, then includes roles until cumulative confidence reaches the target coverage. If total confidence across all candidates is less than the target, all candidates are included.

PARAMETER DESCRIPTION
predictions

List of (role, confidence) tuples. Confidence values should sum to approximately 1.0 for well-calibrated models.

TYPE: list[tuple[CreditRoleEnum, float]]

coverage

Target coverage level. Default is 0.90 (90% coverage).

TYPE: float DEFAULT: 0.9

RETURNS DESCRIPTION
ConformalSet

Prediction set with coverage metadata. The set_sizes field indicates how many labels were needed to achieve coverage (smaller = more confident).

Source code in src/music_attribution/attribution/conformal.py
def score(
    self,
    predictions: list[tuple[CreditRoleEnum, float]],
    coverage: float = 0.90,
) -> ConformalSet:
    """Produce a conformal prediction set at the specified coverage level.

    Sorts candidate roles by confidence descending, then includes
    roles until cumulative confidence reaches the target coverage.
    If total confidence across all candidates is less than the target,
    all candidates are included.

    Parameters
    ----------
    predictions : list[tuple[CreditRoleEnum, float]]
        List of ``(role, confidence)`` tuples. Confidence values
        should sum to approximately 1.0 for well-calibrated models.
    coverage : float, optional
        Target coverage level. Default is 0.90 (90% coverage).

    Returns
    -------
    ConformalSet
        Prediction set with coverage metadata. The ``set_sizes``
        field indicates how many labels were needed to achieve
        coverage (smaller = more confident).
    """
    if not predictions:
        return ConformalSet(
            coverage_level=coverage,
            prediction_sets={},
            set_sizes={},
            marginal_coverage=coverage,
            calibration_error=0.0,
            calibration_method="APS",
            calibration_set_size=0,
        )

    # Sort by confidence descending
    sorted_preds = sorted(predictions, key=lambda x: x[1], reverse=True)

    # Build prediction set: include roles until cumulative confidence >= coverage
    prediction_set: list[CreditRoleEnum] = []
    cumulative = 0.0
    for role, conf in sorted_preds:
        prediction_set.append(role)
        cumulative += conf
        if cumulative >= coverage:
            break

    # If we still haven't reached coverage, include all
    if cumulative < coverage:
        prediction_set = [role for role, _ in sorted_preds]

    # Compute marginal coverage (achieved coverage)
    total_conf = sum(c for _, c in sorted_preds)
    marginal = min(cumulative / total_conf, 1.0) if total_conf > 0 else 0.0

    # Calibration error (simplified — deviation from target)
    cal_error = abs(marginal - coverage)

    return ConformalSet(
        coverage_level=coverage,
        prediction_sets={"default": prediction_set},
        set_sizes={"default": len(prediction_set)},
        marginal_coverage=marginal,
        calibration_error=cal_error,
        calibration_method="APS",
        calibration_set_size=len(predictions),
    )

calibrate

calibrate(
    predictions: list[tuple[float, bool]],
) -> CalibrationReport

Compute calibration metrics from predictions vs. actual outcomes.

Uses equal-width binning (10 bins spanning [0, 1]) to compute:

  • ECE (Expected Calibration Error): weighted average of per-bin |accuracy - confidence|.
  • Per-bin accuracy: fraction of correct predictions in each bin.
  • Per-bin confidence: mean predicted probability in each bin.
  • Marginal coverage: overall fraction of correct predictions.

PARAMETER DESCRIPTION
predictions

List of (predicted_probability, actual_outcome) tuples. predicted_probability is in [0, 1]; actual_outcome is True if the prediction was correct.

TYPE: list[tuple[float, bool]]

RETURNS DESCRIPTION
CalibrationReport

Calibration metrics including ECE and per-bin breakdowns. Returns an empty report (ECE=0) if no predictions are provided.

Notes

A perfectly calibrated model has ECE=0 and bin accuracies that match bin confidences (i.e., the reliability diagram is a 45-degree line). See Naeini et al. (2015) for ECE methodology.

Source code in src/music_attribution/attribution/conformal.py
def calibrate(
    self,
    predictions: list[tuple[float, bool]],
) -> CalibrationReport:
    """Compute calibration metrics from predictions vs. actual outcomes.

    Uses equal-width binning (10 bins spanning [0, 1]) to compute:

    - **ECE** (Expected Calibration Error): weighted average of
      per-bin ``|accuracy - confidence|``.
    - **Per-bin accuracy**: fraction of correct predictions in each bin.
    - **Per-bin confidence**: mean predicted probability in each bin.
    - **Marginal coverage**: overall fraction of correct predictions.

    Parameters
    ----------
    predictions : list[tuple[float, bool]]
        List of ``(predicted_probability, actual_outcome)`` tuples.
        ``predicted_probability`` is in [0, 1]; ``actual_outcome``
        is ``True`` if the prediction was correct.

    Returns
    -------
    CalibrationReport
        Calibration metrics including ECE and per-bin breakdowns.
        Returns an empty report (ECE=0) if no predictions are provided.

    Notes
    -----
    A perfectly calibrated model has ECE=0 and bin accuracies that
    match bin confidences (i.e., the reliability diagram is a 45-degree
    line). See Naeini et al. (2015) for ECE methodology.
    """
    if not predictions:
        return CalibrationReport(
            ece=0.0,
            marginal_coverage=0.0,
            target_coverage=0.9,
            calibration_method="APS",
            calibration_set_size=0,
        )

    # Bin predictions into 10 bins
    n_bins = 10
    bin_sums: list[float] = [0.0] * n_bins
    bin_correct: list[int] = [0] * n_bins
    bin_counts: list[int] = [0] * n_bins

    for prob, actual in predictions:
        bin_idx = min(int(prob * n_bins), n_bins - 1)
        bin_sums[bin_idx] += prob
        bin_correct[bin_idx] += int(actual)
        bin_counts[bin_idx] += 1

    # Compute per-bin accuracy and confidence
    bin_accuracies: list[float] = []
    bin_confidences: list[float] = []
    ece = 0.0
    total = len(predictions)

    for i in range(n_bins):
        if bin_counts[i] > 0:
            acc = bin_correct[i] / bin_counts[i]
            conf = bin_sums[i] / bin_counts[i]
            bin_accuracies.append(acc)
            bin_confidences.append(conf)
            ece += (bin_counts[i] / total) * abs(acc - conf)
        else:
            bin_accuracies.append(0.0)
            bin_confidences.append(0.0)

    # Compute actual coverage
    correct_count = sum(1 for _, actual in predictions if actual)
    marginal_coverage = correct_count / total if total > 0 else 0.0

    return CalibrationReport(
        ece=ece,
        marginal_coverage=marginal_coverage,
        target_coverage=0.9,
        calibration_method="APS",
        calibration_set_size=total,
        bin_accuracies=bin_accuracies,
        bin_confidences=bin_confidences,
    )
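A tiny worked example makes the binning arithmetic concrete. This standalone function mirrors the equal-width binning described above (it is not the project's `calibrate`, just the same formula):

```python
def expected_calibration_error(
    predictions: list[tuple[float, bool]], n_bins: int = 10
) -> float:
    """Standalone ECE over equal-width bins spanning [0, 1]."""
    # Per-bin accumulators: [sum of confidences, n correct, n total].
    bins = [[0.0, 0, 0] for _ in range(n_bins)]
    for prob, correct in predictions:
        i = min(int(prob * n_bins), n_bins - 1)  # clamp prob == 1.0
        bins[i][0] += prob
        bins[i][1] += int(correct)
        bins[i][2] += 1
    total = len(predictions)
    # ECE is the count-weighted average of |accuracy - confidence|.
    ece = 0.0
    for conf_sum, n_correct, count in bins:
        if count:
            ece += (count / total) * abs(n_correct / count - conf_sum / count)
    return ece


# Four predictions at 0.8 confidence, three of them correct:
# bin accuracy = 0.75, bin confidence = 0.8, so ECE = |0.75 - 0.8| = 0.05.
ece = expected_calibration_error(
    [(0.8, True), (0.8, True), (0.8, True), (0.8, False)]
)
```

A nonzero ECE like this signals mild overconfidence: the model claims 80% but is right only 75% of the time.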

Persistence

persistence

AttributionRecord persistence repositories.

Provides two repository implementations for AttributionRecord storage:

  • AttributionRecordRepository: In-memory storage for development and testing. No database required; data is lost on process exit.
  • AsyncAttributionRepository: Async PostgreSQL storage via SQLAlchemy AsyncSession. Production-grade with ACID guarantees.

Both repositories expose the same async interface:

  • store() -- persist a new attribution record.
  • update() -- update an existing record (auto-increments version).
  • find_by_id() -- lookup by attribution UUID.
  • find_by_work_entity_id() -- lookup by work entity UUID.
  • find_needs_review() -- fetch records flagged for human review.

Every update appends a ProvenanceEvent to the record's provenance chain, creating an immutable audit trail.

Notes

The provenance chain is a key component of the attribution-by-design philosophy described in Teikari (2026), Section 5.3. Every change to an attribution record is recorded with timestamp, agent, and details.

See Also

music_attribution.schemas.attribution : Pydantic models for attribution.
music_attribution.db.models.AttributionRecordModel : SQLAlchemy ORM model.

AttributionRecordRepository

AttributionRecordRepository()

In-memory repository for AttributionRecord persistence.

Provides the same async interface as AsyncAttributionRepository so that dev/test code can be written against the same API. Records are stored as deep copies to prevent mutation through references.

ATTRIBUTE DESCRIPTION
_records

In-memory record storage keyed by attribution_id.

TYPE: dict[UUID, AttributionRecord]

Source code in src/music_attribution/attribution/persistence.py
def __init__(self) -> None:
    self._records: dict[uuid.UUID, AttributionRecord] = {}

store async

store(record: AttributionRecord) -> UUID

Store an attribution record (deep copy).

PARAMETER DESCRIPTION
record

The attribution record to store.

TYPE: AttributionRecord

RETURNS DESCRIPTION
UUID

The attribution_id of the stored record.

Source code in src/music_attribution/attribution/persistence.py
async def store(self, record: AttributionRecord) -> uuid.UUID:
    """Store an attribution record (deep copy).

    Parameters
    ----------
    record : AttributionRecord
        The attribution record to store.

    Returns
    -------
    uuid.UUID
        The ``attribution_id`` of the stored record.
    """
    self._records[record.attribution_id] = copy.deepcopy(record)
    return record.attribution_id

update async

update(record: AttributionRecord) -> UUID

Update an existing attribution record with provenance tracking.

Increments the version number, updates the timestamp, and appends an UPDATE provenance event to the record's provenance chain.

PARAMETER DESCRIPTION
record

The record to update.

TYPE: AttributionRecord

RETURNS DESCRIPTION
UUID

The attribution_id of the updated record.

Source code in src/music_attribution/attribution/persistence.py
async def update(self, record: AttributionRecord) -> uuid.UUID:
    """Update an existing attribution record with provenance tracking.

    Increments the version number, updates the timestamp, and appends
    an ``UPDATE`` provenance event to the record's provenance chain.

    Parameters
    ----------
    record : AttributionRecord
        The record to update.

    Returns
    -------
    uuid.UUID
        The ``attribution_id`` of the updated record.
    """
    now = datetime.now(UTC)
    old_version = record.version

    updated = record.model_copy(
        update={
            "version": old_version + 1,
            "updated_at": now,
            "provenance_chain": [
                *record.provenance_chain,
                ProvenanceEvent(
                    event_type=ProvenanceEventTypeEnum.UPDATE,
                    timestamp=now,
                    agent="system",
                    details=UpdateEventDetails(
                        previous_version=old_version,
                        new_version=old_version + 1,
                        fields_changed=["version", "updated_at"],
                        trigger="repository_update",
                    ),
                ),
            ],
        },
    )

    self._records[updated.attribution_id] = updated
    return updated.attribution_id

find_by_id async

find_by_id(
    attribution_id: UUID,
) -> AttributionRecord | None

Find an attribution record by its UUID.

PARAMETER DESCRIPTION
attribution_id

The attribution record UUID to look up.

TYPE: UUID

RETURNS DESCRIPTION
AttributionRecord | None

Deep copy of the record if found, None otherwise.

Source code in src/music_attribution/attribution/persistence.py
async def find_by_id(self, attribution_id: uuid.UUID) -> AttributionRecord | None:
    """Find an attribution record by its UUID.

    Parameters
    ----------
    attribution_id : uuid.UUID
        The attribution record UUID to look up.

    Returns
    -------
    AttributionRecord | None
        Deep copy of the record if found, ``None`` otherwise.
    """
    record = self._records.get(attribution_id)
    return copy.deepcopy(record) if record is not None else None

find_by_work_entity_id async

find_by_work_entity_id(
    work_entity_id: UUID,
) -> AttributionRecord | None

Find an attribution record by work entity ID.

PARAMETER DESCRIPTION
work_entity_id

The work entity UUID.

TYPE: UUID

RETURNS DESCRIPTION
AttributionRecord | None

Deep copy of the first matching record, or None.

Source code in src/music_attribution/attribution/persistence.py
async def find_by_work_entity_id(
    self,
    work_entity_id: uuid.UUID,
) -> AttributionRecord | None:
    """Find an attribution record by work entity ID.

    Parameters
    ----------
    work_entity_id : uuid.UUID
        The work entity UUID.

    Returns
    -------
    AttributionRecord | None
        Deep copy of the first matching record, or ``None``.
    """
    for record in self._records.values():
        if record.work_entity_id == work_entity_id:
            return copy.deepcopy(record)
    return None

find_needs_review async

find_needs_review(
    limit: int = 50,
) -> list[AttributionRecord]

Find records that need human review, sorted by priority descending.

PARAMETER DESCRIPTION
limit

Maximum number of records to return. Default is 50.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
list[AttributionRecord]

Deep copies of records with needs_review=True, sorted by review_priority descending (highest priority first).

Source code in src/music_attribution/attribution/persistence.py
async def find_needs_review(self, limit: int = 50) -> list[AttributionRecord]:
    """Find records that need human review, sorted by priority descending.

    Parameters
    ----------
    limit : int, optional
        Maximum number of records to return. Default is 50.

    Returns
    -------
    list[AttributionRecord]
        Deep copies of records with ``needs_review=True``, sorted
        by ``review_priority`` descending (highest priority first).
    """
    needs_review = [copy.deepcopy(r) for r in self._records.values() if r.needs_review]
    needs_review.sort(key=lambda r: r.review_priority, reverse=True)
    return needs_review[:limit]

AsyncAttributionRepository

Async PostgreSQL repository for AttributionRecord persistence.

Production-grade repository using SQLAlchemy AsyncSession for database access. All methods require an active session; the caller is responsible for transaction management (commit/rollback).

Provides the same logical operations as AttributionRecordRepository but backed by PostgreSQL with ACID guarantees and JSONB storage for nested Pydantic models.

store async

store(
    record: AttributionRecord, session: AsyncSession
) -> UUID

Store an attribution record in PostgreSQL.

PARAMETER DESCRIPTION
record

The attribution record to store.

TYPE: AttributionRecord

session

Active async database session.

TYPE: AsyncSession

RETURNS DESCRIPTION
UUID

The attribution_id of the stored record.

Source code in src/music_attribution/attribution/persistence.py
async def store(self, record: AttributionRecord, session: AsyncSession) -> uuid.UUID:
    """Store an attribution record in PostgreSQL.

    Parameters
    ----------
    record : AttributionRecord
        The attribution record to store.
    session : AsyncSession
        Active async database session.

    Returns
    -------
    uuid.UUID
        The ``attribution_id`` of the stored record.
    """
    model = _record_to_model(record)
    session.add(model)
    await session.flush()
    return record.attribution_id

update async

update(
    record: AttributionRecord, session: AsyncSession
) -> UUID

Update an existing attribution record with provenance tracking.

Increments the version number, updates the timestamp, appends an UPDATE provenance event, and persists changes via session.flush().

PARAMETER DESCRIPTION
record

The record to update.

TYPE: AttributionRecord

session

Active async database session.

TYPE: AsyncSession

RETURNS DESCRIPTION
UUID

The attribution_id of the updated record.

RAISES DESCRIPTION
NoResultFound

If no record with the given attribution_id exists.

Source code in src/music_attribution/attribution/persistence.py
async def update(self, record: AttributionRecord, session: AsyncSession) -> uuid.UUID:
    """Update an existing attribution record with provenance tracking.

    Increments the version number, updates the timestamp, appends an
    ``UPDATE`` provenance event, and persists changes via
    ``session.flush()``.

    Parameters
    ----------
    record : AttributionRecord
        The record to update.
    session : AsyncSession
        Active async database session.

    Returns
    -------
    uuid.UUID
        The ``attribution_id`` of the updated record.

    Raises
    ------
    sqlalchemy.exc.NoResultFound
        If no record with the given ``attribution_id`` exists.
    """
    now = datetime.now(UTC)
    old_version = record.version

    updated = record.model_copy(
        update={
            "version": old_version + 1,
            "updated_at": now,
            "provenance_chain": [
                *record.provenance_chain,
                ProvenanceEvent(
                    event_type=ProvenanceEventTypeEnum.UPDATE,
                    timestamp=now,
                    agent="system",
                    details=UpdateEventDetails(
                        previous_version=old_version,
                        new_version=old_version + 1,
                        fields_changed=["version", "updated_at"],
                        trigger="repository_update",
                    ),
                ),
            ],
        },
    )

    stmt = select(AttributionRecordModel).where(
        AttributionRecordModel.attribution_id == record.attribution_id,
    )
    result = await session.execute(stmt)
    existing = result.scalar_one()

    existing.version = updated.version
    existing.updated_at = updated.updated_at  # type: ignore[assignment]
    existing.provenance_chain = [e.model_dump(mode="json") for e in updated.provenance_chain]  # type: ignore[assignment]
    existing.confidence_score = updated.confidence_score
    existing.needs_review = updated.needs_review
    existing.review_priority = updated.review_priority
    existing.credits = [c.model_dump(mode="json") for c in updated.credits]  # type: ignore[assignment]

    await session.flush()
    return updated.attribution_id

find_by_id async

find_by_id(
    attribution_id: UUID, session: AsyncSession
) -> AttributionRecord | None

Find an attribution record by its UUID.

PARAMETER DESCRIPTION
attribution_id

The attribution record UUID to look up.

TYPE: UUID

session

Active async database session.

TYPE: AsyncSession

RETURNS DESCRIPTION
AttributionRecord | None

The validated Pydantic record if found, None otherwise.

Source code in src/music_attribution/attribution/persistence.py
async def find_by_id(
    self,
    attribution_id: uuid.UUID,
    session: AsyncSession,
) -> AttributionRecord | None:
    """Find an attribution record by its UUID.

    Parameters
    ----------
    attribution_id : uuid.UUID
        The attribution record UUID to look up.
    session : AsyncSession
        Active async database session.

    Returns
    -------
    AttributionRecord | None
        The validated Pydantic record if found, ``None`` otherwise.
    """
    stmt = select(AttributionRecordModel).where(
        AttributionRecordModel.attribution_id == attribution_id,
    )
    result = await session.execute(stmt)
    model = result.scalar_one_or_none()
    return _model_to_record(model) if model is not None else None

find_by_work_entity_id async

find_by_work_entity_id(
    work_entity_id: UUID, session: AsyncSession
) -> AttributionRecord | None

Find the most recent attribution record for a work entity.

Returns the record with the highest version number if multiple versions exist for the same work entity.

PARAMETER DESCRIPTION
work_entity_id

The work entity UUID.

TYPE: UUID

session

Active async database session.

TYPE: AsyncSession

RETURNS DESCRIPTION
AttributionRecord | None

The most recent record for this work, or None.

Source code in src/music_attribution/attribution/persistence.py
async def find_by_work_entity_id(
    self,
    work_entity_id: uuid.UUID,
    session: AsyncSession,
) -> AttributionRecord | None:
    """Find the most recent attribution record for a work entity.

    Returns the record with the highest version number if multiple
    versions exist for the same work entity.

    Parameters
    ----------
    work_entity_id : uuid.UUID
        The work entity UUID.
    session : AsyncSession
        Active async database session.

    Returns
    -------
    AttributionRecord | None
        The most recent record for this work, or ``None``.
    """
    stmt = (
        select(AttributionRecordModel)
        .where(AttributionRecordModel.work_entity_id == work_entity_id)
        .order_by(AttributionRecordModel.version.desc())
        .limit(1)
    )
    result = await session.execute(stmt)
    model = result.scalar_one_or_none()
    return _model_to_record(model) if model is not None else None

find_needs_review async

find_needs_review(
    limit: int = 50, *, session: AsyncSession
) -> list[AttributionRecord]

Find records that need human review, sorted by priority descending.

PARAMETER DESCRIPTION
limit

Maximum number of records to return. Default is 50.

TYPE: int DEFAULT: 50

session

Active async database session.

TYPE: AsyncSession

RETURNS DESCRIPTION
list[AttributionRecord]

Records with needs_review=True, sorted by review_priority descending (highest priority first).

Source code in src/music_attribution/attribution/persistence.py
async def find_needs_review(
    self,
    limit: int = 50,
    *,
    session: AsyncSession,
) -> list[AttributionRecord]:
    """Find records that need human review, sorted by priority descending.

    Parameters
    ----------
    limit : int, optional
        Maximum number of records to return. Default is 50.
    session : AsyncSession
        Active async database session.

    Returns
    -------
    list[AttributionRecord]
        Records with ``needs_review=True``, sorted by
        ``review_priority`` descending (highest priority first).
    """
    stmt = (
        select(AttributionRecordModel)
        .where(AttributionRecordModel.needs_review.is_(True))
        .order_by(AttributionRecordModel.review_priority.desc())
        .limit(limit)
    )
    result = await session.execute(stmt)
    return [_model_to_record(m) for m in result.scalars().all()]
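The filter-sort-truncate shape of this query can be sketched in plain Python. The dicts below are illustrative stand-ins for model rows, with field names mirroring the columns used in the query:

```python
rows = [
    {"id": "a", "needs_review": True, "review_priority": 0.4},
    {"id": "b", "needs_review": False, "review_priority": 0.9},
    {"id": "c", "needs_review": True, "review_priority": 0.7},
]

def find_needs_review_sketch(rows: list[dict], limit: int = 50) -> list[dict]:
    """Keep flagged rows, sort by priority descending, truncate."""
    flagged = [r for r in rows if r["needs_review"]]
    flagged.sort(key=lambda r: r["review_priority"], reverse=True)
    return flagged[:limit]

[r["id"] for r in find_needs_review_sketch(rows)]  # ["c", "a"]
```

Note that row "b" is excluded despite its high priority, because it is not flagged for review.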

list_all async

list_all(
    limit: int = 50,
    offset: int = 0,
    *,
    session: AsyncSession,
) -> list[AttributionRecord]

List all attribution records with pagination.

Returns records ordered by created_at ascending (oldest first).

PARAMETER DESCRIPTION
limit

Maximum number of records to return. Default is 50.

TYPE: int DEFAULT: 50

offset

Number of records to skip for pagination. Default is 0.

TYPE: int DEFAULT: 0

session

Active async database session.

TYPE: AsyncSession

RETURNS DESCRIPTION
list[AttributionRecord]

Paginated list of attribution records.

Source code in src/music_attribution/attribution/persistence.py
async def list_all(
    self,
    limit: int = 50,
    offset: int = 0,
    *,
    session: AsyncSession,
) -> list[AttributionRecord]:
    """List all attribution records with pagination.

    Returns records ordered by ``created_at`` ascending (oldest first).

    Parameters
    ----------
    limit : int, optional
        Maximum number of records to return. Default is 50.
    offset : int, optional
        Number of records to skip for pagination. Default is 0.
    session : AsyncSession
        Active async database session.

    Returns
    -------
    list[AttributionRecord]
        Paginated list of attribution records.
    """
    stmt = (
        select(AttributionRecordModel)
        .order_by(AttributionRecordModel.created_at)
        .offset(offset)
        .limit(limit)
    )
    result = await session.execute(stmt)
    return [_model_to_record(m) for m in result.scalars().all()]
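With a fixed `created_at` ordering, the full table can be walked page by page using the same limit/offset semantics. A sketch over an already-sorted in-memory list (names are illustrative):

```python
rows = [f"record-{i}" for i in range(7)]

def list_page(rows: list[str], limit: int = 3, offset: int = 0) -> list[str]:
    """Slice out one page, mirroring OFFSET/LIMIT on a sorted result."""
    return rows[offset : offset + limit]

pages = []
offset = 0
while True:
    page = list_page(rows, limit=3, offset=offset)
    if not page:  # an empty page signals the end of the table
        break
    pages.append(page)
    offset += 3
```

One caveat of offset pagination in general: rows inserted or deleted between page fetches can shift later pages, so entries may be skipped or repeated across a long scan.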

Priority Queue

priority_queue

Active learning priority queue for attribution review.

Ranks AttributionRecord objects by review priority for human experts. Uses a multi-factor priority formula combining five signals:

  1. Boundary proximity (weight=0.30): Records near the 0.5 confidence decision boundary are most informative for active learning, as human feedback on these cases maximally reduces model uncertainty.
  2. Source disagreement (weight=0.25): Low inter-source agreement indicates conflicting evidence that needs human arbitration.
  3. Ambiguity (weight=0.15): Large conformal prediction set sizes indicate genuine uncertainty about the credit role.
  4. Never-reviewed penalty (weight=0.15): Records that have never been reviewed (version=1) are prioritized over already-reviewed ones.
  5. Staleness (weight=0.15): Records not updated for 30+ days are prioritized to ensure periodic re-validation.
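The five weights sum to 1.0, so when each factor score lies in [0, 1] the weighted sum does too. A minimal sketch of the combination step; the factor values below are made up for illustration (in the real queue they are derived from the record's confidence, source agreement, prediction set size, version, and age):

```python
WEIGHTS = {
    "boundary": 0.30,
    "disagreement": 0.25,
    "ambiguity": 0.15,
    "never_reviewed": 0.15,
    "staleness": 0.15,
}

def priority(factors: dict[str, float]) -> float:
    """Weighted sum of factor scores, clamped to [0, 1]."""
    raw = sum(WEIGHTS[name] * factors[name] for name in WEIGHTS)
    return min(max(raw, 0.0), 1.0)

# A never-reviewed record sitting right on the decision boundary:
score = priority({
    "boundary": 1.0,       # confidence exactly 0.5
    "disagreement": 0.8,
    "ambiguity": 0.5,
    "never_reviewed": 1.0,
    "staleness": 0.0,
})
# 0.30 + 0.20 + 0.075 + 0.15 + 0.0 = 0.725
```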
Notes

This implements the active learning review queue described in Teikari (2026), Section 5.4. The boundary proximity criterion follows the uncertainty sampling strategy from Settles (2009), "Active Learning Literature Survey."

See Also

music_attribution.attribution.persistence : Persistence layer that provides find_needs_review() for initial candidate retrieval.

music_attribution.attribution.aggregator : Produces the confidence and agreement scores used for priority computation.

ReviewPriorityQueue

ReviewPriorityQueue(
    weights: dict[str, float] | None = None,
)

Priority queue for attribution review.

Ranks records by a composite priority score combining five factors that indicate review urgency. Higher scores mean higher priority for human expert review.

PARAMETER DESCRIPTION
weights

Override the default factor weights. Keys must be: "boundary", "disagreement", "ambiguity", "never_reviewed", "staleness". Defaults to _WEIGHTS.

TYPE: dict[str, float] | None DEFAULT: None

ATTRIBUTE DESCRIPTION
_weights

Active priority factor weights.

TYPE: dict[str, float]

Source code in src/music_attribution/attribution/priority_queue.py
def __init__(self, weights: dict[str, float] | None = None) -> None:
    self._weights = weights or _WEIGHTS

compute_priority

compute_priority(record: AttributionRecord) -> float

Compute review priority score for an attribution record.

Combines five weighted factors into a single priority score. Higher scores indicate higher review urgency.

PARAMETER DESCRIPTION
record

The attribution record to prioritize.

TYPE: AttributionRecord

RETURNS DESCRIPTION
float

Priority score clamped to range [0.0, 1.0].

Source code in src/music_attribution/attribution/priority_queue.py
def compute_priority(self, record: AttributionRecord) -> float:
    """Compute review priority score for an attribution record.

    Combines five weighted factors into a single priority score.
    Higher scores indicate higher review urgency.

    Parameters
    ----------
    record : AttributionRecord
        The attribution record to prioritize.

    Returns
    -------
    float
        Priority score clamped to range [0.0, 1.0].
    """
    boundary = self._boundary_score(record.confidence_score)
    disagreement = self._disagreement_score(record.source_agreement)
    ambiguity = self._ambiguity_score(record)
    never_reviewed = self._never_reviewed_score(record.version)
    staleness = self._staleness_score(record.updated_at)

    priority = (
        self._weights["boundary"] * boundary
        + self._weights["disagreement"] * disagreement
        + self._weights["ambiguity"] * ambiguity
        + self._weights["never_reviewed"] * never_reviewed
        + self._weights["staleness"] * staleness
    )

    return min(max(priority, 0.0), 1.0)

next_for_review

next_for_review(
    records: list[AttributionRecord], limit: int = 10
) -> list[AttributionRecord]

Return top records needing review, sorted by priority.

Scores all input records and returns the top limit by descending priority. Does not filter by needs_review flag; the caller should pre-filter if desired.

PARAMETER DESCRIPTION
records

All attribution records to consider.

TYPE: list[AttributionRecord]

limit

Maximum number of records to return. Default is 10.

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
list[AttributionRecord]

Records sorted by priority (highest first), limited to limit entries.

Source code in src/music_attribution/attribution/priority_queue.py
def next_for_review(
    self,
    records: list[AttributionRecord],
    limit: int = 10,
) -> list[AttributionRecord]:
    """Return top records needing review, sorted by priority.

    Scores all input records and returns the top ``limit`` by
    descending priority. Does not filter by ``needs_review`` flag;
    the caller should pre-filter if desired.

    Parameters
    ----------
    records : list[AttributionRecord]
        All attribution records to consider.
    limit : int, optional
        Maximum number of records to return. Default is 10.

    Returns
    -------
    list[AttributionRecord]
        Records sorted by priority (highest first), limited to
        ``limit`` entries.
    """
    scored = [(self.compute_priority(r), r) for r in records]
    scored.sort(key=lambda x: x[0], reverse=True)
    return [r for _, r in scored[:limit]]
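Because next_for_review does not filter by the needs_review flag, callers typically pre-filter before ranking. A self-contained toy sketch of that pattern, using a stand-in scoring function in place of compute_priority (the 1 - 2|c - 0.5| boundary score below is one plausible choice that peaks at confidence 0.5, not necessarily the implementation's formula):

```python
def rank_for_review(records, score, limit=10):
    """Score each record, sort descending, keep the top `limit`."""
    return sorted(records, key=score, reverse=True)[:limit]

records = [
    {"id": "a", "needs_review": True, "confidence": 0.9},
    {"id": "b", "needs_review": True, "confidence": 0.5},
    {"id": "c", "needs_review": False, "confidence": 0.5},
]

# Pre-filter to flagged records, then rank by boundary proximity.
candidates = [r for r in records if r["needs_review"]]
boundary = lambda r: 1.0 - 2.0 * abs(r["confidence"] - 0.5)
top = [r["id"] for r in rank_for_review(candidates, boundary, limit=2)]
# "b" ranks first: its confidence sits exactly on the 0.5 boundary.
```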