
ETL Pipeline

Extract-Transform-Load pipeline: raw data from five sources is normalised into NormalizedRecord boundary objects.

MusicBrainz Extractor

musicbrainz

MusicBrainz ETL connector.

Fetches recordings, works, artists, and relationships from the MusicBrainz API and transforms raw API responses into NormalizedRecord boundary objects. Handles rate limiting (1 req/s per MusicBrainz policy), pagination, and exponential-backoff retries.

MusicBrainz is the highest-confidence open data source (source_confidence = 0.9) because it is community-curated with editorial review. It provides ISRC, ISWC, and ISNI identifiers that map directly to the A0-A3 assurance levels described in the companion paper (Section 3).

Notes

musicbrainzngs is synchronous — all API calls are wrapped in asyncio.to_thread() to avoid blocking the event loop.

The _RELATION_TYPE_MAP translates MusicBrainz relationship type strings (e.g., "producer", "lyricist") into the normalised RelationshipTypeEnum used throughout the attribution pipeline.
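
The retry wrapper itself (_api_call) is not shown on this page; the pattern the notes describe — running the synchronous client in a worker thread with exponential-backoff retries — can be sketched roughly as follows. TransientError and the base_delay parameter are illustrative stand-ins, not part of the real connector:

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a transient API failure (hypothetical)."""


async def api_call(sync_fn, *args, max_retries=3, base_delay=1.0, **kwargs):
    # Run the synchronous client call in a worker thread so the event
    # loop is never blocked, retrying transient failures with
    # exponential backoff (base_delay * 2**attempt seconds).
    for attempt in range(max_retries):
        try:
            return await asyncio.to_thread(sync_fn, *args, **kwargs)
        except TransientError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * 2**attempt)
```

With max_retries=3, a call that fails twice and then succeeds completes on the third attempt.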

MusicBrainzConnector

MusicBrainzConnector(
    user_agent: str, rate: float = 1.0, max_retries: int = 3
)

ETL connector for the MusicBrainz API.

Provides async fetch_* methods that query the MusicBrainz web service and transform_* methods that convert raw JSON responses into NormalizedRecord boundary objects for the downstream entity resolution pipeline.

PARAMETER DESCRIPTION
user_agent

User-Agent string for API compliance. Must include the application name, version, and contact email (e.g., "MusicAttribution/0.1.0 (user@example.com)").

TYPE: str

rate

Maximum requests per second, by default 1.0 (MusicBrainz policy).

TYPE: float DEFAULT: 1.0

max_retries

Maximum retry attempts on transient API errors, by default 3. Uses exponential backoff (2^attempt seconds).

TYPE: int DEFAULT: 3

ATTRIBUTE DESCRIPTION
_rate_limiter

Token-bucket limiter enforcing the per-second request cap.

TYPE: TokenBucketRateLimiter
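
TokenBucketRateLimiter's implementation is not shown here; a minimal sketch of the token-bucket idea it names (a hypothetical reimplementation, not the project's actual class) might look like:

```python
import asyncio
import time


class TokenBucket:
    """Minimal token bucket: `rate` tokens/second, up to `capacity` stored."""

    def __init__(self, rate: float, capacity: int = 1) -> None:
        self._rate = rate
        self._capacity = capacity
        self._tokens = float(capacity)
        self._last = time.monotonic()

    async def acquire(self) -> None:
        # Refill based on elapsed time, then wait until a token is free.
        while True:
            now = time.monotonic()
            self._tokens = min(
                self._capacity, self._tokens + (now - self._last) * self._rate
            )
            self._last = now
            if self._tokens >= 1:
                self._tokens -= 1
                return
            await asyncio.sleep((1 - self._tokens) / self._rate)
```

A bucket constructed with rate=1.0, capacity=1 yields the MusicBrainz policy: the first request passes immediately and each subsequent request waits roughly one second.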

Examples:

>>> connector = MusicBrainzConnector("MusicAttribution/0.1.0 (user@example.com)")
>>> record = await connector.fetch_recording("some-mbid")
>>> record.source
<SourceEnum.MUSICBRAINZ: 'musicbrainz'>
Source code in src/music_attribution/etl/musicbrainz.py
def __init__(
    self,
    user_agent: str,
    rate: float = 1.0,
    max_retries: int = 3,
) -> None:
    self._user_agent = user_agent
    self._rate_limiter = TokenBucketRateLimiter(rate=rate, capacity=1)
    self._max_retries = max_retries
    musicbrainzngs.set_useragent(*self._parse_user_agent(user_agent))

fetch_recording async

fetch_recording(mbid: str) -> NormalizedRecord

Fetch a recording by MusicBrainz ID.

Retrieves the recording with artist credits, ISRCs, releases, and artist relationships, then transforms the response into a NormalizedRecord.

PARAMETER DESCRIPTION
mbid

MusicBrainz recording UUID (e.g., "b10bbbfc-cf9e-42e0-be17-e2c3e1d2600d").

TYPE: str

RETURNS DESCRIPTION
NormalizedRecord

Normalised recording with relationships and identifiers.

RAISES DESCRIPTION
WebServiceError

If the MusicBrainz API returns an error after all retries.

Source code in src/music_attribution/etl/musicbrainz.py
async def fetch_recording(self, mbid: str) -> NormalizedRecord:
    """Fetch a recording by MusicBrainz ID.

    Retrieves the recording with artist credits, ISRCs, releases,
    and artist relationships, then transforms the response into a
    ``NormalizedRecord``.

    Parameters
    ----------
    mbid : str
        MusicBrainz recording UUID (e.g.,
        ``"b10bbbfc-cf9e-42e0-be17-e2c3e1d2600d"``).

    Returns
    -------
    NormalizedRecord
        Normalised recording with relationships and identifiers.

    Raises
    ------
    musicbrainzngs.WebServiceError
        If the MusicBrainz API returns an error after all retries.
    """
    data = await self._api_call(
        musicbrainzngs.get_recording_by_id,
        mbid,
        includes=["artists", "isrcs", "releases", "artist-rels"],
    )
    return self.transform_recording(data["recording"])

fetch_artist async

fetch_artist(mbid: str) -> NormalizedRecord

Fetch an artist by MusicBrainz ID.

Retrieves the artist profile with aliases, then transforms the response into a NormalizedRecord with ISNI identifiers and alternative name variants.

PARAMETER DESCRIPTION
mbid

MusicBrainz artist UUID.

TYPE: str

RETURNS DESCRIPTION
NormalizedRecord

Normalised artist with identifiers and alternative names.

RAISES DESCRIPTION
WebServiceError

If the MusicBrainz API returns an error after all retries.

Source code in src/music_attribution/etl/musicbrainz.py
async def fetch_artist(self, mbid: str) -> NormalizedRecord:
    """Fetch an artist by MusicBrainz ID.

    Retrieves the artist profile with aliases, then transforms the
    response into a ``NormalizedRecord`` with ISNI identifiers and
    alternative name variants.

    Parameters
    ----------
    mbid : str
        MusicBrainz artist UUID.

    Returns
    -------
    NormalizedRecord
        Normalised artist with identifiers and alternative names.

    Raises
    ------
    musicbrainzngs.WebServiceError
        If the MusicBrainz API returns an error after all retries.
    """
    data = await self._api_call(
        musicbrainzngs.get_artist_by_id,
        mbid,
        includes=["aliases"],
    )
    return self.transform_artist(data["artist"])

transform_recording

transform_recording(data: dict) -> NormalizedRecord

Transform a MusicBrainz recording response to a NormalizedRecord.

Extracts ISRC identifiers, artist relationships, and structured metadata (roles, release date, country, duration) from the raw API response. The source_confidence is set to 0.9 reflecting MusicBrainz's community-curated editorial quality.

PARAMETER DESCRIPTION
data

Raw MusicBrainz recording dictionary as returned by musicbrainzngs.get_recording_by_id().

TYPE: dict

RETURNS DESCRIPTION
NormalizedRecord

Normalised recording with identifiers, relationships, and source metadata.

Source code in src/music_attribution/etl/musicbrainz.py
def transform_recording(self, data: dict) -> NormalizedRecord:
    """Transform a MusicBrainz recording response to a NormalizedRecord.

    Extracts ISRC identifiers, artist relationships, and structured
    metadata (roles, release date, country, duration) from the raw
    API response.  The ``source_confidence`` is set to 0.9
    reflecting MusicBrainz's community-curated editorial quality.

    Parameters
    ----------
    data : dict
        Raw MusicBrainz recording dictionary as returned by
        ``musicbrainzngs.get_recording_by_id()``.

    Returns
    -------
    NormalizedRecord
        Normalised recording with identifiers, relationships, and
        source metadata.
    """
    isrc_list = data.get("isrc-list", [])
    isrc = isrc_list[0] if isrc_list else None

    relationships = self._extract_relationships(data)
    metadata = self._extract_recording_metadata(data)

    return NormalizedRecord(
        source=SourceEnum.MUSICBRAINZ,
        source_id=data["id"],
        entity_type=EntityTypeEnum.RECORDING,
        canonical_name=data.get("title", ""),
        identifiers=IdentifierBundle(isrc=isrc, mbid=data["id"]),
        metadata=metadata,
        relationships=relationships,
        fetch_timestamp=datetime.now(UTC),
        source_confidence=0.9,
        raw_payload=data,
    )

transform_artist

transform_artist(data: dict) -> NormalizedRecord

Transform a MusicBrainz artist response to a NormalizedRecord.

Extracts ISNI identifiers and alias-list name variants. The source_confidence is set to 0.9 reflecting MusicBrainz's community-curated editorial quality.

PARAMETER DESCRIPTION
data

Raw MusicBrainz artist dictionary as returned by musicbrainzngs.get_artist_by_id().

TYPE: dict

RETURNS DESCRIPTION
NormalizedRecord

Normalised artist with identifiers and alternative names.

Source code in src/music_attribution/etl/musicbrainz.py
def transform_artist(self, data: dict) -> NormalizedRecord:
    """Transform a MusicBrainz artist response to a NormalizedRecord.

    Extracts ISNI identifiers and alias-list name variants.  The
    ``source_confidence`` is set to 0.9 reflecting MusicBrainz's
    community-curated editorial quality.

    Parameters
    ----------
    data : dict
        Raw MusicBrainz artist dictionary as returned by
        ``musicbrainzngs.get_artist_by_id()``.

    Returns
    -------
    NormalizedRecord
        Normalised artist with identifiers and alternative names.
    """
    isni_list = data.get("isni-list", [])
    isni = isni_list[0] if isni_list else None

    alias_list = data.get("alias-list", [])
    alternative_names = [a.get("alias", "") for a in alias_list if a.get("alias")]

    return NormalizedRecord(
        source=SourceEnum.MUSICBRAINZ,
        source_id=data["id"],
        entity_type=EntityTypeEnum.ARTIST,
        canonical_name=data.get("name", ""),
        alternative_names=alternative_names,
        identifiers=IdentifierBundle(isni=isni, mbid=data["id"]),
        fetch_timestamp=datetime.now(UTC),
        source_confidence=0.9,
        raw_payload=data,
    )

Discogs Extractor

discogs

Discogs ETL connector.

Fetches release credits, artist profiles, and label information from the Discogs API and transforms raw API responses into NormalizedRecord boundary objects. Handles rate limiting (60 req/min authenticated, 25 req/min unauthenticated) and exponential-backoff retries.

Discogs is particularly valuable for detailed credit information (producer, engineer, mix, mastering) that is often missing from other sources. The source_confidence is set to 0.85, slightly below MusicBrainz, because Discogs data is user-contributed without the same editorial review process.

Notes

python3-discogs-client is synchronous — all API calls are wrapped in asyncio.to_thread() to avoid blocking the event loop.

The _ROLE_MAP translates Discogs credit role strings (e.g., "Producer", "Mixed By") into the normalised RelationshipTypeEnum. Discogs roles can be comma-separated (e.g., "Producer, Engineer"), so each role part is mapped independently.
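
The contents of _ROLE_MAP are not listed on this page; a small hypothetical subset is enough to sketch the comma-splitting behaviour the note describes:

```python
# Hypothetical subset of _ROLE_MAP; the real mapping covers more roles
# and targets RelationshipTypeEnum members rather than strings.
ROLE_MAP = {
    "producer": "producer",
    "mixed by": "mixer",
    "engineer": "engineer",
    "mastered by": "mastering_engineer",
}


def map_roles(credit_role: str) -> list[str]:
    """Split a comma-separated Discogs role string and map each part."""
    parts = [p.strip().lower() for p in credit_role.split(",")]
    return [ROLE_MAP[p] for p in parts if p in ROLE_MAP]
```

So a Discogs credit of "Producer, Engineer" yields two independent relationships rather than one unmapped compound role.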

DiscogsConnector

DiscogsConnector(
    user_agent: str,
    token: str | None = None,
    rate: float | None = None,
    max_retries: int = 3,
)

ETL connector for the Discogs API.

Provides async fetch_* methods that query the Discogs web service and transform_* methods that convert raw JSON responses into NormalizedRecord boundary objects.

PARAMETER DESCRIPTION
user_agent

User-Agent string for API compliance. Discogs requires a unique user-agent identifying the application.

TYPE: str

token

Discogs personal access token. When provided, the rate limit increases from 25 req/min to 60 req/min.

TYPE: str or None DEFAULT: None

rate

Override the requests-per-second cap. If None (default), the rate is auto-selected based on authentication status.

TYPE: float or None DEFAULT: None

max_retries

Maximum retry attempts on transient errors, by default 3. Uses exponential backoff (2^attempt seconds).

TYPE: int DEFAULT: 3

ATTRIBUTE DESCRIPTION
_authenticated

Whether a personal access token was provided.

TYPE: bool

_rate_limiter

Token-bucket limiter enforcing the per-second request cap.

TYPE: TokenBucketRateLimiter

Examples:

>>> connector = DiscogsConnector("MyApp/1.0", token="secret")
>>> records = await connector.fetch_release(12345)
>>> len(records) > 0
True
Source code in src/music_attribution/etl/discogs.py
def __init__(
    self,
    user_agent: str,
    token: str | None = None,
    rate: float | None = None,
    max_retries: int = 3,
) -> None:
    self._user_agent = user_agent
    self._max_retries = max_retries
    self._authenticated = token is not None

    if token:
        self._client = discogs_client.Client(user_agent, user_token=token)
    else:
        self._client = discogs_client.Client(user_agent)

    # Rate limits: 60/min auth (1/s), 25/min unauth (~0.42/s)
    if rate is not None:
        effective_rate = rate
    elif self._authenticated:
        effective_rate = 1.0  # 60 per minute
    else:
        effective_rate = 25.0 / 60.0  # ~0.42 per second

    self._rate_limiter = TokenBucketRateLimiter(rate=effective_rate, capacity=1)

fetch_release async

fetch_release(release_id: int) -> list[NormalizedRecord]

Fetch a release by Discogs ID.

Retrieves the release with full credits and tracklist, then transforms it into one NormalizedRecord per track plus one for the release itself.

PARAMETER DESCRIPTION
release_id

Discogs release ID (numeric).

TYPE: int

RETURNS DESCRIPTION
list[NormalizedRecord]

List containing one release-level record followed by one record per track.

RAISES DESCRIPTION
Exception

If the Discogs API returns an error after all retries.

Source code in src/music_attribution/etl/discogs.py
async def fetch_release(self, release_id: int) -> list[NormalizedRecord]:
    """Fetch a release by Discogs ID.

    Retrieves the release with full credits and tracklist, then
    transforms it into one ``NormalizedRecord`` per track plus one
    for the release itself.

    Parameters
    ----------
    release_id : int
        Discogs release ID (numeric).

    Returns
    -------
    list[NormalizedRecord]
        List containing one release-level record followed by one
        record per track.

    Raises
    ------
    Exception
        If the Discogs API returns an error after all retries.
    """
    release = await self._api_call(self._client.release, release_id)
    data = release.data
    return self.transform_release(data)

fetch_artist async

fetch_artist(artist_id: int) -> NormalizedRecord

Fetch an artist profile by Discogs ID.

Retrieves the artist profile with name variations and transforms it into a NormalizedRecord.

PARAMETER DESCRIPTION
artist_id

Discogs artist ID (numeric).

TYPE: int

RETURNS DESCRIPTION
NormalizedRecord

Normalised artist with alternative name variants.

RAISES DESCRIPTION
Exception

If the Discogs API returns an error after all retries.

Source code in src/music_attribution/etl/discogs.py
async def fetch_artist(self, artist_id: int) -> NormalizedRecord:
    """Fetch an artist profile by Discogs ID.

    Retrieves the artist profile with name variations and transforms
    it into a ``NormalizedRecord``.

    Parameters
    ----------
    artist_id : int
        Discogs artist ID (numeric).

    Returns
    -------
    NormalizedRecord
        Normalised artist with alternative name variants.

    Raises
    ------
    Exception
        If the Discogs API returns an error after all retries.
    """
    artist = await self._api_call(self._client.artist, artist_id)
    return self.transform_artist(artist.data)

search_releases async

search_releases(query: str) -> list[NormalizedRecord]

Search for releases by a free-text query string.

PARAMETER DESCRIPTION
query

Search query (e.g., artist name, album title).

TYPE: str

RETURNS DESCRIPTION
list[NormalizedRecord]

Flattened list of NormalizedRecord objects from all matching releases (one per track plus one per release).

RAISES DESCRIPTION
Exception

If the Discogs API returns an error after all retries.

Source code in src/music_attribution/etl/discogs.py
async def search_releases(self, query: str) -> list[NormalizedRecord]:
    """Search for releases by a free-text query string.

    Parameters
    ----------
    query : str
        Search query (e.g., artist name, album title).

    Returns
    -------
    list[NormalizedRecord]
        Flattened list of ``NormalizedRecord`` objects from all
        matching releases (one per track plus one per release).

    Raises
    ------
    Exception
        If the Discogs API returns an error after all retries.
    """
    results = await self._api_call(self._client.search, query, type="release")
    records = []
    for result in results:
        if hasattr(result, "data"):
            records.extend(self.transform_release(result.data))
    return records

transform_release

transform_release(data: dict) -> list[NormalizedRecord]

Transform a Discogs release response into NormalizedRecords.

Creates one NormalizedRecord for the release itself (EntityTypeEnum.RELEASE) and one per track (EntityTypeEnum.RECORDING). Release-level credits from extraartists are attached to the release record, while track-level credits are attached to each track record.

The source_confidence is set to 0.85 for all Discogs records.

PARAMETER DESCRIPTION
data

Raw Discogs release dictionary as returned by the python3-discogs-client.

TYPE: dict

RETURNS DESCRIPTION
list[NormalizedRecord]

Release record followed by per-track recording records.

Source code in src/music_attribution/etl/discogs.py
def transform_release(self, data: dict) -> list[NormalizedRecord]:
    """Transform a Discogs release response into NormalizedRecords.

    Creates one ``NormalizedRecord`` for the release itself
    (``EntityTypeEnum.RELEASE``) and one per track
    (``EntityTypeEnum.RECORDING``).  Release-level credits from
    ``extraartists`` are attached to the release record, while
    track-level credits are attached to each track record.

    The ``source_confidence`` is set to 0.85 for all Discogs
    records.

    Parameters
    ----------
    data : dict
        Raw Discogs release dictionary as returned by the
        ``python3-discogs-client``.

    Returns
    -------
    list[NormalizedRecord]
        Release record followed by per-track recording records.
    """
    records: list[NormalizedRecord] = []
    release_id = data.get("id", 0)
    release_title = data.get("title", "")
    release_year = data.get("year")
    release_country = data.get("country")

    # Release-level relationships from extraartists
    release_relationships = self._extract_relationships(data.get("extraartists", []))

    # Release-level record
    primary_artists = data.get("artists", [])
    artist_names = [a.get("name", "") for a in primary_artists if a.get("name")]

    release_record = NormalizedRecord(
        source=SourceEnum.DISCOGS,
        source_id=str(release_id),
        entity_type=EntityTypeEnum.RELEASE,
        canonical_name=release_title,
        identifiers=IdentifierBundle(discogs_id=release_id),
        metadata=SourceMetadata(
            roles=artist_names,
            release_date=str(release_year) if release_year else None,
            release_country=release_country,
        ),
        relationships=release_relationships,
        fetch_timestamp=datetime.now(UTC),
        source_confidence=0.85,
        raw_payload=data,
    )
    records.append(release_record)

    # Per-track records
    for track in data.get("tracklist", []):
        track_title = track.get("title", "")
        track_relationships = self._extract_relationships(track.get("extraartists", []))

        # Parse duration string like "4:19" to milliseconds
        duration_ms = self._parse_duration(track.get("duration", ""))

        track_record = NormalizedRecord(
            source=SourceEnum.DISCOGS,
            source_id=f"{release_id}-{track.get('position', '')}",
            entity_type=EntityTypeEnum.RECORDING,
            canonical_name=track_title,
            identifiers=IdentifierBundle(discogs_id=release_id),
            metadata=SourceMetadata(
                roles=artist_names,
                release_date=str(release_year) if release_year else None,
                release_country=release_country,
                duration_ms=duration_ms,
            ),
            relationships=track_relationships,
            fetch_timestamp=datetime.now(UTC),
            source_confidence=0.85,
            raw_payload=track,
        )
        records.append(track_record)

    return records

transform_artist

transform_artist(data: dict) -> NormalizedRecord

Transform a Discogs artist response into a NormalizedRecord.

Extracts the artist's canonical name and all name variations from the raw Discogs response.

PARAMETER DESCRIPTION
data

Raw Discogs artist dictionary.

TYPE: dict

RETURNS DESCRIPTION
NormalizedRecord

Normalised artist with alternative_names populated from Discogs namevariations.

Source code in src/music_attribution/etl/discogs.py
def transform_artist(self, data: dict) -> NormalizedRecord:
    """Transform a Discogs artist response into a NormalizedRecord.

    Extracts the artist's canonical name and all name variations
    from the raw Discogs response.

    Parameters
    ----------
    data : dict
        Raw Discogs artist dictionary.

    Returns
    -------
    NormalizedRecord
        Normalised artist with ``alternative_names`` populated from
        Discogs ``namevariations``.
    """
    name_variations = data.get("namevariations", [])
    alternative_names = [n for n in name_variations if n]

    return NormalizedRecord(
        source=SourceEnum.DISCOGS,
        source_id=str(data.get("id", 0)),
        entity_type=EntityTypeEnum.ARTIST,
        canonical_name=data.get("name", ""),
        alternative_names=alternative_names,
        identifiers=IdentifierBundle(discogs_id=data.get("id")),
        fetch_timestamp=datetime.now(UTC),
        source_confidence=0.85,
        raw_payload=data,
    )

AcoustID Extractor

acoustid

AcoustID fingerprint connector.

Audio fingerprinting via AcoustID / Chromaprint. Given an audio file, generates a Chromaprint fingerprint and looks it up against the AcoustID database to find matching MusicBrainz recording IDs. This answers the question "what recording is this?" — the first step before attribution metadata can be attached.

The source_confidence for AcoustID results is the AcoustID match score (0.0-1.0), which reflects the fingerprint similarity between the query audio and the database entry. High scores (> 0.9) indicate near-certain identification.
The source_confidence for AcoustID results is the AcoustID match score (0.0-1.0), which reflects the fingerprint similarity between the query audio and the database entry. High scores (> 0.9) indicate near-certain identification.

Notes

acoustid.fingerprint_file and acoustid.lookup are synchronous — they are wrapped in asyncio.to_thread() to avoid blocking the event loop.

AcoustID results link directly to MusicBrainz recording IDs (MBIDs), enabling seamless cross-referencing with the MusicBrainzConnector for full metadata enrichment.
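
The cross-referencing flow the note describes can be sketched as a small glue coroutine. The 0.9 threshold and the injected fetch_recording callable are illustrative assumptions, not part of the connector's API; in practice fetch_recording would be MusicBrainzConnector.fetch_recording:

```python
import asyncio


async def identify_and_enrich(records, fetch_recording, min_score=0.9):
    """Follow the best AcoustID match's MBID into MusicBrainz.

    `records` are the NormalizedRecords returned by lookup() (already
    sorted by score, descending); `fetch_recording` is a coroutine
    taking an MBID. Returns None when no match clears the threshold.
    """
    for record in records:
        if record.source_confidence >= min_score and record.identifiers.mbid:
            return await fetch_recording(record.identifiers.mbid)
    return None
```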

AcoustIDConnector

AcoustIDConnector(
    api_key: str, rate: float = 3.0, max_retries: int = 3
)

ETL connector for the AcoustID fingerprint service.

Provides async methods for fingerprinting audio files and looking up fingerprints against the AcoustID database to resolve MusicBrainz recording IDs.

PARAMETER DESCRIPTION
api_key

AcoustID API key (obtain from https://acoustid.org/).

TYPE: str

rate

Maximum requests per second, by default 3.0 (AcoustID policy).

TYPE: float DEFAULT: 3.0

max_retries

Maximum retry attempts on transient API errors, by default 3. Uses exponential backoff (2^attempt seconds).

TYPE: int DEFAULT: 3

ATTRIBUTE DESCRIPTION
_rate_limiter

Token-bucket limiter enforcing the per-second request cap.

TYPE: TokenBucketRateLimiter

Examples:

>>> connector = AcoustIDConnector(api_key="your-key")
>>> fp, dur = await connector.fingerprint_file(Path("song.mp3"))
>>> records = await connector.lookup(fp, dur)
>>> records[0].source
<SourceEnum.ACOUSTID: 'acoustid'>
Source code in src/music_attribution/etl/acoustid.py
def __init__(
    self,
    api_key: str,
    rate: float = 3.0,
    max_retries: int = 3,
) -> None:
    self._api_key = api_key
    self._rate_limiter = TokenBucketRateLimiter(rate=rate, capacity=3)
    self._max_retries = max_retries

fingerprint_file async

fingerprint_file(file_path: Path) -> tuple[str, int]

Generate a Chromaprint fingerprint from an audio file.

Delegates to acoustid.fingerprint_file in a worker thread to avoid blocking the event loop.

PARAMETER DESCRIPTION
file_path

Path to the audio file (MP3, FLAC, WAV, OGG, etc.).

TYPE: Path

RETURNS DESCRIPTION
tuple[str, int]

A 2-tuple of (fingerprint_string, duration_in_seconds).

RAISES DESCRIPTION
FingerprintGenerationError

If Chromaprint cannot process the audio file.

Source code in src/music_attribution/etl/acoustid.py
async def fingerprint_file(self, file_path: Path) -> tuple[str, int]:
    """Generate a Chromaprint fingerprint from an audio file.

    Delegates to ``acoustid.fingerprint_file`` in a worker thread
    to avoid blocking the event loop.

    Parameters
    ----------
    file_path : Path
        Path to the audio file (MP3, FLAC, WAV, OGG, etc.).

    Returns
    -------
    tuple[str, int]
        A 2-tuple of ``(fingerprint_string, duration_in_seconds)``.

    Raises
    ------
    acoustid.FingerprintGenerationError
        If Chromaprint cannot process the audio file.
    """
    duration, fingerprint = await asyncio.to_thread(acoustid.fingerprint_file, str(file_path))
    return fingerprint, duration

lookup async

lookup(
    fingerprint: str, duration: int
) -> list[NormalizedRecord]

Look up a fingerprint against the AcoustID database.

Queries AcoustID with the fingerprint and duration, then transforms matched results into NormalizedRecord objects with MusicBrainz recording IDs for cross-referencing.

PARAMETER DESCRIPTION
fingerprint

Chromaprint fingerprint string generated by fingerprint_file().

TYPE: str

duration

Audio duration in seconds.

TYPE: int

RETURNS DESCRIPTION
list[NormalizedRecord]

Matching recordings sorted by confidence score (descending). Empty list if no matches are found.

RAISES DESCRIPTION
WebServiceError

If the AcoustID API returns an error after all retries.

Source code in src/music_attribution/etl/acoustid.py
async def lookup(self, fingerprint: str, duration: int) -> list[NormalizedRecord]:
    """Look up a fingerprint against the AcoustID database.

    Queries AcoustID with the fingerprint and duration, then
    transforms matched results into ``NormalizedRecord`` objects
    with MusicBrainz recording IDs for cross-referencing.

    Parameters
    ----------
    fingerprint : str
        Chromaprint fingerprint string generated by
        ``fingerprint_file()``.
    duration : int
        Audio duration in seconds.

    Returns
    -------
    list[NormalizedRecord]
        Matching recordings sorted by confidence score (descending).
        Empty list if no matches are found.

    Raises
    ------
    acoustid.WebServiceError
        If the AcoustID API returns an error after all retries.
    """
    response = await self._api_call(
        acoustid.lookup,
        self._api_key,
        fingerprint,
        duration,
        meta="recordings",
    )
    return self.transform_lookup_results(response)

transform_lookup_results

transform_lookup_results(
    response: dict,
) -> list[NormalizedRecord]

Transform an AcoustID lookup response into NormalizedRecords.

Each AcoustID result may contain multiple MusicBrainz recording matches. Every recording is emitted as a separate NormalizedRecord with:

  • source_confidence set to the AcoustID match score
  • identifiers.mbid linking to the MusicBrainz recording
  • identifiers.acoustid linking to the AcoustID entry
PARAMETER DESCRIPTION
response

Raw AcoustID API response containing a results list.

TYPE: dict

RETURNS DESCRIPTION
list[NormalizedRecord]

Normalised recordings sorted by source_confidence descending.

Source code in src/music_attribution/etl/acoustid.py
def transform_lookup_results(self, response: dict) -> list[NormalizedRecord]:
    """Transform an AcoustID lookup response into NormalizedRecords.

    Each AcoustID result may contain multiple MusicBrainz recording
    matches.  Every recording is emitted as a separate
    ``NormalizedRecord`` with:

    * ``source_confidence`` set to the AcoustID match score
    * ``identifiers.mbid`` linking to the MusicBrainz recording
    * ``identifiers.acoustid`` linking to the AcoustID entry

    Parameters
    ----------
    response : dict
        Raw AcoustID API response containing a ``results`` list.

    Returns
    -------
    list[NormalizedRecord]
        Normalised recordings sorted by ``source_confidence``
        descending.
    """
    records: list[NormalizedRecord] = []

    for result in response.get("results", []):
        score = result.get("score", 0.0)
        acoustid_id = result.get("id", "")

        for recording in result.get("recordings", []):
            mbid = recording.get("id", "")
            title = recording.get("title", "")
            duration = recording.get("duration")

            artists = recording.get("artists", [])
            artist_names = [a.get("name", "") for a in artists if a.get("name")]

            record = NormalizedRecord(
                source=SourceEnum.ACOUSTID,
                source_id=acoustid_id,
                entity_type=EntityTypeEnum.RECORDING,
                canonical_name=title,
                identifiers=IdentifierBundle(
                    mbid=mbid,
                    acoustid=acoustid_id,
                ),
                metadata=SourceMetadata(
                    roles=artist_names,
                    duration_ms=duration * 1000 if duration else None,
                ),
                fetch_timestamp=datetime.now(UTC),
                source_confidence=score,
                raw_payload=result,
            )
            records.append(record)

    # Sort by confidence descending
    records.sort(key=lambda r: r.source_confidence, reverse=True)
    return records

File Metadata Extractor

file_metadata

File metadata reader using tinytag.

Reads metadata (title, artist, album, duration, year) from audio files using tinytag (MIT-licensed, pure Python, zero dependencies) and produces NormalizedRecord objects for the ETL pipeline.

This is the lowest-confidence data source (source_confidence = 0.7) because embedded file metadata is often incomplete, inconsistent, or user-edited. It serves as a baseline that is enriched by higher-confidence sources (MusicBrainz, Discogs, AcoustID).

Notes

tinytag provides normalised high-level attributes only. Low-level ID3 frames (TIPL, TMCL, TSRC) are not available through tinytag:

  • ISRC: Not exposed by tinytag. Use AcoustID fingerprinting or MusicBrainz lookup to obtain ISRCs.
  • Credits (TIPL/TMCL): Not available. Detailed credits come from MusicBrainz and Discogs APIs instead.

Supported formats: MP3, M4A, WAV, OGG, FLAC, WMA, AIFF.

FileMetadataReader

Reads audio file metadata and produces NormalizedRecords.

A stateless reader that extracts embedded metadata from audio files using tinytag. When metadata cannot be read (corrupted file, DRM, unsupported codec), a minimal fallback record is produced using only the filename.

Supported formats: MP3, M4A, WAV, OGG, FLAC, WMA, AIFF.

Examples:

>>> reader = FileMetadataReader()
>>> record = reader.read(Path("hide_and_seek.mp3"))
>>> record.canonical_name
'Hide and Seek'
>>> record.source
<SourceEnum.FILE_METADATA: 'file_metadata'>

read

read(file_path: Path) -> NormalizedRecord

Read metadata from an audio file.

Extracts title, artist, album, year, and duration from the file's embedded tags. Falls back to a minimal record (using the filename stem as title) if tags cannot be read.

The source_confidence is set to 0.7 for successfully read files and 0.3 for fallback records.

PARAMETER DESCRIPTION
file_path

Path to the audio file.

TYPE: Path

RETURNS DESCRIPTION
NormalizedRecord

Normalised recording with extracted metadata. The identifiers bundle will be empty (no ISRC from tinytag).

Source code in src/music_attribution/etl/file_metadata.py
def read(self, file_path: Path) -> NormalizedRecord:
    """Read metadata from an audio file.

    Extracts title, artist, album, year, and duration from the
    file's embedded tags.  Falls back to a minimal record (using
    the filename stem as title) if tags cannot be read.

    The ``source_confidence`` is set to 0.7 for successfully read
    files and 0.3 for fallback records.

    Parameters
    ----------
    file_path : Path
        Path to the audio file.

    Returns
    -------
    NormalizedRecord
        Normalised recording with extracted metadata.  The
        ``identifiers`` bundle will be empty (no ISRC from tinytag).
    """
    try:
        tag = TinyTag.get(str(file_path))
    except Exception:
        logger.warning("Failed to read metadata from %s", file_path)
        return self._minimal_record(file_path)

    title = tag.title or file_path.stem
    artist = tag.artist or ""
    album = tag.album or ""
    year = tag.year
    duration = tag.duration

    # TODO: ISRC not available in tinytag — add via raw ID3 parser if needed
    # TODO: TIPL credits not available in tinytag — credits come from MusicBrainz/Discogs APIs

    return NormalizedRecord(
        source=SourceEnum.FILE_METADATA,
        source_id=str(file_path),
        entity_type=EntityTypeEnum.RECORDING,
        canonical_name=title,
        identifiers=IdentifierBundle(),
        metadata=SourceMetadata(
            roles=[artist] if artist else [],
            release_date=str(year) if year else None,
            duration_ms=int(duration * 1000) if duration else None,
        ),
        relationships=[],
        fetch_timestamp=datetime.now(UTC),
        source_confidence=0.7,
        raw_payload={"file_path": str(file_path), "album": album},
    )

Quality Gate

quality_gate

Data quality gate for NormalizedRecord batches.

Validates batches of NormalizedRecord objects before they are passed to the Entity Resolution pipeline. Checks statistical properties beyond what Pydantic field validators can cover, including:

  • Identifier coverage — what fraction of records carry at least one standard identifier (ISRC, MBID, ISNI, etc.)
  • Duplicate detection — same (source, source_id) appearing more than once in the batch
  • Source distribution — whether the batch is dangerously skewed toward a single data source

These checks form a quality firewall between the ETL layer and the downstream entity resolution layer, preventing garbage-in/garbage-out propagation through the attribution pipeline.
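
The identifier-coverage rule described above (zero coverage fails hard, below-threshold coverage warns) can be sketched as follows. This is a simplified illustration operating on booleans; the real check inspects each record's `identifiers` bundle:

```python
def coverage_check(has_identifier: list[bool], min_coverage: float = 0.5) -> str:
    # Sketch of the coverage rule: fraction of records carrying at least
    # one standard identifier (ISRC, MBID, ISNI, ...).
    frac = sum(has_identifier) / len(has_identifier) if has_identifier else 0.0
    if frac == 0.0:
        return "fail"   # zero coverage is a hard failure
    if frac < min_coverage:
        return "warn"   # below threshold: warn, but let the batch through
    return "pass"
```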

Notes

The gate operates in two modes:

  1. Reporting (validate_batch) — produces a QualityReport without modifying the input.
  2. Enforcement (enforce) — raises ValueError on critical failures and removes duplicates on success.
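
In both modes the report's overall status is the worst individual check status (fail > warn > pass). A minimal sketch of that aggregation, assuming the same three status strings:

```python
# Severity ranking used to pick the worst status across all checks.
_RANK = {"pass": 0, "warn": 1, "fail": 2}


def overall_status(statuses: list[str]) -> str:
    # max() by severity rank returns "fail" if any check failed,
    # else "warn" if any warned, else "pass".
    return max(statuses, key=_RANK.__getitem__)
```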

QualityCheckResult

Bases: BaseModel

Result of a single quality check.

ATTRIBUTE DESCRIPTION
check_name

Machine-readable name of the check (e.g., "identifier_coverage").

TYPE: str

status

Outcome of the check.

TYPE: {'pass', 'warn', 'fail'}

message

Human-readable description of the result.

TYPE: str

metric_value

Numeric metric associated with the check (e.g., coverage fraction), or None if not applicable.

TYPE: float or None

QualityReport

Bases: BaseModel

Aggregate quality report for a batch of NormalizedRecords.

ATTRIBUTE DESCRIPTION
batch_id

Unique identifier for this quality report.

TYPE: UUID

checks

Individual check results.

TYPE: list[QualityCheckResult]

overall_status

Worst status across all checks (fail > warn > pass).

TYPE: {'pass', 'warn', 'fail'}

records_in

Number of records in the input batch.

TYPE: int

records_passed

Number of unique (non-duplicate) records.

TYPE: int

timestamp

UTC timestamp when the report was generated.

TYPE: datetime

DataQualityGate

DataQualityGate(
    min_identifier_coverage: float = 0.5,
    max_single_source_fraction: float = 0.95,
)

Validates batches of NormalizedRecords before entity resolution.

Runs a configurable series of quality checks and produces a QualityReport. The gate can operate in report-only mode (validate_batch) or enforcement mode (enforce) which raises on critical failures.

PARAMETER DESCRIPTION
min_identifier_coverage

Minimum fraction of records that must have at least one standard identifier (ISRC, MBID, etc.), by default 0.5. Below this threshold the check emits a warning; zero coverage is a hard failure.

TYPE: float DEFAULT: 0.5

max_single_source_fraction

Maximum fraction of records allowed from a single data source, by default 0.95. If 100% of records come from one source, a warning is emitted (not a failure, since single-source batches are valid for targeted fetches).

TYPE: float DEFAULT: 0.95

Examples:

>>> gate = DataQualityGate(min_identifier_coverage=0.6)
>>> report = gate.validate_batch(records)
>>> report.overall_status
'pass'
Source code in src/music_attribution/etl/quality_gate.py
def __init__(
    self,
    min_identifier_coverage: float = 0.5,
    max_single_source_fraction: float = 0.95,
) -> None:
    self._min_identifier_coverage = min_identifier_coverage
    self._max_single_source_fraction = max_single_source_fraction

validate_batch

validate_batch(
    records: list[NormalizedRecord],
) -> QualityReport

Validate a batch of NormalizedRecords (report-only mode).

Runs all quality checks and returns a QualityReport without modifying the input batch.

PARAMETER DESCRIPTION
records

Batch of NormalizedRecords to validate.

TYPE: list[NormalizedRecord]

RETURNS DESCRIPTION
QualityReport

Report containing individual check results, overall status, and record counts.

Source code in src/music_attribution/etl/quality_gate.py
def validate_batch(self, records: list[NormalizedRecord]) -> QualityReport:
    """Validate a batch of NormalizedRecords (report-only mode).

    Runs all quality checks and returns a ``QualityReport`` without
    modifying the input batch.

    Parameters
    ----------
    records : list[NormalizedRecord]
        Batch of NormalizedRecords to validate.

    Returns
    -------
    QualityReport
        Report containing individual check results, overall status,
        and record counts.
    """
    checks: list[QualityCheckResult] = []

    checks.append(self._check_identifier_coverage(records))
    checks.append(self._check_no_duplicates(records))
    checks.append(self._check_source_distribution(records))

    # Determine overall status
    statuses = [c.status for c in checks]
    overall: Literal["pass", "warn", "fail"]
    if "fail" in statuses:
        overall = "fail"
    elif "warn" in statuses:
        overall = "warn"
    else:
        overall = "pass"

    # Count records that would pass (unique (source, source_id) pairs)
    seen = set()
    passed = 0
    for r in records:
        key = (r.source, r.source_id)
        if key not in seen:
            seen.add(key)
            passed += 1

    return QualityReport(
        checks=checks,
        overall_status=overall,
        records_in=len(records),
        records_passed=passed,
    )

enforce

enforce(
    records: list[NormalizedRecord],
) -> list[NormalizedRecord]

Validate and filter a batch, raising on critical failures.

First runs validate_batch(). If the overall status is "fail", raises ValueError with details of the failing checks. Otherwise, returns the deduplicated record list.

PARAMETER DESCRIPTION
records

Batch of NormalizedRecords to validate and filter.

TYPE: list[NormalizedRecord]

RETURNS DESCRIPTION
list[NormalizedRecord]

Validated records with duplicates (same source + source_id) removed, preserving first-seen order.

RAISES DESCRIPTION
ValueError

If any quality check has status "fail" (e.g., zero identifier coverage or duplicate records).

Source code in src/music_attribution/etl/quality_gate.py
def enforce(self, records: list[NormalizedRecord]) -> list[NormalizedRecord]:
    """Validate and filter a batch, raising on critical failures.

    First runs ``validate_batch()``.  If the overall status is
    ``"fail"``, raises ``ValueError`` with details of the failing
    checks.  Otherwise, returns the deduplicated record list.

    Parameters
    ----------
    records : list[NormalizedRecord]
        Batch of NormalizedRecords to validate and filter.

    Returns
    -------
    list[NormalizedRecord]
        Validated records with duplicates (same ``source`` +
        ``source_id``) removed, preserving first-seen order.

    Raises
    ------
    ValueError
        If any quality check has status ``"fail"`` (e.g., zero
        identifier coverage or duplicate records).
    """
    report = self.validate_batch(records)
    if report.overall_status == "fail":
        failures = [c for c in report.checks if c.status == "fail"]
        msg = "; ".join(f"{c.check_name}: {c.message}" for c in failures)
        raise ValueError(f"Batch failed quality gate: {msg}")

    # Remove duplicates
    seen: set[tuple] = set()
    unique: list[NormalizedRecord] = []
    for r in records:
        key = (r.source, r.source_id)
        if key not in seen:
            seen.add(key)
            unique.append(r)

    return unique

Rate Limiter

rate_limiter

Token-bucket rate limiter for external API compliance.

Provides an async-safe TokenBucketRateLimiter that enforces per-source request rate limits to avoid API bans. Used by all ETL connectors (MusicBrainz, Discogs, AcoustID) to throttle outbound requests.

The token-bucket algorithm allows controlled bursts (up to capacity requests) while maintaining a steady-state rate of rate requests per second. Each acquire() call blocks until a token is available.

Notes

The implementation uses time.monotonic() for clock-skew-resistant timing and asyncio.Lock() for thread-safety within the async event loop.

TokenBucketRateLimiter

TokenBucketRateLimiter(
    rate: float = 1.0, capacity: int = 1
)

Async token-bucket rate limiter.

Enforces a maximum request rate to comply with external API rate limits. Common configurations:

  • MusicBrainz: rate=1.0, capacity=1 (1 req/s)
  • Discogs authenticated: rate=1.0, capacity=1 (60 req/min)
  • Discogs unauthenticated: rate=0.42, capacity=1 (25 req/min)
  • AcoustID: rate=3.0, capacity=3 (3 req/s)
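
The per-minute limits in the list above reduce to a tokens-per-second `rate` by simple division (e.g., 25 req/min ≈ 0.4167 tokens/s, rounded to 0.42 in the list):

```python
def per_minute_to_rate(requests_per_minute: float) -> float:
    # Steady-state tokens per second for an API quoted in requests/minute.
    return requests_per_minute / 60.0
```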
PARAMETER DESCRIPTION
rate

Tokens added per second (steady-state request rate), by default 1.0.

TYPE: float DEFAULT: 1.0

capacity

Maximum tokens in the bucket (burst size), by default 1.

TYPE: int DEFAULT: 1

ATTRIBUTE DESCRIPTION
_tokens

Current number of available tokens.

TYPE: float

_last_refill

Monotonic timestamp of the last token refill.

TYPE: float

Examples:

>>> limiter = TokenBucketRateLimiter(rate=1.0, capacity=1)
>>> await limiter.acquire()  # returns immediately if bucket is full
True
Source code in src/music_attribution/etl/rate_limiter.py
def __init__(self, rate: float = 1.0, capacity: int = 1) -> None:
    self._rate = rate
    self._capacity = capacity
    self._tokens = float(capacity)
    self._last_refill = time.monotonic()
    self._lock = asyncio.Lock()

acquire async

acquire() -> bool

Acquire a token, waiting if the bucket is empty.

If no token is immediately available, calculates the minimum wait time for the next refill and sleeps asynchronously. The method is serialised via an asyncio.Lock to prevent race conditions between concurrent coroutines.

RETURNS DESCRIPTION
bool

Always True once a token has been successfully acquired.

Source code in src/music_attribution/etl/rate_limiter.py
async def acquire(self) -> bool:
    """Acquire a token, waiting if the bucket is empty.

    If no token is immediately available, calculates the minimum
    wait time for the next refill and sleeps asynchronously.  The
    method is serialised via an ``asyncio.Lock`` to prevent race
    conditions between concurrent coroutines.

    Returns
    -------
    bool
        Always ``True`` once a token has been successfully acquired.
    """
    async with self._lock:
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        wait_time = (1.0 - self._tokens) / self._rate
        await asyncio.sleep(wait_time)
        self._refill()
        self._tokens -= 1.0
        return True
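
The burst-then-throttle behaviour can be demonstrated with a minimal self-contained re-implementation of the same algorithm (illustrative only; the real class lives in `etl/rate_limiter.py`). With `rate=10.0, capacity=2`, the first two acquires drain the burst immediately and each subsequent acquire waits roughly 0.1 s for a refill:

```python
import asyncio
import time


class TokenBucket:
    """Minimal sketch of the token-bucket algorithm described above."""

    def __init__(self, rate: float, capacity: int) -> None:
        self._rate = rate
        self._capacity = capacity
        self._tokens = float(capacity)
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        # Add rate tokens per elapsed second, capped at capacity.
        now = time.monotonic()
        self._tokens = min(self._capacity, self._tokens + (now - self._last) * self._rate)
        self._last = now

    async def acquire(self) -> bool:
        async with self._lock:
            self._refill()
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return True
            # Sleep just long enough for the bucket to refill one token.
            await asyncio.sleep((1.0 - self._tokens) / self._rate)
            self._refill()
            self._tokens -= 1.0
            return True


async def main() -> float:
    limiter = TokenBucket(rate=10.0, capacity=2)
    start = time.monotonic()
    for _ in range(4):  # burst of 2 immediate, then ~0.1 s between requests
        await limiter.acquire()
    return time.monotonic() - start


elapsed = asyncio.run(main())  # roughly 0.2 s for 4 acquires
```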