ETL Pipeline
Extract-Transform-Load pipeline: raw data from 5 sources → NormalizedRecord.
MusicBrainz Extractor
musicbrainz
MusicBrainz ETL connector.
Fetches recordings, works, artists, and relationships from the MusicBrainz
API and transforms raw API responses into NormalizedRecord boundary
objects. Handles rate limiting (1 req/s per MusicBrainz policy),
pagination, and exponential-backoff retries.
MusicBrainz is the highest-confidence open data source (source_confidence
= 0.9) because it is community-curated with editorial review. It provides
ISRC, ISWC, and ISNI identifiers that map directly to the A0-A3 assurance
levels described in the companion paper (Section 3).
Notes
musicbrainzngs is synchronous — all API calls are wrapped in
asyncio.to_thread() to avoid blocking the event loop.
The _RELATION_TYPE_MAP translates MusicBrainz relationship type
strings (e.g., "producer", "lyricist") into the normalised
RelationshipTypeEnum used throughout the attribution pipeline.
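The two mechanisms described above, wrapping a synchronous client call in asyncio.to_thread() and retrying with a 2^attempt backoff, can be sketched roughly as follows. This is an illustrative sketch, not the connector's code: TransientError, call_with_retries, make_flaky_fetch, and the base_delay parameter are hypothetical names, and it assumes max_retries counts retries after the first attempt.

```python
import asyncio
import time


class TransientError(Exception):
    """Hypothetical stand-in for a transient API error."""


async def call_with_retries(func, *args, max_retries=3, base_delay=1.0):
    """Run a blocking callable in a worker thread, retrying on failure.

    Waits base_delay * 2**attempt seconds between attempts.
    """
    for attempt in range(max_retries + 1):
        try:
            # to_thread keeps the event loop free while func blocks.
            return await asyncio.to_thread(func, *args)
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted, surface the error
            await asyncio.sleep(base_delay * 2**attempt)


def make_flaky_fetch(failures):
    """Build a blocking fetch that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def fetch(mbid):
        state["calls"] += 1
        time.sleep(0.01)  # simulate blocking network I/O
        if state["calls"] <= failures:
            raise TransientError("temporary failure")
        return {"id": mbid, "calls": state["calls"]}

    return fetch


# A short base_delay keeps the demonstration fast.
result = asyncio.run(
    call_with_retries(make_flaky_fetch(2), "some-mbid", base_delay=0.01)
)
print(result)
```

The fetch succeeds on the third call, after two backoff sleeps.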
MusicBrainzConnector
ETL connector for the MusicBrainz API.
Provides async fetch_* methods that query the MusicBrainz web
service and transform_* methods that convert raw JSON responses
into NormalizedRecord boundary objects for the downstream entity
resolution pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| user_agent | User-Agent string for API compliance. Must include the application name, version, and contact email (e.g., |
| rate | Maximum requests per second, by default 1.0 (MusicBrainz policy). |
| max_retries | Maximum retry attempts on transient API errors, by default 3. Uses exponential backoff (2^attempt seconds). |

| ATTRIBUTE | DESCRIPTION |
|---|---|
| _rate_limiter | Token-bucket limiter enforcing the per-second request cap. |
Examples:
>>> connector = MusicBrainzConnector("MusicAttribution/0.1.0 (user@example.com)")
>>> record = await connector.fetch_recording("some-mbid")
>>> record.source
<SourceEnum.MUSICBRAINZ: 'musicbrainz'>
Source code in src/music_attribution/etl/musicbrainz.py
fetch_recording
async
fetch_recording(mbid: str) -> NormalizedRecord
Fetch a recording by MusicBrainz ID.
Retrieves the recording with artist credits, ISRCs, releases,
and artist relationships, then transforms the response into a
NormalizedRecord.
| PARAMETER | DESCRIPTION |
|---|---|
| mbid | MusicBrainz recording UUID (e.g., |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised recording with relationships and identifiers. |

| RAISES | DESCRIPTION |
|---|---|
| WebServiceError | If the MusicBrainz API returns an error after all retries. |
Source code in src/music_attribution/etl/musicbrainz.py
fetch_artist
async
fetch_artist(mbid: str) -> NormalizedRecord
Fetch an artist by MusicBrainz ID.
Retrieves the artist profile with aliases, then transforms the
response into a NormalizedRecord with ISNI identifiers and
alternative name variants.
| PARAMETER | DESCRIPTION |
|---|---|
| mbid | MusicBrainz artist UUID. |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised artist with identifiers and alternative names. |

| RAISES | DESCRIPTION |
|---|---|
| WebServiceError | If the MusicBrainz API returns an error after all retries. |
Source code in src/music_attribution/etl/musicbrainz.py
transform_recording
transform_recording(data: dict) -> NormalizedRecord
Transform a MusicBrainz recording response to a NormalizedRecord.
Extracts ISRC identifiers, artist relationships, and structured
metadata (roles, release date, country, duration) from the raw
API response. The source_confidence is set to 0.9
reflecting MusicBrainz's community-curated editorial quality.
| PARAMETER | DESCRIPTION |
|---|---|
| data | Raw MusicBrainz recording dictionary as returned by |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised recording with identifiers, relationships, and source metadata. |
Source code in src/music_attribution/etl/musicbrainz.py
transform_artist
transform_artist(data: dict) -> NormalizedRecord
Transform a MusicBrainz artist response to a NormalizedRecord.
Extracts ISNI identifiers and alias-list name variants. The
source_confidence is set to 0.9 reflecting MusicBrainz's
community-curated editorial quality.
| PARAMETER | DESCRIPTION |
|---|---|
| data | Raw MusicBrainz artist dictionary as returned by |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised artist with identifiers and alternative names. |
Source code in src/music_attribution/etl/musicbrainz.py
Discogs Extractor
discogs
Discogs ETL connector.
Fetches release credits, artist profiles, and label information from the
Discogs API and transforms raw API responses into NormalizedRecord
boundary objects. Handles rate limiting (60 req/min authenticated,
25 req/min unauthenticated) and exponential-backoff retries.
Discogs is particularly valuable for detailed credit information (producer,
engineer, mix, mastering) that is often missing from other sources. The
source_confidence is set to 0.85, slightly below MusicBrainz, because
Discogs data is user-contributed without the same editorial review process.
Notes
python3-discogs-client is synchronous — all API calls are wrapped in
asyncio.to_thread() to avoid blocking the event loop.
The _ROLE_MAP translates Discogs credit role strings (e.g.,
"Producer", "Mixed By") into the normalised
RelationshipTypeEnum. Discogs roles can be comma-separated (e.g.,
"Producer, Engineer"), so each role part is mapped independently.
DiscogsConnector
DiscogsConnector(
user_agent: str,
token: str | None = None,
rate: float | None = None,
max_retries: int = 3,
)
ETL connector for the Discogs API.
Provides async fetch_* methods that query the Discogs web
service and transform_* methods that convert raw JSON responses
into NormalizedRecord boundary objects.
| PARAMETER | DESCRIPTION |
|---|---|
| user_agent | User-Agent string for API compliance. Discogs requires a unique user-agent identifying the application. |
| token | Discogs personal access token. When provided, the rate limit increases from 25 req/min to 60 req/min. |
| rate | Override the requests-per-second cap. If |
| max_retries | Maximum retry attempts on transient errors, by default 3. Uses exponential backoff (2^attempt seconds). |

| ATTRIBUTE | DESCRIPTION |
|---|---|
| _authenticated | Whether a personal access token was provided. |
| _rate_limiter | Token-bucket limiter enforcing the per-second request cap. |
Examples:
>>> connector = DiscogsConnector("MyApp/1.0", token="secret")
>>> records = await connector.fetch_release(12345)
>>> len(records) > 0
True
Source code in src/music_attribution/etl/discogs.py
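The module notes say that Discogs credit roles can be comma-separated and that each part is mapped independently. A minimal sketch of that idea follows. The RelationshipType enum, its members, the ROLE_MAP entries, and map_roles are all hypothetical stand-ins for illustration; the real RelationshipTypeEnum and _ROLE_MAP may differ, and skipping unknown roles is an assumption.

```python
from enum import Enum


class RelationshipType(Enum):
    """Hypothetical stand-in for RelationshipTypeEnum."""

    PRODUCED = "produced"
    ENGINEERED = "engineered"
    MIXED = "mixed"


# Illustrative entries only; keys are lower-cased Discogs role strings.
ROLE_MAP = {
    "producer": RelationshipType.PRODUCED,
    "engineer": RelationshipType.ENGINEERED,
    "mixed by": RelationshipType.MIXED,
}


def map_roles(role_string):
    """Split a comma-separated role string and map each known part."""
    mapped = []
    for part in role_string.split(","):
        key = part.strip().lower()
        if key in ROLE_MAP:  # assumption: unknown roles are skipped
            mapped.append(ROLE_MAP[key])
    return mapped


print(map_roles("Producer, Engineer"))
```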
fetch_release
async
fetch_release(release_id: int) -> list[NormalizedRecord]
Fetch a release by Discogs ID.
Retrieves the release with full credits and tracklist, then
transforms it into one NormalizedRecord per track plus one
for the release itself.
| PARAMETER | DESCRIPTION |
|---|---|
| release_id | Discogs release ID (numeric). |

| RETURNS | DESCRIPTION |
|---|---|
| list[NormalizedRecord] | List containing one release-level record followed by one record per track. |

| RAISES | DESCRIPTION |
|---|---|
| Exception | If the Discogs API returns an error after all retries. |
Source code in src/music_attribution/etl/discogs.py
fetch_artist
async
fetch_artist(artist_id: int) -> NormalizedRecord
Fetch an artist profile by Discogs ID.
Retrieves the artist profile with name variations and transforms
it into a NormalizedRecord.
| PARAMETER | DESCRIPTION |
|---|---|
| artist_id | Discogs artist ID (numeric). |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised artist with alternative name variants. |

| RAISES | DESCRIPTION |
|---|---|
| Exception | If the Discogs API returns an error after all retries. |
Source code in src/music_attribution/etl/discogs.py
search_releases
async
search_releases(query: str) -> list[NormalizedRecord]
Search for releases by a free-text query string.
| PARAMETER | DESCRIPTION |
|---|---|
| query | Search query (e.g., artist name, album title). |

| RETURNS | DESCRIPTION |
|---|---|
| list[NormalizedRecord] | Flattened list of |

| RAISES | DESCRIPTION |
|---|---|
| Exception | If the Discogs API returns an error after all retries. |
Source code in src/music_attribution/etl/discogs.py
transform_release
transform_release(data: dict) -> list[NormalizedRecord]
Transform a Discogs release response into NormalizedRecords.
Creates one NormalizedRecord for the release itself
(EntityTypeEnum.RELEASE) and one per track
(EntityTypeEnum.RECORDING). Release-level credits from
extraartists are attached to the release record, while
track-level credits are attached to each track record.
The source_confidence is set to 0.85 for all Discogs
records.
| PARAMETER | DESCRIPTION |
|---|---|
| data | Raw Discogs release dictionary as returned by the |

| RETURNS | DESCRIPTION |
|---|---|
| list[NormalizedRecord] | Release record followed by per-track recording records. |
Source code in src/music_attribution/etl/discogs.py
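The release-plus-tracks split described for transform_release can be illustrated with a small sketch. It assumes a release dictionary with title, tracklist, and extraartists keys and uses a hypothetical Record dataclass that is far smaller than NormalizedRecord; split_release is an illustrative name, not the connector's method.

```python
from dataclasses import dataclass, field


@dataclass
class Record:
    """Hypothetical, reduced stand-in for NormalizedRecord."""

    entity_type: str
    name: str
    source_confidence: float
    credits: list = field(default_factory=list)


def split_release(data):
    """Return the release record followed by one record per track."""

    def credits(entry):
        # Credits attach to whichever level (release or track) lists them.
        return [(c["name"], c["role"]) for c in entry.get("extraartists", [])]

    records = [Record("release", data["title"], 0.85, credits(data))]
    for track in data.get("tracklist", []):
        records.append(Record("recording", track["title"], 0.85, credits(track)))
    return records


release = {
    "title": "Example Album",
    "extraartists": [{"name": "A. Mastering", "role": "Mastered By"}],
    "tracklist": [
        {
            "title": "Track One",
            "extraartists": [{"name": "B. Producer", "role": "Producer"}],
        },
        {"title": "Track Two"},
    ],
}
records = split_release(release)
print([(r.entity_type, r.name) for r in records])
```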
transform_artist
transform_artist(data: dict) -> NormalizedRecord
Transform a Discogs artist response into a NormalizedRecord.
Extracts the artist's canonical name and all name variations from the raw Discogs response.
| PARAMETER | DESCRIPTION |
|---|---|
| data | Raw Discogs artist dictionary. |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised artist with |
Source code in src/music_attribution/etl/discogs.py
AcoustID Extractor
acoustid
AcoustID fingerprint connector.
Audio fingerprinting via AcoustID / Chromaprint. Given an audio file, generates a Chromaprint fingerprint and looks it up against the AcoustID database to find matching MusicBrainz recording IDs. This answers the question "what recording is this?" — the first step before attribution metadata can be attached.
The source_confidence for AcoustID results is the AcoustID match
score (0.0-1.0), which reflects the fingerprint similarity between the
query audio and the database entry. High scores (> 0.9) indicate
near-certain identification.
Notes
acoustid.fingerprint_file and acoustid.lookup are synchronous —
they are wrapped in asyncio.to_thread() to avoid blocking the event
loop.
AcoustID results link directly to MusicBrainz recording IDs (MBIDs),
enabling seamless cross-referencing with the MusicBrainzConnector
for full metadata enrichment.
AcoustIDConnector
ETL connector for the AcoustID fingerprint service.
Provides async methods for fingerprinting audio files and looking up fingerprints against the AcoustID database to resolve MusicBrainz recording IDs.
| PARAMETER | DESCRIPTION |
|---|---|
| api_key | AcoustID API key (obtain from https://acoustid.org/). |
| rate | Maximum requests per second, by default 3.0 (AcoustID policy). |
| max_retries | Maximum retry attempts on transient API errors, by default 3. Uses exponential backoff (2^attempt seconds). |

| ATTRIBUTE | DESCRIPTION |
|---|---|
| _rate_limiter | Token-bucket limiter enforcing the per-second request cap. |
Examples:
>>> connector = AcoustIDConnector(api_key="your-key")
>>> fp, dur = await connector.fingerprint_file(Path("song.mp3"))
>>> records = await connector.lookup(fp, dur)
>>> records[0].source
<SourceEnum.ACOUSTID: 'acoustid'>
Source code in src/music_attribution/etl/acoustid.py
fingerprint_file
async
Generate a Chromaprint fingerprint from an audio file.
Delegates to acoustid.fingerprint_file in a worker thread
to avoid blocking the event loop.
| PARAMETER | DESCRIPTION |
|---|---|
| file_path | Path to the audio file (MP3, FLAC, WAV, OGG, etc.). |

| RETURNS | DESCRIPTION |
|---|---|
| tuple[str, int] | A 2-tuple of |

| RAISES | DESCRIPTION |
|---|---|
| FingerprintGenerationError | If Chromaprint cannot process the audio file. |
Source code in src/music_attribution/etl/acoustid.py
lookup
async
lookup(
fingerprint: str, duration: int
) -> list[NormalizedRecord]
Look up a fingerprint against the AcoustID database.
Queries AcoustID with the fingerprint and duration, then
transforms matched results into NormalizedRecord objects
with MusicBrainz recording IDs for cross-referencing.
| PARAMETER | DESCRIPTION |
|---|---|
| fingerprint | Chromaprint fingerprint string generated by |
| duration | Audio duration in seconds. |

| RETURNS | DESCRIPTION |
|---|---|
| list[NormalizedRecord] | Matching recordings sorted by confidence score (descending). Empty list if no matches are found. |

| RAISES | DESCRIPTION |
|---|---|
| WebServiceError | If the AcoustID API returns an error after all retries. |
Source code in src/music_attribution/etl/acoustid.py
transform_lookup_results
transform_lookup_results(
response: dict,
) -> list[NormalizedRecord]
Transform an AcoustID lookup response into NormalizedRecords.
Each AcoustID result may contain multiple MusicBrainz recording
matches. Every recording is emitted as a separate
NormalizedRecord with:
- source_confidence set to the AcoustID match score
- identifiers.mbid linking to the MusicBrainz recording
- identifiers.acoustid linking to the AcoustID entry
| PARAMETER | DESCRIPTION |
|---|---|
| response | Raw AcoustID API response containing a |

| RETURNS | DESCRIPTION |
|---|---|
| list[NormalizedRecord] | Normalised recordings sorted by |
Source code in src/music_attribution/etl/acoustid.py
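The flattening performed by transform_lookup_results, one output record per MusicBrainz recording inside each AcoustID result, might look like the sketch below. It assumes a response shaped as a results list whose entries carry id, score, and recordings; it emits plain dictionaries rather than NormalizedRecord objects, and flatten_results is a hypothetical name. Sorting by score in descending order follows the behaviour documented for lookup.

```python
def flatten_results(response):
    """Emit one dict per (AcoustID result, MusicBrainz recording) pair."""
    records = []
    for result in response.get("results", []):
        for recording in result.get("recordings", []):
            records.append(
                {
                    "source_confidence": result["score"],
                    "identifiers": {
                        "mbid": recording["id"],
                        "acoustid": result["id"],
                    },
                }
            )
    # Highest-scoring matches first; ties keep their original order.
    records.sort(key=lambda r: r["source_confidence"], reverse=True)
    return records


response = {
    "results": [
        {"id": "a1", "score": 0.6, "recordings": [{"id": "m1"}]},
        {"id": "a2", "score": 0.95, "recordings": [{"id": "m2"}, {"id": "m3"}]},
        {"id": "a3", "score": 0.4},  # a result with no linked recordings
    ]
}
flat = flatten_results(response)
print([r["identifiers"]["mbid"] for r in flat])
```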
File Metadata Extractor
file_metadata
File metadata reader using tinytag.
Reads metadata (title, artist, album, duration, year) from audio files
using tinytag (MIT-licensed, pure Python, zero dependencies) and produces
NormalizedRecord objects for the ETL pipeline.
This is the lowest-confidence data source (source_confidence = 0.7)
because embedded file metadata is often incomplete, inconsistent, or
user-edited. It serves as a baseline that is enriched by
higher-confidence sources (MusicBrainz, Discogs, AcoustID).
Notes
tinytag provides normalised high-level attributes only. Low-level ID3 frames (TIPL, TMCL, TSRC) are not available through tinytag:
- ISRC: Not exposed by tinytag. Use AcoustID fingerprinting or MusicBrainz lookup to obtain ISRCs.
- Credits (TIPL/TMCL): Not available. Detailed credits come from MusicBrainz and Discogs APIs instead.
Supported formats: MP3, M4A, WAV, OGG, FLAC, WMA, AIFF.
FileMetadataReader
Reads audio file metadata and produces NormalizedRecords.
A stateless reader that extracts embedded metadata from audio files using tinytag. When metadata cannot be read (corrupted file, DRM, unsupported codec), a minimal fallback record is produced using only the filename.
Supported formats: MP3, M4A, WAV, OGG, FLAC, WMA, AIFF.
Examples:
>>> reader = FileMetadataReader()
>>> record = reader.read(Path("hide_and_seek.mp3"))
>>> record.canonical_name
'Hide and Seek'
>>> record.source
<SourceEnum.FILE_METADATA: 'file_metadata'>
read
read(file_path: Path) -> NormalizedRecord
Read metadata from an audio file.
Extracts title, artist, album, year, and duration from the file's embedded tags. Falls back to a minimal record (using the filename stem as title) if tags cannot be read.
The source_confidence is set to 0.7 for successfully read
files and 0.3 for fallback records.
| PARAMETER | DESCRIPTION |
|---|---|
| file_path | Path to the audio file. |

| RETURNS | DESCRIPTION |
|---|---|
| NormalizedRecord | Normalised recording with extracted metadata. The |
Source code in src/music_attribution/etl/file_metadata.py
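The fallback behaviour of read, 0.7 confidence when tags are readable and 0.3 with the filename stem as title otherwise, can be sketched without tinytag by injecting the tag reader. Everything here is illustrative: read_with_fallback, good_reader, and broken_reader are hypothetical, and the returned dictionary is a reduced stand-in for NormalizedRecord.

```python
from pathlib import Path


def read_with_fallback(file_path, tag_reader):
    """Return a small record dict; fall back to the filename stem on failure.

    tag_reader is any callable that takes a path and returns a dict of tags,
    raising an exception when the file cannot be parsed.
    """
    try:
        tags = tag_reader(file_path)
        return {"canonical_name": tags["title"], "source_confidence": 0.7}
    except Exception:
        # Unreadable tags: keep only what the filename tells us.
        return {"canonical_name": file_path.stem, "source_confidence": 0.3}


def good_reader(path):
    """Pretend the embedded tags were read successfully."""
    return {"title": "Hide and Seek"}


def broken_reader(path):
    """Pretend the file is corrupted or uses an unsupported codec."""
    raise ValueError("unreadable tags")


ok = read_with_fallback(Path("hide_and_seek.mp3"), good_reader)
fallback = read_with_fallback(Path("hide_and_seek.mp3"), broken_reader)
print(ok, fallback)
```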
Quality Gate
quality_gate
Data quality gate for NormalizedRecord batches.
Validates batches of NormalizedRecord objects before they are passed
to the Entity Resolution pipeline. Checks statistical properties beyond
what Pydantic field validators can cover, including:
- Identifier coverage: what fraction of records carry at least one standard identifier (ISRC, MBID, ISNI, etc.)
- Duplicate detection: the same (source, source_id) appearing more than once in the batch
- Source distribution: whether the batch is dangerously skewed toward a single data source
These checks form a quality firewall between the ETL layer and the downstream entity resolution layer, preventing garbage-in/garbage-out propagation through the attribution pipeline.
Notes
The gate operates in two modes:
- Reporting (validate_batch): produces a QualityReport without modifying the input.
- Enforcement (enforce): raises ValueError on critical failures and removes duplicates on success.
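The three batch statistics this module checks (identifier coverage, duplicates, source distribution) reduce to a few counts. The sketch below computes them over plain dictionaries with source, source_id, and identifiers keys; batch_metrics and the dictionary layout are assumptions for illustration, not the gate's actual data model.

```python
from collections import Counter


def batch_metrics(records):
    """Compute coverage, duplicate count, and largest source share."""
    total = len(records)
    if total == 0:
        return {
            "identifier_coverage": 0.0,
            "duplicates": 0,
            "max_source_fraction": 0.0,
        }
    with_ids = sum(1 for r in records if r["identifiers"])
    keys = Counter((r["source"], r["source_id"]) for r in records)
    sources = Counter(r["source"] for r in records)
    return {
        "identifier_coverage": with_ids / total,
        # Every occurrence beyond the first of a key counts as a duplicate.
        "duplicates": sum(n - 1 for n in keys.values()),
        "max_source_fraction": max(sources.values()) / total,
    }


batch = [
    {"source": "musicbrainz", "source_id": "1", "identifiers": {"isrc": "X"}},
    {"source": "musicbrainz", "source_id": "1", "identifiers": {"isrc": "X"}},
    {"source": "discogs", "source_id": "7", "identifiers": {}},
    {"source": "musicbrainz", "source_id": "2", "identifiers": {"mbid": "Y"}},
]
metrics = batch_metrics(batch)
print(metrics)
```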
QualityCheckResult
Bases: BaseModel
Result of a single quality check.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| check_name | Machine-readable name of the check (e.g., |
| status | Outcome of the check. |
| message | Human-readable description of the result. |
| metric_value | Numeric metric associated with the check (e.g., coverage fraction), or |
QualityReport
Bases: BaseModel
Aggregate quality report for a batch of NormalizedRecords.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| batch_id | Unique identifier for this quality report. |
| checks | Individual check results. |
| overall_status | Worst status across all checks ( |
| records_in | Number of records in the input batch. |
| records_passed | Number of unique (non-duplicate) records. |
| timestamp | UTC timestamp when the report was generated. |
DataQualityGate
Validates batches of NormalizedRecords before entity resolution.
Runs a configurable series of quality checks and produces a
QualityReport. The gate can operate in report-only mode
(validate_batch) or enforcement mode (enforce) which
raises on critical failures.
| PARAMETER | DESCRIPTION |
|---|---|
| min_identifier_coverage | Minimum fraction of records that must have at least one standard identifier (ISRC, MBID, etc.), by default 0.5. Below this threshold the check emits a warning; zero coverage is a hard failure. |
| max_single_source_fraction | Maximum fraction of records allowed from a single data source, by default 0.95. If 100% of records come from one source, a warning is emitted (not a failure, since single-source batches are valid for targeted fetches). |
Examples:
>>> gate = DataQualityGate(min_identifier_coverage=0.6)
>>> report = gate.validate_batch(records)
>>> report.overall_status
'pass'
Source code in src/music_attribution/etl/quality_gate.py
validate_batch
validate_batch(
records: list[NormalizedRecord],
) -> QualityReport
Validate a batch of NormalizedRecords (report-only mode).
Runs all quality checks and returns a QualityReport without
modifying the input batch.
| PARAMETER | DESCRIPTION |
|---|---|
| records | Batch of NormalizedRecords to validate. |

| RETURNS | DESCRIPTION |
|---|---|
| QualityReport | Report containing individual check results, overall status, and record counts. |
Source code in src/music_attribution/etl/quality_gate.py
enforce
enforce(
records: list[NormalizedRecord],
) -> list[NormalizedRecord]
Validate and filter a batch, raising on critical failures.
First runs validate_batch(). If the overall status is
"fail", raises ValueError with details of the failing
checks. Otherwise, returns the deduplicated record list.
| PARAMETER | DESCRIPTION |
|---|---|
| records | Batch of NormalizedRecords to validate and filter. |

| RETURNS | DESCRIPTION |
|---|---|
| list[NormalizedRecord] | Validated records with duplicates (same |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If any quality check has status |
Source code in src/music_attribution/etl/quality_gate.py
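The enforcement path described for enforce, raising on a failed report and otherwise returning the deduplicated batch, can be sketched as below. The function name enforce_sketch, its overall_status and failing_checks parameters, and the dictionary records are hypothetical; keeping the first occurrence of each (source, source_id) key is an assumption, since the source does not say which copy survives.

```python
def enforce_sketch(records, overall_status, failing_checks=()):
    """Raise on a failed report, otherwise drop repeated keys."""
    if overall_status == "fail":
        raise ValueError(f"Quality gate failed: {', '.join(failing_checks)}")
    seen = set()
    unique = []
    for record in records:
        key = (record["source"], record["source_id"])
        if key not in seen:  # assumption: the first occurrence wins
            seen.add(key)
            unique.append(record)
    return unique


batch = [
    {"source": "musicbrainz", "source_id": "1"},
    {"source": "discogs", "source_id": "1"},
    {"source": "musicbrainz", "source_id": "1"},
]
deduplicated = enforce_sketch(batch, "pass")
print(len(deduplicated))
```

A "warn" status passes through the same deduplication path in this sketch; only "fail" raises.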
Rate Limiter
rate_limiter
Token-bucket rate limiter for external API compliance.
Provides an async-safe TokenBucketRateLimiter that enforces per-source
request rate limits to avoid API bans. Used by all ETL connectors
(MusicBrainz, Discogs, AcoustID) to throttle outbound requests.
The token-bucket algorithm allows controlled bursts (up to capacity
requests) while maintaining a steady-state rate of rate requests per
second. Each acquire() call blocks until a token is available.
Notes
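The implementation uses time.monotonic() for timing, so it is unaffected
by system clock adjustments, and an asyncio.Lock() to serialise access
between coroutines on the event loop. asyncio.Lock is not thread-safe, so
a limiter instance should be used from a single event loop.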
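A token bucket of the kind described above can be sketched in a few lines. This is an illustrative sketch under stated assumptions, not the module's source: SketchTokenBucket and demo are hypothetical names, and the refill arithmetic is one common way to implement the algorithm. The rate of 50 tokens per second only keeps the demonstration short.

```python
import asyncio
import time


class SketchTokenBucket:
    """Minimal async token bucket (hypothetical, for illustration)."""

    def __init__(self, rate=1.0, capacity=1):
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        # Add tokens for the elapsed time, capped at the bucket capacity.
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now

    async def acquire(self):
        # Holding the lock while sleeping serialises waiting coroutines.
        async with self._lock:
            self._refill()
            while self._tokens < 1.0:
                # Sleep just long enough for the missing fraction of a token.
                await asyncio.sleep((1.0 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1.0
            return True


async def demo():
    limiter = SketchTokenBucket(rate=50.0, capacity=1)
    start = time.monotonic()
    results = [await limiter.acquire() for _ in range(3)]
    return results, time.monotonic() - start


results, elapsed = asyncio.run(demo())
print(results, round(elapsed, 3))
```

The first acquire returns immediately because the bucket starts full; the second and third each wait roughly 1/rate seconds.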
TokenBucketRateLimiter
Async token-bucket rate limiter.
Enforces a maximum request rate to comply with external API rate limits. Common configurations:
- MusicBrainz: rate=1.0, capacity=1 (1 req/s)
- Discogs authenticated: rate=1.0, capacity=1 (60 req/min)
- Discogs unauthenticated: rate=0.42, capacity=1 (25 req/min)
- AcoustID: rate=3.0, capacity=3 (3 req/s)
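The Discogs limits are quoted per minute while rate is expressed per second, so the listed values follow from dividing by 60. A small worked check, using a hypothetical per_second helper:

```python
def per_second(requests_per_minute):
    """Convert a per-minute API limit into a per-second token rate."""
    return requests_per_minute / 60.0


authenticated = round(per_second(60), 2)    # 1.0
unauthenticated = round(per_second(25), 2)  # 0.42
print(authenticated, unauthenticated)
```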
| PARAMETER | DESCRIPTION |
|---|---|
| rate | Tokens added per second (steady-state request rate), by default 1.0. |
| capacity | Maximum tokens in the bucket (burst size), by default 1. |

| ATTRIBUTE | DESCRIPTION |
|---|---|
| _tokens | Current number of available tokens. |
| _last_refill | Monotonic timestamp of the last token refill. |
Examples:
>>> limiter = TokenBucketRateLimiter(rate=1.0, capacity=1)
>>> await limiter.acquire() # returns immediately if bucket is full
True
Source code in src/music_attribution/etl/rate_limiter.py
acquire
async
Acquire a token, waiting if the bucket is empty.
If no token is immediately available, calculates the minimum
wait time for the next refill and sleeps asynchronously. The
method is serialised via an asyncio.Lock to prevent race
conditions between concurrent coroutines.
| RETURNS | DESCRIPTION |
|---|---|
| bool | Always |