Entity Resolution

Multi-strategy resolution cascade: identifier match → string similarity → embedding match → Splink linkage → graph evidence → LLM disambiguation.

Orchestrator

orchestrator

Multi-signal resolution orchestrator.

Combines all resolution methods (identifier, string, embedding, Splink, graph, and LLM) into a single pipeline. Produces ResolvedEntity objects with per-method confidence breakdowns and A0-A3 assurance levels.

The orchestrator implements a cascade pattern: cheap deterministic methods (identifier matching) run first, and more expensive probabilistic methods (embedding, Splink, LLM) only fire for records that remain unresolved. Signal weights are configurable per deployment.
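The cascade pattern can be sketched as follows. This is an illustration of the control flow only; `run_cascade`, `id_stage`, and the dict-shaped records are hypothetical, not the orchestrator's actual API:

```python
def run_cascade(records, stages):
    """Run resolution stages in cost order; each stage only sees the
    records that earlier, cheaper stages left unresolved (sketch)."""
    resolved, pending = [], list(records)
    for stage in stages:  # e.g. [identifier, string, embedding, splink, llm]
        matched, pending = stage(pending)
        resolved.extend(matched)
        if not pending:  # nothing left for the expensive stages to do
            break
    return resolved + pending  # leftovers become singleton entities


# Toy Stage 1: "identifier matching" resolves any record carrying an ISRC.
def id_stage(records):
    matched = [r for r in records if r.get("isrc")]
    return matched, [r for r in records if not r.get("isrc")]
```

The early `break` is what makes the cascade cheap: if Stage 1 resolves everything, the probabilistic stages never run.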

Notes

This is the top-level entry point for Pipeline 2 (Entity Resolution) in the five-pipeline architecture. See Teikari (2026), Section 4 for the theoretical framework behind multi-signal resolution and the A0-A3 assurance level mapping.

See Also

music_attribution.resolution.identifier_match : Stage 1 -- exact ID matching.
music_attribution.resolution.string_similarity : Stage 2 -- fuzzy name matching.
music_attribution.resolution.embedding_match : Stage 3 -- semantic similarity.
music_attribution.resolution.splink_linkage : Stage 4 -- probabilistic linkage.
music_attribution.resolution.graph_resolution : Stage 5 -- graph evidence.
music_attribution.resolution.llm_disambiguation : Stage 6 -- LLM tie-breaking.

ResolutionOrchestrator

ResolutionOrchestrator(
    weights: dict[str, float] | None = None,
)

Orchestrate multi-signal entity resolution.

Combines identifier matching, string similarity, embedding similarity, Splink, graph evidence, and LLM disambiguation into a unified pipeline. Each signal contributes a weighted score; the final confidence is the weighted average of all active signals.
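The weighted-average combination can be illustrated with a minimal sketch. The weight values below are hypothetical placeholders, not the library's `_DEFAULT_WEIGHTS`:

```python
def combine_confidence(scores: dict[str, float], weights: dict[str, float]) -> float:
    """Weighted average over *active* signals only: a method that did
    not fire (absent from ``scores``) contributes neither weight nor
    score, so the denominator shrinks with it."""
    active = {m: s for m, s in scores.items() if m in weights}
    if not active:
        return 0.0
    total_w = sum(weights[m] for m in active)
    return sum(weights[m] * s for m, s in active.items()) / total_w


# Hypothetical weights; the real defaults live in _DEFAULT_WEIGHTS.
weights = {"identifier": 0.4, "string": 0.2, "embedding": 0.2, "llm": 0.2}
combine_confidence({"identifier": 1.0, "string": 0.9}, weights)  # ≈ 0.967
```

Renormalizing over active signals means a record resolved by identifier alone is not penalized for the stages that never ran.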

PARAMETER DESCRIPTION
weights

Per-method weight overrides for score combination. Keys are method names ("identifier", "splink", "string", "embedding", "graph", "llm"). Defaults to _DEFAULT_WEIGHTS.

TYPE: dict[str, float] | None DEFAULT: None

ATTRIBUTE DESCRIPTION
_weights

Active signal weights.

TYPE: dict[str, float]

_id_matcher

Stage 1 identifier matcher.

TYPE: IdentifierMatcher

_string_matcher

Stage 2 string similarity matcher.

TYPE: StringSimilarityMatcher

Examples:

>>> orchestrator = ResolutionOrchestrator()
>>> entities = await orchestrator.resolve(normalized_records)
Source code in src/music_attribution/resolution/orchestrator.py
def __init__(self, weights: dict[str, float] | None = None) -> None:
    self._weights = weights or _DEFAULT_WEIGHTS
    self._id_matcher = IdentifierMatcher()
    self._string_matcher = StringSimilarityMatcher()

resolve async

resolve(
    records: list[NormalizedRecord],
) -> list[ResolvedEntity]

Resolve a list of NormalizedRecords into ResolvedEntities.

Executes the resolution cascade in order:

  1. Group records by shared identifiers (exact match).
  2. For ungrouped records, attempt string-similarity grouping.
  3. Remaining singletons form their own groups.
  4. Each group is resolved into a ResolvedEntity with confidence scores and assurance levels.
PARAMETER DESCRIPTION
records

Input records from the ETL pipeline. Each record represents a single source's view of an entity (artist, work, recording).

TYPE: list[NormalizedRecord]

RETURNS DESCRIPTION
list[ResolvedEntity]

One ResolvedEntity per distinct entity discovered. Each contains per-method confidence breakdown and merged identifiers.

Notes

The cascade ordering ensures that high-confidence deterministic matches are found first, reducing the workload for expensive probabilistic methods downstream.

Source code in src/music_attribution/resolution/orchestrator.py
async def resolve(self, records: list[NormalizedRecord]) -> list[ResolvedEntity]:
    """Resolve a list of NormalizedRecords into ResolvedEntities.

    Executes the resolution cascade in order:

    1. Group records by shared identifiers (exact match).
    2. For ungrouped records, attempt string-similarity grouping.
    3. Remaining singletons form their own groups.
    4. Each group is resolved into a ``ResolvedEntity`` with confidence
       scores and assurance levels.

    Parameters
    ----------
    records : list[NormalizedRecord]
        Input records from the ETL pipeline. Each record represents a
        single source's view of an entity (artist, work, recording).

    Returns
    -------
    list[ResolvedEntity]
        One ``ResolvedEntity`` per distinct entity discovered. Each
        contains per-method confidence breakdown and merged identifiers.

    Notes
    -----
    The cascade ordering ensures that high-confidence deterministic
    matches are found first, reducing the workload for expensive
    probabilistic methods downstream.
    """
    if not records:
        return []

    # Step 1: Group by shared identifiers
    groups = self._group_by_identifiers(records)

    # Step 2: For ungrouped records, try string similarity
    grouped_indices: set[int] = set()
    for group in groups:
        grouped_indices.update(group)

    ungrouped = [i for i in range(len(records)) if i not in grouped_indices]
    if ungrouped:
        string_groups = self._group_by_string_similarity(records, ungrouped)
        groups.extend(string_groups)
        for g in string_groups:
            grouped_indices.update(g)

    # Step 3: Singleton groups for remaining ungrouped
    for i in range(len(records)):
        if i not in grouped_indices:
            groups.append([i])

    # Step 4: Resolve each group into a ResolvedEntity
    entities = []
    for group in groups:
        group_records = [records[i] for i in group]
        entity = await self.resolve_group(group_records)
        entities.append(entity)

    return entities

resolve_group async

resolve_group(
    records: list[NormalizedRecord],
) -> ResolvedEntity

Resolve a pre-clustered group of records into a single entity.

Merges identifiers, picks the canonical name, detects cross-source conflicts, and computes a weighted confidence score from all available resolution signals.

PARAMETER DESCRIPTION
records

Pre-clustered records believed to represent the same entity. Must contain at least one record.

TYPE: list[NormalizedRecord]

RETURNS DESCRIPTION
ResolvedEntity

A merged entity with combined identifiers, canonical name, per-method confidence breakdown, and assurance level.

Notes

Records with confidence below _REVIEW_THRESHOLD (0.5) are automatically flagged for human review in the attribution pipeline.

Source code in src/music_attribution/resolution/orchestrator.py
async def resolve_group(self, records: list[NormalizedRecord]) -> ResolvedEntity:
    """Resolve a pre-clustered group of records into a single entity.

    Merges identifiers, picks the canonical name, detects cross-source
    conflicts, and computes a weighted confidence score from all
    available resolution signals.

    Parameters
    ----------
    records : list[NormalizedRecord]
        Pre-clustered records believed to represent the same entity.
        Must contain at least one record.

    Returns
    -------
    ResolvedEntity
        A merged entity with combined identifiers, canonical name,
        per-method confidence breakdown, and assurance level.

    Notes
    -----
    Records with confidence below ``_REVIEW_THRESHOLD`` (0.5) are
    automatically flagged for human review in the attribution pipeline.
    """
    # Determine resolution method and compute details
    details = self._compute_resolution_details(records)
    method = self._determine_method(records, details)
    confidence = self._compute_confidence(details)
    assurance = self._compute_assurance_level(records)
    conflicts = self._detect_conflicts(records)
    canonical = self._pick_canonical_name(records)
    alt_names = list({r.canonical_name for r in records if r.canonical_name != canonical})

    needs_review = confidence < _REVIEW_THRESHOLD
    review_reason = f"Low confidence ({confidence:.2f})" if needs_review else None

    # Merge identifiers
    merged_ids = self._merge_identifiers(records)

    # Source references
    source_refs = [
        SourceReference(
            record_id=uuid.uuid4(),
            source=r.source,
            source_id=r.source_id,
            agreement_score=confidence,
        )
        for r in records
    ]

    return ResolvedEntity(
        entity_type=records[0].entity_type,
        canonical_name=canonical,
        alternative_names=alt_names,
        identifiers=merged_ids,
        source_records=source_refs,
        resolution_method=method,
        resolution_confidence=confidence,
        resolution_details=details,
        assurance_level=assurance,
        conflicts=conflicts,
        needs_review=needs_review,
        review_reason=review_reason,
        resolved_at=datetime.now(UTC),
    )

Identifier Match

identifier_match

Identifier-based exact matching for entity resolution.

Stage 1 of the resolution cascade. The simplest and highest-confidence resolution method: if two records share the same ISRC, ISWC, ISNI, or MBID, they refer to the same entity with confidence approaching 1.0.

Standardized identifiers provide the strongest resolution signal because they are globally unique by design:

  • ISRC (International Standard Recording Code) -- identifies recordings
  • ISWC (International Standard Musical Work Code) -- identifies compositions
  • ISNI (International Standard Name Identifier) -- identifies contributors
  • MBID (MusicBrainz Identifier) -- MusicBrainz-specific stable UUID
  • AcoustID -- acoustic fingerprint identifier
Notes

This module implements the deterministic resolution layer described in Teikari (2026), Section 4.1. Because standardized identifiers are globally unique, matches found here bypass all downstream probabilistic methods and receive A1+ assurance levels automatically.

See Also

music_attribution.resolution.orchestrator : Cascade coordinator that calls this first.
music_attribution.resolution.string_similarity : Fallback for records without identifiers.

IdentifierMatcher

Resolve entities by exact identifier matching.

Two NormalizedRecord objects sharing any standard identifier (ISRC, ISWC, ISNI, MBID, AcoustID) are considered the same entity. This is the highest-confidence resolution strategy because standardized identifiers are globally unique by design.

The matcher uses a union-find data structure with path compression to efficiently cluster records that share identifiers, even transitively (e.g., record A shares ISRC with B, and B shares MBID with C, so A, B, C are all the same entity).
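The transitive clustering described above can be sketched with a minimal union-find. This illustrates the technique, not the matcher's internal code; records are simplified to `{scheme: value}` dicts:

```python
def cluster_by_shared_ids(records: list[dict[str, str]]) -> list[list[int]]:
    """Union-find with path compression: records sharing any
    (scheme, value) identifier pair end up in one cluster, even
    transitively through an intermediate record."""
    parent = list(range(len(records)))

    def find(i: int) -> int:
        while parent[i] != i:
            parent[i] = parent[parent[i]]  # path compression
            i = parent[i]
        return i

    def union(i: int, j: int) -> None:
        parent[find(i)] = find(j)

    seen: dict[tuple[str, str], int] = {}  # identifier -> first record index
    for idx, rec in enumerate(records):
        for scheme, value in rec.items():
            key = (scheme, value)
            if key in seen:
                union(idx, seen[key])
            else:
                seen[key] = idx

    clusters: dict[int, list[int]] = {}
    for idx in range(len(records)):
        clusters.setdefault(find(idx), []).append(idx)
    return list(clusters.values())


# A shares an ISRC with B, B shares an MBID with C -> one cluster {A, B, C}.
cluster_by_shared_ids([
    {"isrc": "USX1"},                  # A
    {"isrc": "USX1", "mbid": "1234"},  # B
    {"mbid": "1234"},                  # C
    {"iswc": "T-123"},                 # D: unrelated singleton
])
```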

Notes

This is Stage 1 of the resolution cascade. Records matched here bypass all downstream probabilistic methods. See Teikari (2026), Section 4.1.

See Also

music_attribution.resolution.orchestrator : Cascade coordinator.
music_attribution.resolution.string_similarity : Stage 2 fallback.

match

match(
    records: list[NormalizedRecord],
) -> list[ResolvedEntity]

Match records by shared identifiers and produce ResolvedEntities.

Groups records using union-find on shared identifier values, then builds a ResolvedEntity for each group with merged identifiers, conflict detection, and assurance level computation.

PARAMETER DESCRIPTION
records

Input records to match. Records without any identifiers will form singleton groups.

TYPE: list[NormalizedRecord]

RETURNS DESCRIPTION
list[ResolvedEntity]

One entity per distinct group found. Multi-record groups have resolution method EXACT_ID and confidence >= 0.7.

Source code in src/music_attribution/resolution/identifier_match.py
def match(self, records: list[NormalizedRecord]) -> list[ResolvedEntity]:
    """Match records by shared identifiers and produce ResolvedEntities.

    Groups records using union-find on shared identifier values, then
    builds a ``ResolvedEntity`` for each group with merged identifiers,
    conflict detection, and assurance level computation.

    Parameters
    ----------
    records : list[NormalizedRecord]
        Input records to match. Records without any identifiers will
        form singleton groups.

    Returns
    -------
    list[ResolvedEntity]
        One entity per distinct group found. Multi-record groups have
        resolution method ``EXACT_ID`` and confidence >= 0.7.
    """
    if not records:
        return []

    # Build groups using union-find by shared identifiers
    groups = self._group_by_identifiers(records)

    entities: list[ResolvedEntity] = []
    for group in groups:
        entity = self._build_entity(group)
        entities.append(entity)

    return entities

String Similarity

string_similarity

String similarity matching for entity resolution.

Stage 2 of the resolution cascade. Fast fuzzy matching for entity names using Jaro-Winkler distance (via jellyfish) and token-sort ratio (via thefuzz). Handles common music-domain variations:

  • "The" prefix reordering ("Beatles, The" -> "the beatles")
  • Accented character normalization ("Björk" matches "Bjork")
  • Abbreviation expansion ("feat." -> "featuring", "ft." -> "featuring")
  • Whitespace normalization

The two similarity algorithms are complementary:

  • Jaro-Winkler excels at short strings and character-level typos.
  • Token-sort ratio handles word reordering ("John Elton" matches "Elton John").
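The complementarity can be demonstrated with stdlib `difflib` standing in for the jellyfish/thefuzz scorers the matcher actually uses (scores differ numerically, but the reordering behavior is the same):

```python
from difflib import SequenceMatcher


def char_ratio(a: str, b: str) -> float:
    """Character-level similarity: sensitive to word order."""
    return SequenceMatcher(None, a, b).ratio()


def token_sort_ratio(a: str, b: str) -> float:
    """Sort tokens before comparing, so word order stops mattering --
    the idea behind thefuzz's token_sort_ratio."""
    return char_ratio(" ".join(sorted(a.split())), " ".join(sorted(b.split())))


char_ratio("elton john", "john elton")        # well below 1.0
token_sort_ratio("elton john", "john elton")  # exactly 1.0
```

Taking the max of both signals, as the matcher does, keeps the score high whichever failure mode a given name pair hits.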
Notes

This module implements the fuzzy string matching layer described in Teikari (2026), Section 4.2. It fires only for records that were not matched by exact identifiers in Stage 1.

See Also

music_attribution.resolution.identifier_match : Stage 1 (runs before this).
music_attribution.resolution.embedding_match : Stage 3 (semantic similarity).

StringSimilarityMatcher

StringSimilarityMatcher(threshold: float = 0.85)

String similarity matcher for music entity names.

Combines Jaro-Winkler similarity (good for short strings and typos) with token-sort ratio (good for word reordering) for robust matching. Takes the maximum of both scores for each comparison.

PARAMETER DESCRIPTION
threshold

Minimum similarity score (0.0-1.0) to consider a match. Default is 0.85, which balances precision and recall for typical music entity names.

TYPE: float DEFAULT: 0.85

ATTRIBUTE DESCRIPTION
_threshold

Active similarity threshold.

TYPE: float

See Also

music_attribution.resolution.orchestrator.ResolutionOrchestrator : Uses this as Stage 2.

Source code in src/music_attribution/resolution/string_similarity.py
def __init__(self, threshold: float = 0.85) -> None:
    self._threshold = threshold

score

score(name_a: str, name_b: str) -> float

Compute similarity score between two entity names.

Both names are normalized (accent stripping, abbreviation expansion, lowercase) before comparison. The score is the maximum of Jaro-Winkler similarity and token-sort ratio.

PARAMETER DESCRIPTION
name_a

First entity name (raw, unnormalized).

TYPE: str

name_b

Second entity name (raw, unnormalized).

TYPE: str

RETURNS DESCRIPTION
float

Similarity score in range [0.0, 1.0]. Returns 1.0 for exact matches after normalization.

Source code in src/music_attribution/resolution/string_similarity.py
def score(self, name_a: str, name_b: str) -> float:
    """Compute similarity score between two entity names.

    Both names are normalized (accent stripping, abbreviation expansion,
    lowercase) before comparison. The score is the maximum of
    Jaro-Winkler similarity and token-sort ratio.

    Parameters
    ----------
    name_a : str
        First entity name (raw, unnormalized).
    name_b : str
        Second entity name (raw, unnormalized).

    Returns
    -------
    float
        Similarity score in range [0.0, 1.0]. Returns 1.0 for
        exact matches after normalization.
    """
    norm_a = _normalize_name(name_a)
    norm_b = _normalize_name(name_b)

    if norm_a == norm_b:
        return 1.0

    # Jaro-Winkler: good for short strings and typos
    jw_score = jellyfish.jaro_winkler_similarity(norm_a, norm_b)

    # Token sort ratio: handles word reordering
    token_score = fuzz.token_sort_ratio(norm_a, norm_b) / 100.0

    # Take the max of both scores
    return float(max(jw_score, token_score))

find_candidates

find_candidates(
    name: str,
    corpus: list[str],
    threshold: float | None = None,
) -> list[tuple[str, float]]

Find candidate matches from a corpus above the similarity threshold.

Compares name against every entry in corpus and returns those exceeding the threshold, sorted by descending score.

PARAMETER DESCRIPTION
name

Query name to search for.

TYPE: str

corpus

List of candidate names to compare against.

TYPE: list[str]

threshold

Override the instance threshold for this query. If None, uses the threshold set at construction time.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[str, float]]

Candidate matches as (name, score) tuples, sorted by score descending. Empty list if no matches exceed threshold.

Source code in src/music_attribution/resolution/string_similarity.py
def find_candidates(
    self,
    name: str,
    corpus: list[str],
    threshold: float | None = None,
) -> list[tuple[str, float]]:
    """Find candidate matches from a corpus above the similarity threshold.

    Compares ``name`` against every entry in ``corpus`` and returns
    those exceeding the threshold, sorted by descending score.

    Parameters
    ----------
    name : str
        Query name to search for.
    corpus : list[str]
        List of candidate names to compare against.
    threshold : float | None, optional
        Override the instance threshold for this query. If ``None``,
        uses the threshold set at construction time.

    Returns
    -------
    list[tuple[str, float]]
        Candidate matches as ``(name, score)`` tuples, sorted by
        score descending. Empty list if no matches exceed threshold.
    """
    effective_threshold = threshold if threshold is not None else self._threshold
    candidates = []

    for candidate in corpus:
        s = self.score(name, candidate)
        if s >= effective_threshold:
            candidates.append((candidate, s))

    candidates.sort(key=lambda x: x[1], reverse=True)
    return candidates

Embedding Match

embedding_match

Embedding-based semantic matching for entity resolution.

Stage 3 of the resolution cascade. Uses sentence-transformers to embed entity names and metadata into dense vectors and finds semantically similar entities via cosine similarity. Handles cases that string matching misses:

  • Translations ("Die Fledermaus" ~ "The Bat")
  • Very different spellings of the same name
  • Contextual metadata similarity (genre, collaborators)

The default model (all-MiniLM-L6-v2) produces 384-dimensional embeddings suitable for fast cosine similarity search. In production, embeddings are stored in PostgreSQL via pgvector halfvec(768) for efficient approximate nearest-neighbor queries.

Notes

This module implements the semantic similarity layer described in Teikari (2026), Section 4.3. It fires only for records that were not resolved by identifier matching (Stage 1) or string similarity (Stage 2).

See Also

music_attribution.resolution.string_similarity : Stage 2 (runs before this).
music_attribution.resolution.embedding_service : Persistence layer for pgvector.
music_attribution.resolution.splink_linkage : Stage 4 (probabilistic linkage).

EmbeddingMatcher

EmbeddingMatcher(model_name: str = 'all-MiniLM-L6-v2')

Semantic entity matching using sentence-transformer embeddings.

Embeds entity names into dense vectors and finds similar entities via cosine similarity. Supports in-memory storage for development and pgvector for production deployments.

The model is lazy-loaded on first use to avoid heavy import-time dependencies when the embedding stage is not needed.

PARAMETER DESCRIPTION
model_name

Sentence-transformer model to use. Default is "all-MiniLM-L6-v2", a lightweight model with good quality-speed tradeoff.

TYPE: str DEFAULT: 'all-MiniLM-L6-v2'

ATTRIBUTE DESCRIPTION
_model_name

Name of the sentence-transformer model.

TYPE: str

_model

Lazy-loaded SentenceTransformer instance.

TYPE: Any

_embeddings

In-memory embedding store (entity_id -> vector).

TYPE: dict[str, list[float]]

See Also

music_attribution.resolution.embedding_service : Production persistence via pgvector.

Source code in src/music_attribution/resolution/embedding_match.py
def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
    self._model_name = model_name
    self._model: Any = None
    self._embeddings: dict[str, list[float]] = {}

embed async

embed(text: str) -> list[float]

Embed a single text string into a dense vector.

PARAMETER DESCRIPTION
text

Text to embed (entity name, metadata string, etc.).

TYPE: str

RETURNS DESCRIPTION
list[float]

Embedding vector. Dimensionality depends on the model (384 for all-MiniLM-L6-v2).

Source code in src/music_attribution/resolution/embedding_match.py
async def embed(self, text: str) -> list[float]:
    """Embed a single text string into a dense vector.

    Parameters
    ----------
    text : str
        Text to embed (entity name, metadata string, etc.).

    Returns
    -------
    list[float]
        Embedding vector. Dimensionality depends on the model
        (384 for ``all-MiniLM-L6-v2``).
    """
    model = self._get_model()
    result = model.encode([text])
    return [float(v) for v in result[0]]

embed_batch async

embed_batch(texts: list[str]) -> list[list[float]]

Embed multiple texts in a single batch for efficiency.

Batch encoding is significantly faster than calling embed() in a loop because the model can parallelize across inputs.

PARAMETER DESCRIPTION
texts

List of texts to embed.

TYPE: list[str]

RETURNS DESCRIPTION
list[list[float]]

List of embedding vectors, one per input text.

Source code in src/music_attribution/resolution/embedding_match.py
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Embed multiple texts in a single batch for efficiency.

    Batch encoding is significantly faster than calling ``embed()``
    in a loop because the model can parallelize across inputs.

    Parameters
    ----------
    texts : list[str]
        List of texts to embed.

    Returns
    -------
    list[list[float]]
        List of embedding vectors, one per input text.
    """
    model = self._get_model()
    results = model.encode(texts)
    return [[float(v) for v in row] for row in results]

store_embedding async

store_embedding(
    entity_id: str, embedding: list[float]
) -> None

Store an embedding in the in-memory index for later similarity search.

In production, use EmbeddingService.store_embedding() for pgvector-backed persistence instead.

PARAMETER DESCRIPTION
entity_id

Unique identifier for the entity.

TYPE: str

embedding

Embedding vector to store.

TYPE: list[float]

Source code in src/music_attribution/resolution/embedding_match.py
async def store_embedding(self, entity_id: str, embedding: list[float]) -> None:
    """Store an embedding in the in-memory index for later similarity search.

    In production, use ``EmbeddingService.store_embedding()`` for
    pgvector-backed persistence instead.

    Parameters
    ----------
    entity_id : str
        Unique identifier for the entity.
    embedding : list[float]
        Embedding vector to store.
    """
    self._embeddings[entity_id] = embedding

find_similar async

find_similar(
    query_embedding: list[float], top_k: int = 5
) -> list[tuple[str, float]]

Find the most similar stored embeddings via brute-force cosine search.

Performs exhaustive comparison against all stored embeddings. For production-scale deployments, use pgvector's approximate nearest neighbor index instead.

PARAMETER DESCRIPTION
query_embedding

The query embedding vector.

TYPE: list[float]

top_k

Number of top results to return. Default is 5.

TYPE: int DEFAULT: 5

RETURNS DESCRIPTION
list[tuple[str, float]]

Top-k results as (entity_id, cosine_similarity) tuples, sorted by similarity descending.

Source code in src/music_attribution/resolution/embedding_match.py
async def find_similar(
    self,
    query_embedding: list[float],
    top_k: int = 5,
) -> list[tuple[str, float]]:
    """Find the most similar stored embeddings via brute-force cosine search.

    Performs exhaustive comparison against all stored embeddings. For
    production-scale deployments, use pgvector's approximate nearest
    neighbor index instead.

    Parameters
    ----------
    query_embedding : list[float]
        The query embedding vector.
    top_k : int, optional
        Number of top results to return. Default is 5.

    Returns
    -------
    list[tuple[str, float]]
        Top-k results as ``(entity_id, cosine_similarity)`` tuples,
        sorted by similarity descending.
    """
    scored: list[tuple[str, float]] = []
    for entity_id, stored_embedding in self._embeddings.items():
        score = self.cosine_similarity(query_embedding, stored_embedding)
        scored.append((entity_id, score))

    scored.sort(key=lambda x: x[1], reverse=True)
    return scored[:top_k]

cosine_similarity staticmethod

cosine_similarity(
    vec_a: list[float], vec_b: list[float]
) -> float

Compute cosine similarity between two vectors.

Defined as dot(a, b) / (||a|| * ||b||). Returns 0.0 if either vector has zero magnitude.

PARAMETER DESCRIPTION
vec_a

First vector.

TYPE: list[float]

vec_b

Second vector (must have same dimensionality as vec_a).

TYPE: list[float]

RETURNS DESCRIPTION
float

Cosine similarity in range [-1.0, 1.0]. For normalized sentence-transformer outputs, values are typically in [0, 1].

RAISES DESCRIPTION
ValueError

If vectors have different lengths (via zip(..., strict=True)).

Source code in src/music_attribution/resolution/embedding_match.py
@staticmethod
def cosine_similarity(vec_a: list[float], vec_b: list[float]) -> float:
    """Compute cosine similarity between two vectors.

    Defined as ``dot(a, b) / (||a|| * ||b||)``. Returns 0.0 if
    either vector has zero magnitude.

    Parameters
    ----------
    vec_a : list[float]
        First vector.
    vec_b : list[float]
        Second vector (must have same dimensionality as ``vec_a``).

    Returns
    -------
    float
        Cosine similarity in range [-1.0, 1.0]. For normalized
        sentence-transformer outputs, values are typically in [0, 1].

    Raises
    ------
    ValueError
        If vectors have different lengths (via ``zip(..., strict=True)``).
    """
    dot_product = sum(a * b for a, b in zip(vec_a, vec_b, strict=True))
    norm_a = math.sqrt(sum(a * a for a in vec_a))
    norm_b = math.sqrt(sum(b * b for b in vec_b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot_product / (norm_a * norm_b)

LLM Disambiguation

llm_disambiguation

LLM-assisted disambiguation for entity resolution.

Stage 6 (final) of the resolution cascade. For complex disambiguation cases (e.g., "John Williams the composer vs the guitarist"), uses PydanticAI with structured output to make a reasoned decision. The LLM is only called when other signals produce ambiguous results (confidence in the 0.4-0.7 range), providing strict cost control.

Key design decisions:

  • Cost gating: LLM invocation is guarded by should_invoke(), which checks that the best existing signal falls in the ambiguity range.
  • Deterministic caching: A SHA-256 cache key prevents duplicate LLM calls for the same candidate set and context.
  • Structured output: The LLM returns a DisambiguationResult with chosen index, confidence, and reasoning (not free text).
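The deterministic cache key can be sketched as follows. The exact fields the real implementation hashes are not documented here, so the JSON serialization below is an assumption for illustration:

```python
import hashlib
import json


def cache_key(candidate_names: list[str], context: str) -> str:
    """Content-addressed key: identical candidates + context always
    hash to the same SHA-256 digest, so a repeat query is served from
    cache instead of triggering a second LLM call."""
    payload = json.dumps(
        {"candidates": candidate_names, "context": context},
        sort_keys=True,       # stable key order -> stable digest
        ensure_ascii=False,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


k1 = cache_key(["John Williams (composer)", "John Williams (guitarist)"], "film score, 1977")
k2 = cache_key(["John Williams (composer)", "John Williams (guitarist)"], "film score, 1977")
# k1 == k2 -> the second call never reaches the LLM
```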
Notes

This module implements the LLM disambiguation layer described in Teikari (2026), Section 4.6. The Oracle Problem (digital systems cannot fully verify physical reality) means LLM confidence is treated as one signal among many, not as ground truth.

See Also

music_attribution.resolution.graph_resolution : Stage 5 (runs before this).
music_attribution.resolution.orchestrator : Cascade coordinator.

DisambiguationResult

Bases: BaseModel

Structured output from LLM disambiguation.

Represents the LLM's reasoned decision about which candidate entity (if any) matches the query, along with self-reported confidence and a natural-language explanation.

ATTRIBUTE DESCRIPTION
chosen_index

Index into the candidates list identifying the chosen entity. None if the LLM is uncertain and cannot make a selection.

TYPE: int | None

confidence

LLM's self-reported confidence in range [0.0, 1.0]. This is one signal among many and should not be taken at face value.

TYPE: float

reasoning

Natural-language explanation of the LLM's decision.

TYPE: str

alternatives_considered

Number of candidate entities the LLM evaluated.

TYPE: int

cached

Whether this result was served from the in-memory cache.

TYPE: bool

LLMDisambiguator

LLMDisambiguator()

LLM-assisted entity disambiguation.

Only invoked when other resolution methods produce ambiguous results (confidence in the 0.4-0.7 range). Uses SHA-256 content-based caching to reduce LLM costs.

The _call_llm method is designed to be overridden in subclasses or mocked in tests. In production, it would use a PydanticAI Agent with structured DisambiguationResult output.

ATTRIBUTE DESCRIPTION
_cache

In-memory cache keyed by SHA-256 hash of candidate + context.

TYPE: dict[str, DisambiguationResult]

Notes

The ambiguity range constants (_AMBIGUITY_LOW=0.4, _AMBIGUITY_HIGH=0.7) define when the LLM is invoked. Scores above 0.7 are confident enough to not need LLM; scores below 0.4 are too uncertain for LLM to add value.

Source code in src/music_attribution/resolution/llm_disambiguation.py
def __init__(self) -> None:
    self._cache: dict[str, DisambiguationResult] = {}

disambiguate async

disambiguate(
    candidates: list[NormalizedRecord], context: str
) -> DisambiguationResult

Disambiguate between candidate entities using LLM.

Checks the content-addressed cache first. On cache miss, calls _call_llm() and caches the result. On LLM failure, returns a safe fallback with chosen_index=None and confidence=0.0.

PARAMETER DESCRIPTION
candidates

List of candidate records that could not be resolved by earlier cascade stages.

TYPE: list[NormalizedRecord]

context

Additional context for disambiguation (e.g., album name, genre, release year).

TYPE: str

RETURNS DESCRIPTION
DisambiguationResult

The LLM's structured decision, or a safe fallback on error.

Source code in src/music_attribution/resolution/llm_disambiguation.py
async def disambiguate(
    self,
    candidates: list[NormalizedRecord],
    context: str,
) -> DisambiguationResult:
    """Disambiguate between candidate entities using LLM.

    Checks the content-addressed cache first. On cache miss, calls
    ``_call_llm()`` and caches the result. On LLM failure, returns
    a safe fallback with ``chosen_index=None`` and ``confidence=0.0``.

    Parameters
    ----------
    candidates : list[NormalizedRecord]
        List of candidate records that could not be resolved by
        earlier cascade stages.
    context : str
        Additional context for disambiguation (e.g., album name,
        genre, release year).

    Returns
    -------
    DisambiguationResult
        The LLM's structured decision, or a safe fallback on error.
    """
    cache_key = self._cache_key(candidates, context)

    # Check cache
    if cache_key in self._cache:
        cached = self._cache[cache_key]
        return DisambiguationResult(
            chosen_index=cached.chosen_index,
            confidence=cached.confidence,
            reasoning=cached.reasoning,
            alternatives_considered=cached.alternatives_considered,
            cached=True,
        )

    # Call LLM
    try:
        result = await self._call_llm(candidates, context)
        self._cache[cache_key] = result
        return result
    except TimeoutError as e:
        logger.warning("LLM disambiguation timed out: %s", e)
        return DisambiguationResult(
            chosen_index=None,
            confidence=0.0,
            reasoning=f"LLM timeout: {e}",
            alternatives_considered=len(candidates),
        )
    except Exception as e:  # noqa: BLE001
        logger.warning("LLM disambiguation failed: %s", e)
        return DisambiguationResult(
            chosen_index=None,
            confidence=0.0,
            reasoning=f"LLM error: {e}",
            alternatives_considered=len(candidates),
        )

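The content-addressed caching behavior can be sketched as follows. The real `_cache_key()` implementation is not shown here, so `make_cache_key` and its JSON-based serialization are illustrative assumptions; the point is that identical `(candidates, context)` inputs always hash to the same key, so repeated disambiguation calls hit the cache.

```python
import hashlib
import json

def make_cache_key(candidates: list[dict], context: str) -> str:
    """Hypothetical content-addressed cache key: hash a canonical JSON
    serialization of the inputs so identical (candidates, context)
    pairs always map to the same key."""
    payload = json.dumps(
        {"candidates": candidates, "context": context},
        sort_keys=True,
        ensure_ascii=False,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Identical inputs -> identical key (cache hit); any change -> new key.
k1 = make_cache_key([{"name": "John Smith"}], "album: Greatest Hits")
k2 = make_cache_key([{"name": "John Smith"}], "album: Greatest Hits")
k3 = make_cache_key([{"name": "John Smith"}], "album: Live 1999")
```
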
should_invoke async

should_invoke(existing_scores: ResolutionDetails) -> bool

Determine if LLM disambiguation is needed based on existing signals.

The LLM is only invoked when the best signal from other methods falls in the ambiguity range [0.4, 0.7]. If no other signals exist at all, the LLM is invoked as a last resort.

PARAMETER DESCRIPTION
existing_scores

Resolution scores from earlier cascade stages (string similarity, embedding similarity, graph path confidence).

TYPE: ResolutionDetails

RETURNS DESCRIPTION
bool

True if LLM should be invoked (ambiguous or missing signals).

Source code in src/music_attribution/resolution/llm_disambiguation.py
async def should_invoke(self, existing_scores: ResolutionDetails) -> bool:
    """Determine if LLM disambiguation is needed based on existing signals.

    The LLM is only invoked when the best signal from other methods
    falls in the ambiguity range [0.4, 0.7]. If no other signals
    exist at all, the LLM is invoked as a last resort.

    Parameters
    ----------
    existing_scores : ResolutionDetails
        Resolution scores from earlier cascade stages (string
        similarity, embedding similarity, graph path confidence).

    Returns
    -------
    bool
        ``True`` if LLM should be invoked (ambiguous or missing signals).
    """
    scores = [
        s
        for s in [
            existing_scores.string_similarity,
            existing_scores.embedding_similarity,
            existing_scores.graph_path_confidence,
        ]
        if s is not None
    ]

    if not scores:
        return True  # No other signals — invoke LLM

    max_score = max(scores)
    return _AMBIGUITY_LOW <= max_score <= _AMBIGUITY_HIGH

Splink Linkage

splink_linkage

Splink probabilistic record linkage for entity resolution.

Stage 4 of the resolution cascade. Implements Fellegi-Sunter probabilistic record linkage at scale using the Splink library. Estimates match/non-match probability distributions via expectation-maximization and produces calibrated linkage scores. Uses DuckDB backend for performance.

The Fellegi-Sunter model treats record comparison as a binary classification problem: for each pair of records, it estimates the probability that they refer to the same entity based on agreement/disagreement patterns across comparison fields. The model parameters (m-probabilities for matches, u-probabilities for non-matches) are estimated from the data using EM.

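The agreement/disagreement weighting can be illustrated with the standard Fellegi-Sunter log2 Bayes factor. The `match_weight` helper and the example m/u values are illustrative of the textbook model, not internals of this module: per-field weights are summed to produce an overall match score.

```python
import math

def match_weight(m: float, u: float) -> float:
    """Fellegi-Sunter log2 Bayes factor for one comparison field:
    positive when agreement is more likely among matches (m-probability)
    than among non-matches (u-probability)."""
    return math.log2(m / u)

# Agreement on a surname field: common among matches, rare otherwise.
w_agree = match_weight(m=0.9, u=0.01)     # strong positive evidence
w_disagree = match_weight(m=0.1, u=0.99)  # negative evidence
```
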
When Splink is not available (e.g., in lightweight test environments), the matcher falls back to a simple exact-match heuristic on comparison columns.

Notes

This module implements the probabilistic record linkage layer described in Teikari (2026), Section 4.4. Splink v4 API is used (from splink import block_on, not splink.blocking_rules_library).

References

.. [1] Fellegi, I. P., & Sunter, A. B. (1969). "A Theory for Record Linkage." Journal of the American Statistical Association, 64(328), 1183-1210.

See Also

music_attribution.resolution.embedding_match : Stage 3 (runs before this). music_attribution.resolution.graph_resolution : Stage 5 (runs after this).

SplinkMatcher

SplinkMatcher()

Probabilistic record linkage using the Splink library.

Uses the Fellegi-Sunter model with configurable comparison columns and blocking rules to efficiently link records at scale. The workflow is:

  1. configure_model() -- define comparison columns.
  2. estimate_parameters() -- learn m/u probabilities from data.
  3. predict() -- compute match probabilities for all candidate pairs.
  4. cluster() -- group records by match probability threshold.
ATTRIBUTE DESCRIPTION
_model_configured

Whether configure_model() has been called.

TYPE: bool

_comparison_columns

Column names used for record comparison.

TYPE: list[str]

_linker

The Splink Linker instance (None until parameters are estimated).

TYPE: Any

Source code in src/music_attribution/resolution/splink_linkage.py
def __init__(self) -> None:
    self._model_configured = False
    self._comparison_columns: list[str] = []
    self._linker: Any = None

configure_model

configure_model(comparison_columns: list[str]) -> None

Configure the Splink model with comparison columns.

Must be called before estimate_parameters(). Each column will be compared using exact-match comparisons with term frequency adjustments.

PARAMETER DESCRIPTION
comparison_columns

Column names to compare (e.g., ["canonical_name", "isrc"]).

TYPE: list[str]

Source code in src/music_attribution/resolution/splink_linkage.py
def configure_model(self, comparison_columns: list[str]) -> None:
    """Configure the Splink model with comparison columns.

    Must be called before ``estimate_parameters()``. Each column
    will be compared using exact-match comparisons with term
    frequency adjustments.

    Parameters
    ----------
    comparison_columns : list[str]
        Column names to compare (e.g., ``["canonical_name", "isrc"]``).
    """
    self._comparison_columns = comparison_columns
    self._model_configured = True

estimate_parameters

estimate_parameters(records: DataFrame) -> None

Estimate Fellegi-Sunter m/u parameters from data.

Uses random sampling to estimate u-probabilities (probability of agreement among non-matches) and expectation-maximization to estimate m-probabilities (probability of agreement among matches) for each comparison column.

PARAMETER DESCRIPTION
records

DataFrame with a unique_id column plus all configured comparison columns.

TYPE: DataFrame

RAISES DESCRIPTION
RuntimeError

If configure_model() has not been called first.

Notes

If Splink is not installed, the linker is set to None and subsequent calls to predict() use the simple exact-match fallback.

Source code in src/music_attribution/resolution/splink_linkage.py
def estimate_parameters(self, records: pd.DataFrame) -> None:
    """Estimate Fellegi-Sunter m/u parameters from data.

    Uses random sampling to estimate u-probabilities (probability of
    agreement among non-matches) and expectation-maximization to
    estimate m-probabilities (probability of agreement among matches)
    for each comparison column.

    Parameters
    ----------
    records : pd.DataFrame
        DataFrame with a ``unique_id`` column plus all configured
        comparison columns.

    Raises
    ------
    RuntimeError
        If ``configure_model()`` has not been called first.

    Notes
    -----
    If Splink is not installed, the linker is set to ``None`` and
    subsequent calls to ``predict()`` use the simple exact-match
    fallback.
    """
    if not self._model_configured:
        msg = "Model not configured. Call configure_model() first."
        raise RuntimeError(msg)

    try:
        import splink.comparison_library as cl
        from splink import DuckDBAPI, Linker, SettingsCreator, block_on

        comparisons = []
        for col in self._comparison_columns:
            comparisons.append(cl.ExactMatch(col).configure(term_frequency_adjustments=True))

        blocking_rules = [block_on(col) for col in self._comparison_columns]

        settings = SettingsCreator(
            link_type="dedupe_only",
            comparisons=comparisons,  # type: ignore[arg-type]
            blocking_rules_to_generate_predictions=blocking_rules,  # type: ignore[arg-type]
        )

        db_api = DuckDBAPI()
        self._linker = Linker(records, settings, db_api=db_api)  # type: ignore[arg-type]
        self._linker.training.estimate_u_using_random_sampling(max_pairs=1e5)

        # Estimate m-probabilities via EM, blocking on each comparison column
        em_failures = 0
        for col in self._comparison_columns:
            try:
                self._linker.training.estimate_parameters_using_expectation_maximisation(
                    block_on(col), fix_u_probabilities=False
                )
            except Exception:  # noqa: BLE001
                em_failures += 1
                logger.debug("EM estimation failed for column %s", col)
        if em_failures == len(self._comparison_columns):
            logger.warning(
                "EM estimation failed for all %d columns — predictions may use unestimated parameters",
                em_failures,
            )

    except ImportError:
        logger.warning("Splink not available, using fallback parameter estimation")
        self._linker = None

predict

predict(records: DataFrame) -> DataFrame

Predict match probabilities for all candidate record pairs.

If the Splink linker is available, uses the trained model to predict. Otherwise, falls back to a simple exact-match heuristic on the comparison columns.

PARAMETER DESCRIPTION
records

DataFrame with comparison columns (used only in fallback mode).

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

DataFrame with columns unique_id_l, unique_id_r, and match_probability (float in [0, 1]).

Source code in src/music_attribution/resolution/splink_linkage.py
def predict(self, records: pd.DataFrame) -> pd.DataFrame:
    """Predict match probabilities for all candidate record pairs.

    If the Splink linker is available, uses the trained model to
    predict. Otherwise, falls back to a simple exact-match heuristic
    on the comparison columns.

    Parameters
    ----------
    records : pd.DataFrame
        DataFrame with comparison columns (used only in fallback mode).

    Returns
    -------
    pd.DataFrame
        DataFrame with columns ``unique_id_l``, ``unique_id_r``,
        and ``match_probability`` (float in [0, 1]).
    """
    if self._linker is not None:
        try:
            predictions = self._linker.inference.predict()
            df: pd.DataFrame = predictions.as_pandas_dataframe()
            return df[["unique_id_l", "unique_id_r", "match_probability"]]  # type: ignore[no-any-return]
        except Exception as e:
            logger.warning("Splink prediction failed: %s, using fallback", e)

    # Fallback: simple exact match on comparison columns
    return self._fallback_predict(records)

cluster

cluster(
    predictions: DataFrame, threshold: float = 0.85
) -> list[list[int]]

Cluster records into entity groups based on match predictions.

Uses union-find with path compression to transitively merge records connected by match probabilities above the threshold.

PARAMETER DESCRIPTION
predictions

DataFrame with columns unique_id_l, unique_id_r, and match_probability.

TYPE: DataFrame

threshold

Minimum match probability to consider a pair as linked. Default is 0.85.

TYPE: float DEFAULT: 0.85

RETURNS DESCRIPTION
list[list[int]]

Clusters of unique_id values. Each cluster represents records believed to be the same entity.

Source code in src/music_attribution/resolution/splink_linkage.py
def cluster(
    self,
    predictions: pd.DataFrame,
    threshold: float = 0.85,
) -> list[list[int]]:
    """Cluster records into entity groups based on match predictions.

    Uses union-find with path compression to transitively merge
    records connected by match probabilities above the threshold.

    Parameters
    ----------
    predictions : pd.DataFrame
        DataFrame with columns ``unique_id_l``, ``unique_id_r``,
        and ``match_probability``.
    threshold : float, optional
        Minimum match probability to consider a pair as linked.
        Default is 0.85.

    Returns
    -------
    list[list[int]]
        Clusters of ``unique_id`` values. Each cluster represents
        records believed to be the same entity.
    """
    # Filter predictions above threshold
    above = predictions[predictions["match_probability"] >= threshold]

    # Union-find clustering
    parent: dict[int, int] = {}

    def find(x: int) -> int:
        if x not in parent:
            parent[x] = x
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(x: int, y: int) -> None:
        px, py = find(x), find(y)
        if px != py:
            parent[px] = py

    # Collect all IDs
    all_ids: set[int] = set()
    for _, row in above.iterrows():
        lid = int(row["unique_id_l"])
        rid = int(row["unique_id_r"])
        all_ids.add(lid)
        all_ids.add(rid)
        union(lid, rid)

    # Also add unpaired IDs from predictions
    for _, row in predictions.iterrows():
        all_ids.add(int(row["unique_id_l"]))
        all_ids.add(int(row["unique_id_r"]))

    # Group by root
    groups: dict[int, list[int]] = defaultdict(list)
    for uid in all_ids:
        groups[find(uid)].append(uid)

    return list(groups.values())

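The threshold-then-merge behavior of predict() followed by cluster() can be sketched on plain tuples, without Splink or pandas. `cluster_pairs` is an illustrative re-implementation of the union-find step, not the library API: pairs above the threshold are transitively merged, and every record seen in the predictions lands in some cluster, even as a singleton.

```python
from collections import defaultdict

def cluster_pairs(
    pairs: list[tuple[int, int, float]],
    threshold: float = 0.85,
) -> list[list[int]]:
    """Union-find over (id_l, id_r, match_probability) tuples:
    above-threshold pairs merge transitively; records linked only
    by below-threshold pairs remain singletons."""
    parent: dict[int, int] = {}

    def find(x: int) -> int:
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    for lid, rid, prob in pairs:
        if prob >= threshold:
            parent[find(lid)] = find(rid)

    # Every ID seen in the predictions belongs to some cluster.
    groups: dict[int, list[int]] = defaultdict(list)
    for lid, rid, _ in pairs:
        for uid in (lid, rid):
            if uid not in groups[find(uid)]:
                groups[find(uid)].append(uid)
    return list(groups.values())

# (1,2) and (2,3) merge transitively; (4,5) is below threshold.
clusters = cluster_pairs([(1, 2, 0.9), (2, 3, 0.92), (4, 5, 0.3)])
```
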
Graph Resolution

graph_resolution

Graph-based entity resolution via relationship evidence.

Stage 5 of the resolution cascade. Uses relationship graph traversals to resolve entities based on shared connections. Two artist records sharing 3+ album relationships are likely the same artist, even if their names differ slightly.

The graph resolver computes confidence from two complementary signals:

  • Jaccard coefficient of shared neighbor sets (structural similarity).
  • Absolute shared count with diminishing returns (3+ shared neighbors is strong evidence regardless of total degree).

The in-memory adjacency graph is suitable for development and testing. In production, Apache AGE (PostgreSQL graph extension) provides the same traversal semantics with persistent storage and ACID guarantees.

Notes

This module implements the graph-based resolution layer described in Teikari (2026), Section 4.5. Graph evidence is particularly valuable for resolving entities with common names (e.g., "John Smith") where string similarity alone is insufficient.

See Also

music_attribution.resolution.splink_linkage : Stage 4 (runs before this). music_attribution.resolution.llm_disambiguation : Stage 6 (runs after this). music_attribution.resolution.graph_store : Persistent graph storage.

GraphResolver

GraphResolver()

Resolve entities using relationship graph evidence.

Maintains an in-memory adjacency graph of entity relationships. Each entity is a node, and relationships (PERFORMED_ON, WROTE, PRODUCED, etc.) form bidirectional edges.

In production, this would query Apache AGE or a similar graph database. The in-memory implementation provides the same API for testing and development.

ATTRIBUTE DESCRIPTION
_graph

Adjacency list mapping entity IDs to sets of (neighbor_id, relationship_type) tuples.

TYPE: dict[str, set[tuple[str, str]]]

_test_ids

Optional test-only ID mapping for deterministic tests.

TYPE: dict[str, str]

Source code in src/music_attribution/resolution/graph_resolution.py
def __init__(self) -> None:
    # Adjacency list: entity_id -> set of (neighbor_id, rel_type)
    self._graph: dict[str, set[tuple[str, str]]] = defaultdict(set)
    self._test_ids: dict[str, str] = {}

add_relationship

add_relationship(
    from_id: str, to_id: str, rel_type: str
) -> None

Add a bidirectional relationship to the graph.

Both directions are stored so that neighbor lookups work regardless of edge direction.

PARAMETER DESCRIPTION
from_id

Source entity ID.

TYPE: str

to_id

Target entity ID.

TYPE: str

rel_type

Relationship type (e.g., "PERFORMED_ON", "WROTE").

TYPE: str

Source code in src/music_attribution/resolution/graph_resolution.py
def add_relationship(self, from_id: str, to_id: str, rel_type: str) -> None:
    """Add a bidirectional relationship to the graph.

    Both directions are stored so that neighbor lookups work
    regardless of edge direction.

    Parameters
    ----------
    from_id : str
        Source entity ID.
    to_id : str
        Target entity ID.
    rel_type : str
        Relationship type (e.g., ``"PERFORMED_ON"``, ``"WROTE"``).
    """
    self._graph[from_id].add((to_id, rel_type))
    self._graph[to_id].add((from_id, rel_type))

find_candidate_matches async

find_candidate_matches(
    entity_id: str, min_shared: int = 2
) -> list[tuple[str, float]]

Find candidate entity matches based on shared neighbor relationships.

Two entities that share many neighbors (e.g., both performed on the same albums) are likely the same entity or closely related. The confidence score combines the ratio of shared-to-total neighbors with an absolute shared-count bonus.

PARAMETER DESCRIPTION
entity_id

Entity ID to find matches for.

TYPE: str

min_shared

Minimum number of shared neighbors to qualify as a candidate. Default is 2.

TYPE: int DEFAULT: 2

RETURNS DESCRIPTION
list[tuple[str, float]]

Candidate matches as (entity_id, confidence) tuples, sorted by confidence descending. Empty list if the entity has no graph relationships.

Source code in src/music_attribution/resolution/graph_resolution.py
async def find_candidate_matches(
    self,
    entity_id: str,
    min_shared: int = 2,
) -> list[tuple[str, float]]:
    """Find candidate entity matches based on shared neighbor relationships.

    Two entities that share many neighbors (e.g., both performed on the
    same albums) are likely the same entity or closely related. The
    confidence score combines the ratio of shared-to-total neighbors
    with an absolute shared-count bonus.

    Parameters
    ----------
    entity_id : str
        Entity ID to find matches for.
    min_shared : int, optional
        Minimum number of shared neighbors to qualify as a candidate.
        Default is 2.

    Returns
    -------
    list[tuple[str, float]]
        Candidate matches as ``(entity_id, confidence)`` tuples,
        sorted by confidence descending. Empty list if the entity
        has no graph relationships.
    """
    if entity_id not in self._graph:
        return []

    # Get this entity's neighbors
    my_neighbors = {n for n, _ in self._graph[entity_id]}

    # Find other entities that share neighbors
    shared_counts: dict[str, int] = defaultdict(int)
    for neighbor in my_neighbors:
        for other_entity, _ in self._graph[neighbor]:
            if other_entity != entity_id:
                shared_counts[other_entity] += 1

    # Filter by minimum shared and score
    candidates: list[tuple[str, float]] = []
    for candidate_id, shared_count in shared_counts.items():
        if shared_count >= min_shared:
            score = self._compute_confidence(shared_count, len(my_neighbors))
            candidates.append((candidate_id, score))

    candidates.sort(key=lambda x: x[1], reverse=True)
    return candidates

score_graph_evidence async

score_graph_evidence(entity_a: str, entity_b: str) -> float

Score the graph evidence that two entities are the same.

Combines two complementary signals:

  • Jaccard coefficient: |shared| / |union| of neighbor sets.
  • Shared count bonus: min(|shared| / 3, 1.0) (diminishing returns -- 3+ shared is strong evidence).

The final score is the average of both signals, capped at 1.0.

PARAMETER DESCRIPTION
entity_a

First entity ID.

TYPE: str

entity_b

Second entity ID.

TYPE: str

RETURNS DESCRIPTION
float

Confidence score in range [0.0, 1.0]. Returns 0.0 if either entity has no graph relationships or they share no neighbors.

Source code in src/music_attribution/resolution/graph_resolution.py
async def score_graph_evidence(self, entity_a: str, entity_b: str) -> float:
    """Score the graph evidence that two entities are the same.

    Combines two complementary signals:

    - **Jaccard coefficient**: ``|shared| / |union|`` of neighbor sets.
    - **Shared count bonus**: ``min(|shared| / 3, 1.0)`` (diminishing
      returns -- 3+ shared is strong evidence).

    The final score is the average of both signals, capped at 1.0.

    Parameters
    ----------
    entity_a : str
        First entity ID.
    entity_b : str
        Second entity ID.

    Returns
    -------
    float
        Confidence score in range [0.0, 1.0]. Returns 0.0 if either
        entity has no graph relationships or they share no neighbors.
    """
    if entity_a not in self._graph or entity_b not in self._graph:
        return 0.0

    neighbors_a = {n for n, _ in self._graph[entity_a]}
    neighbors_b = {n for n, _ in self._graph[entity_b]}

    shared = neighbors_a & neighbors_b
    if not shared:
        return 0.0

    total = neighbors_a | neighbors_b
    # Jaccard-like coefficient weighted by shared count
    jaccard = len(shared) / len(total)

    # Boost for more shared neighbors (diminishing returns)
    shared_boost = min(len(shared) / 3.0, 1.0)

    return min((jaccard + shared_boost) / 2.0, 1.0)

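The two-signal scoring formula can be checked with a standalone re-derivation on plain neighbor sets. `graph_confidence` here is illustrative, mirroring the source above rather than calling it: two artists sharing 2 of 4 distinct albums get Jaccard 0.5, shared boost 2/3, and a combined score of about 0.583.

```python
def graph_confidence(neighbors_a: set[str], neighbors_b: set[str]) -> float:
    """Average of the Jaccard coefficient and a shared-count bonus
    that saturates at 3 shared neighbors, capped at 1.0."""
    shared = neighbors_a & neighbors_b
    if not shared:
        return 0.0
    jaccard = len(shared) / len(neighbors_a | neighbors_b)
    shared_boost = min(len(shared) / 3.0, 1.0)
    return min((jaccard + shared_boost) / 2.0, 1.0)

# Two artists sharing 2 of 4 distinct albums:
score = graph_confidence(
    {"album1", "album2", "album3"},
    {"album1", "album2", "album4"},
)
# jaccard = 2/4 = 0.5, boost = 2/3, score = (0.5 + 2/3) / 2 ~ 0.583
```
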
Graph Store

graph_store

Graph storage for ResolvedEntities.

Provides in-memory graph storage for development and testing, and defines the interface for Apache AGE integration in production. Enables relationship-based queries such as:

  • "Find all entities that share an album with this artist."
  • "What is the shortest path between two entities?"
  • "Who are all performers on works by this composer?"

The graph is stored as an adjacency list of bidirectional edges with typed relationships and arbitrary attributes. BFS traversal supports depth-limited neighbor queries and shortest-path computation.

Notes

In production, Apache AGE (PostgreSQL graph extension) provides the same traversal semantics with persistent storage, ACID guarantees, and Cypher query support. The AsyncEdgeRepository provides the PostgreSQL-backed edge storage layer.

See Also

music_attribution.resolution.graph_resolution : Graph-based entity resolution. music_attribution.resolution.edge_repository : PostgreSQL edge persistence.

GraphStore

GraphStore()

Store and query ResolvedEntities as a graph.

Uses in-memory storage by default. Production implementations would use Apache AGE (PostgreSQL graph extension).

The store maintains two data structures:

  • _entities: UUID-keyed map of ResolvedEntity objects (nodes).
  • _edges: Adjacency list of bidirectional edges with relationship type and arbitrary string attributes.
ATTRIBUTE DESCRIPTION
_entities

Entity node storage.

TYPE: dict[UUID, ResolvedEntity]

_edges

Adjacency list: entity_id -> [(neighbor_id, rel_type, attrs)].

TYPE: dict[UUID, list[tuple[UUID, str, dict[str, str]]]]

Source code in src/music_attribution/resolution/graph_store.py
def __init__(self) -> None:
    self._entities: dict[uuid.UUID, ResolvedEntity] = {}
    self._edges: dict[uuid.UUID, list[tuple[uuid.UUID, str, dict[str, str]]]] = defaultdict(list)

add_entity async

add_entity(entity: ResolvedEntity) -> None

Store a ResolvedEntity as a node in the graph.

If an entity with the same ID already exists, it is overwritten.

PARAMETER DESCRIPTION
entity

The resolved entity to store.

TYPE: ResolvedEntity

Source code in src/music_attribution/resolution/graph_store.py
async def add_entity(self, entity: ResolvedEntity) -> None:
    """Store a ResolvedEntity as a node in the graph.

    If an entity with the same ID already exists, it is overwritten.

    Parameters
    ----------
    entity : ResolvedEntity
        The resolved entity to store.
    """
    self._entities[entity.entity_id] = entity

get_entity async

get_entity(entity_id: UUID) -> ResolvedEntity | None

Retrieve a ResolvedEntity by its UUID.

PARAMETER DESCRIPTION
entity_id

The entity ID to look up.

TYPE: UUID

RETURNS DESCRIPTION
ResolvedEntity | None

The entity if found, None otherwise.

Source code in src/music_attribution/resolution/graph_store.py
async def get_entity(self, entity_id: uuid.UUID) -> ResolvedEntity | None:
    """Retrieve a ResolvedEntity by its UUID.

    Parameters
    ----------
    entity_id : uuid.UUID
        The entity ID to look up.

    Returns
    -------
    ResolvedEntity | None
        The entity if found, ``None`` otherwise.
    """
    return self._entities.get(entity_id)

add_relationship async

add_relationship(
    from_id: UUID,
    to_id: UUID,
    rel_type: str,
    attrs: dict[str, str],
) -> None

Add a bidirectional relationship between two entities.

Both directions are stored to enable traversal from either endpoint. The entities referenced by from_id and to_id should already exist in the store (but this is not enforced).

PARAMETER DESCRIPTION
from_id

Source entity ID.

TYPE: UUID

to_id

Target entity ID.

TYPE: UUID

rel_type

Relationship type (e.g., "PERFORMED", "WROTE", "PRODUCED").

TYPE: str

attrs

Additional relationship attributes (e.g., {"role": "lead vocalist"}).

TYPE: dict[str, str]

Source code in src/music_attribution/resolution/graph_store.py
async def add_relationship(
    self,
    from_id: uuid.UUID,
    to_id: uuid.UUID,
    rel_type: str,
    attrs: dict[str, str],
) -> None:
    """Add a bidirectional relationship between two entities.

    Both directions are stored to enable traversal from either endpoint.
    The entities referenced by ``from_id`` and ``to_id`` should already
    exist in the store (but this is not enforced).

    Parameters
    ----------
    from_id : uuid.UUID
        Source entity ID.
    to_id : uuid.UUID
        Target entity ID.
    rel_type : str
        Relationship type (e.g., ``"PERFORMED"``, ``"WROTE"``,
        ``"PRODUCED"``).
    attrs : dict[str, str]
        Additional relationship attributes (e.g., ``{"role": "lead vocalist"}``).
    """
    self._edges[from_id].append((to_id, rel_type, attrs))
    self._edges[to_id].append((from_id, rel_type, attrs))

find_related async

find_related(
    entity_id: UUID, rel_type: str, depth: int = 1
) -> list[ResolvedEntity]

Find entities related by a specific relationship type.

Performs a breadth-first traversal following only edges of the specified type, up to the given depth. Each entity is visited at most once (cycle-safe).

PARAMETER DESCRIPTION
entity_id

Starting entity ID.

TYPE: UUID

rel_type

Relationship type to follow (e.g., "PERFORMED").

TYPE: str

depth

Maximum traversal depth (number of hops). Default is 1.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
list[ResolvedEntity]

Related entities found within the traversal depth. Does not include the starting entity.

Source code in src/music_attribution/resolution/graph_store.py
async def find_related(
    self,
    entity_id: uuid.UUID,
    rel_type: str,
    depth: int = 1,
) -> list[ResolvedEntity]:
    """Find entities related by a specific relationship type.

    Performs a breadth-first traversal following only edges of the
    specified type, up to the given depth. Each entity is visited
    at most once (cycle-safe).

    Parameters
    ----------
    entity_id : uuid.UUID
        Starting entity ID.
    rel_type : str
        Relationship type to follow (e.g., ``"PERFORMED"``).
    depth : int, optional
        Maximum traversal depth (number of hops). Default is 1.

    Returns
    -------
    list[ResolvedEntity]
        Related entities found within the traversal depth. Does
        not include the starting entity.
    """
    visited: set[uuid.UUID] = {entity_id}
    current_level: set[uuid.UUID] = {entity_id}
    results: list[ResolvedEntity] = []

    for _ in range(depth):
        next_level: set[uuid.UUID] = set()
        for node in current_level:
            for target_id, edge_type, _ in self._edges.get(node, []):
                if edge_type == rel_type and target_id not in visited:
                    visited.add(target_id)
                    next_level.add(target_id)
                    entity = self._entities.get(target_id)
                    if entity:
                        results.append(entity)
        current_level = next_level

    return results

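The depth-limited, type-filtered traversal can be sketched on a plain adjacency dict. This standalone `find_related` is an illustration mirroring the method above, not the class API; edges of other relationship types are skipped even when they would reach new nodes.

```python
from collections import defaultdict

def find_related(
    edges: dict[str, list[tuple[str, str]]],
    start: str,
    rel_type: str,
    depth: int = 1,
) -> list[str]:
    """BFS following only edges of rel_type, up to `depth` hops;
    each node is visited at most once (cycle-safe)."""
    visited = {start}
    frontier = {start}
    found: list[str] = []
    for _ in range(depth):
        next_frontier: set[str] = set()
        for node in frontier:
            for target, edge_type in edges.get(node, []):
                if edge_type == rel_type and target not in visited:
                    visited.add(target)
                    next_frontier.add(target)
                    found.append(target)
        frontier = next_frontier
    return found

# Bidirectional toy graph: one artist, two songs, one album.
g: dict[str, list[tuple[str, str]]] = defaultdict(list)
for a, b, t in [
    ("artist", "song1", "PERFORMED"),
    ("song1", "album1", "APPEARS_ON"),
    ("artist", "song2", "PERFORMED"),
]:
    g[a].append((b, t))
    g[b].append((a, t))

one_hop = find_related(g, "artist", "PERFORMED", depth=1)
```
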
shortest_path async

shortest_path(
    from_id: UUID, to_id: UUID
) -> list[ResolvedEntity]

Find the shortest path between two entities using BFS.

Traverses all relationship types to find the shortest path (fewest hops) between two entities. Useful for understanding how two entities are connected in the knowledge graph.

PARAMETER DESCRIPTION
from_id

Starting entity ID.

TYPE: UUID

to_id

Target entity ID.

TYPE: UUID

RETURNS DESCRIPTION
list[ResolvedEntity]

Entities along the shortest path, inclusive of both endpoints. Returns a single-element list if from_id == to_id. Returns an empty list if no path exists or either entity is not in the store.

Source code in src/music_attribution/resolution/graph_store.py
async def shortest_path(
    self,
    from_id: uuid.UUID,
    to_id: uuid.UUID,
) -> list[ResolvedEntity]:
    """Find the shortest path between two entities using BFS.

    Traverses all relationship types to find the shortest path
    (fewest hops) between two entities. Useful for understanding
    how two entities are connected in the knowledge graph.

    Parameters
    ----------
    from_id : uuid.UUID
        Starting entity ID.
    to_id : uuid.UUID
        Target entity ID.

    Returns
    -------
    list[ResolvedEntity]
        Entities along the shortest path, inclusive of both endpoints.
        Returns a single-element list if ``from_id == to_id``.
        Returns an empty list if no path exists or either entity
        is not in the store.
    """
    if from_id == to_id:
        entity = self._entities.get(from_id)
        return [entity] if entity else []

    # BFS
    queue: deque[list[uuid.UUID]] = deque([[from_id]])
    visited: set[uuid.UUID] = {from_id}

    while queue:
        path = queue.popleft()
        current = path[-1]

        for neighbor_id, _, _ in self._edges.get(current, []):
            if neighbor_id in visited:
                continue
            new_path = [*path, neighbor_id]
            if neighbor_id == to_id:
                # Convert IDs to entities
                return [e for uid in new_path if (e := self._entities.get(uid)) is not None]
            visited.add(neighbor_id)
            queue.append(new_path)

    return []
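The path search can be sketched on a plain adjacency dict of string IDs. This standalone `shortest_path` mirrors the method above but is not the library API; it returns the node IDs along the first (and therefore shortest) path BFS discovers, endpoints included.

```python
from collections import deque

def shortest_path(edges: dict[str, list[str]], start: str, goal: str) -> list[str]:
    """BFS over all edge types; returns [] when no path exists
    and [start] when start == goal."""
    if start == goal:
        return [start]
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        for neighbor in edges.get(path[-1], []):
            if neighbor in visited:
                continue
            if neighbor == goal:
                return [*path, neighbor]
            visited.add(neighbor)
            queue.append([*path, neighbor])
    return []  # no path

# composer -> work -> performer: two hops connect the endpoints.
g = {
    "composer": ["work"],
    "work": ["composer", "performer"],
    "performer": ["work"],
}
path = shortest_path(g, "composer", "performer")
# -> ["composer", "work", "performer"]
```
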