Chat Agent

PydanticAI agent with four tools, streamed via the AG-UI protocol.

Agent

agent

PydanticAI attribution agent with domain tools.

Provides four domain tools for CopilotKit frontend interaction via the AG-UI (Agent-User Interaction) protocol. The agent acts as a conversational interface to the attribution scaffold, enabling artists, managers, and musicologists to query, review, and improve attribution records through natural language.

Tools

  • explain_confidence -- Explain why a work has a given confidence score by decomposing source agreement, corroborating sources, and assurance level.
  • search_attributions -- Search across attribution records by title, artist, or keyword using the hybrid search service (text + vector + graph).
  • suggest_correction -- Propose a correction to a specific field on an attribution record, generating a CorrectionPreview for frontend review.
  • submit_feedback -- Submit a structured FeedbackCard for an attribution record, including overall assessment and optional free-text notes.

Notes

The agent uses PydanticAI's Agent class with dependency injection (AgentDeps) for database access. The system prompt encodes domain knowledge about A0-A3 assurance levels, the Oracle Problem, and conformal prediction (see Teikari 2026, Sections 3-5).

The model identifier is loaded lazily from Settings.attribution_agent_model so that import-time side effects are avoided and tests can mock the model.
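
The lazy lookup described above can be sketched as follows. This is a minimal illustration, not the project's code: the `Settings` dataclass, the `get_settings` helper, and the `"test-model"` placeholder are assumptions standing in for the real settings object; only the `attribution_agent_model` attribute name comes from the text.

```python
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class Settings:
    """Hypothetical stand-in for the application settings object."""

    # Placeholder value for illustration, not the project's real default.
    attribution_agent_model: str = "test-model"


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    # Constructed on the first call only, so importing this module
    # never reads configuration or environment variables.
    return Settings()


def get_agent_model() -> str:
    """Resolve the model identifier at call time rather than import time."""
    return get_settings().attribution_agent_model
```

Because the lookup happens inside a function, a test can patch `get_agent_model` (or `get_settings`) before the agent is created, which is the property the note above relies on.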

See Also

music_attribution.chat.state : Shared agent/frontend state model.
music_attribution.chat.agui_endpoint : AG-UI SSE streaming endpoint.

AgentDeps dataclass

AgentDeps(
    state: AttributionAgentState,
    session_factory: async_sessionmaker[AsyncSession] | None = None,
)

Dependencies injected into PydanticAI agent tools at runtime.

PydanticAI's dependency injection system passes this object to every tool call via ctx.deps. Tools use the session factory for async database access and the state object for AG-UI state synchronisation with the CopilotKit frontend.

ATTRIBUTE DESCRIPTION
state

Shared mutable state that is serialised as AG-UI StateSnapshot events and consumed by CopilotKit useCopilotReadable hooks.

TYPE: AttributionAgentState

session_factory

SQLAlchemy async session factory bound to the PostgreSQL engine. None when running without a database (e.g. unit tests), in which case tools return a graceful fallback message.

TYPE: async_sessionmaker[AsyncSession] | None
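
The dependency pattern described for AgentDeps can be illustrated without PydanticAI or a database. In this sketch `DemoState`, `DemoDeps`, `DemoContext`, and `lookup_tool` are hypothetical stand-ins (for AttributionAgentState, AgentDeps, RunContext, and a tool function respectively); it only shows how a tool reads `ctx.deps` and falls back gracefully when `session_factory` is None.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class DemoState:
    """Stand-in for AttributionAgentState (one field shown)."""

    current_work_id: Optional[str] = None


@dataclass
class DemoDeps:
    """Same shape as AgentDeps: shared state plus an optional session factory."""

    state: DemoState = field(default_factory=DemoState)
    session_factory: Optional[Callable[[], Any]] = None


@dataclass
class DemoContext:
    """Stand-in for the run context; tools reach dependencies via ctx.deps."""

    deps: DemoDeps


async def lookup_tool(ctx: DemoContext, work_id: str) -> str:
    # Without a database, return a readable message instead of raising.
    if ctx.deps.session_factory is None:
        return "Database not available. Cannot look up attribution records."
    # With a database, record which work is under discussion.
    ctx.deps.state.current_work_id = work_id
    return f"Looked up {work_id}"


no_db = DemoContext(deps=DemoDeps())
message = asyncio.run(lookup_tool(no_db, "abc"))
```

The fallback branch leaves the shared state untouched, so a unit test without a database sees no side effects on the frontend-facing state.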

ExplainConfidenceResult

Bases: BaseModel

Structured result of the explain_confidence tool.

Captures both the numeric score and the human-readable explanation with contributing factors, enabling the frontend to render a rich confidence breakdown panel.

ATTRIBUTE DESCRIPTION
work_id

Attribution record identifier that was explained.

TYPE: str

confidence_score

Overall confidence score (0.0--1.0).

TYPE: float

explanation

Human-readable explanation combining all factors.

TYPE: str

factors

Individual contributing factors (source agreement, number of sources, assurance level) as separate strings.

TYPE: list[str]
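
As a rough sketch of how the factors list and explanation string might be assembled, the function below follows the three factors named above. The two threshold values are assumptions chosen for illustration (the real AGREEMENT_HIGH_THRESHOLD and AGREEMENT_MODERATE_THRESHOLD values are not shown on this page), and the source names in the usage lines are made-up examples.

```python
AGREEMENT_HIGH = 0.8      # assumed value, for illustration only
AGREEMENT_MODERATE = 0.5  # assumed value, for illustration only


def build_factors(agreement: float, sources: list[str], assurance: str) -> list[str]:
    """Return one human-readable string per contributing factor."""
    if agreement > AGREEMENT_HIGH:
        label = "High"
    elif agreement > AGREEMENT_MODERATE:
        label = "Moderate"
    else:
        label = "Low"
    source_list = ", ".join(sources) if sources else "none"
    return [
        f"{label} source agreement ({agreement:.0%})",
        f"Data from {len(sources)} source(s): {source_list}",
        f"Assurance level: {assurance}",
    ]


# Hypothetical inputs: two example sources and an A2 assurance level.
factors = build_factors(0.9, ["musicbrainz", "discogs"], "A2")
explanation = "Confidence score: 85%. " + " ".join(factors)
```

Keeping the factors as separate strings lets a frontend render them as individual rows while the joined explanation serves as the chat reply.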

SearchResult

Bases: BaseModel

A single attribution search result returned by the agent.

Provides the minimal fields needed for the frontend to render a search result row: title, artist, confidence, and assurance.

ATTRIBUTE DESCRIPTION
attribution_id

UUID of the matching attribution record.

TYPE: str

work_title

Display title of the musical work.

TYPE: str

artist_name

Primary artist name.

TYPE: str

confidence_score

Overall confidence score (0.0--1.0).

TYPE: float

assurance_level

A0--A3 assurance level string (e.g. "A3").

TYPE: str

SearchResultSet

Bases: BaseModel

Aggregated result set from the search_attributions tool.

Wraps the query, matching results, and total count for pagination.

ATTRIBUTE DESCRIPTION
query

The original search query string.

TYPE: str

results

Matching attribution records (up to the limit).

TYPE: list[SearchResult]

total_count

Total number of matches (may exceed len(results)).

TYPE: int

SuggestCorrectionResult

Bases: BaseModel

Result of the suggest_correction tool.

Pairs the target work with a CorrectionPreview that the frontend renders as a diff (current value vs. suggested value) for user approval before submission.

ATTRIBUTE DESCRIPTION
work_id

Attribution record identifier to correct.

TYPE: str

correction

Structured preview of the proposed change.

TYPE: CorrectionPreview
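
To make the before/after diff concrete, here is a small sketch of how a preview could be turned into diff-style text. `PreviewSketch` is a plain dataclass standing in for CorrectionPreview (which is a Pydantic model in the source), `render_diff` is a hypothetical helper, and the example values are invented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PreviewSketch:
    """Stand-in for CorrectionPreview with the same field names."""

    field: str
    current_value: str
    suggested_value: str
    reason: str
    confidence_in_correction: float

    def __post_init__(self) -> None:
        # Mirrors the documented [0.0, 1.0] constraint on the confidence.
        if not 0.0 <= self.confidence_in_correction <= 1.0:
            raise ValueError("confidence_in_correction must be within [0.0, 1.0]")


def render_diff(preview: PreviewSketch) -> str:
    """Format the preview as a two-line diff followed by the reasoning."""
    return (
        f"- {preview.field}: {preview.current_value}\n"
        f"+ {preview.field}: {preview.suggested_value}\n"
        f"  reason: {preview.reason} "
        f"({preview.confidence_in_correction:.0%} confident)"
    )


preview = PreviewSketch(
    field="role_detail",
    current_value="producer",
    suggested_value="co-producer",
    reason="Liner notes list two producers",
    confidence_in_correction=0.8,
)
diff_text = render_diff(preview)
```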

SubmitFeedbackResult

Bases: BaseModel

Result of the submit_feedback tool.

Returned after a FeedbackCard is persisted, confirming the submission and indicating whether center-bias was detected.

ATTRIBUTE DESCRIPTION
feedback_id

UUID of the created FeedbackCard.

TYPE: str

attribution_id

UUID of the attribution record the feedback targets.

TYPE: str

accepted

Whether the feedback was successfully stored.

TYPE: bool

message

Human-readable confirmation message (may include bias warning).

TYPE: str
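
The center-bias check can be sketched in a few lines. The 0.45 and 0.55 bounds are taken from the submit_feedback docstring; the `assess` helper and the shortened warning text are illustrative assumptions rather than the project's implementation.

```python
CENTER_BIAS_LOW = 0.45   # lower bound stated in the submit_feedback docstring
CENTER_BIAS_HIGH = 0.55  # upper bound stated in the submit_feedback docstring


def assess(overall_assessment: float) -> tuple[bool, str]:
    """Return the bias flag and a message line for the confirmation text."""
    flagged = CENTER_BIAS_LOW <= overall_assessment <= CENTER_BIAS_HIGH
    warning = " (center bias detected)" if flagged else ""
    return flagged, f"Assessment: {overall_assessment:.0%}{warning}"


flag_mid, line_mid = assess(0.5)    # falls inside the band
flag_high, line_high = assess(0.9)  # decisive rating, no warning
```

Both bounds are inclusive, so a rating of exactly 0.45 or 0.55 is also flagged.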

create_attribution_agent

create_attribution_agent() -> Agent[AgentDeps, str]

Create and configure the PydanticAI attribution agent.

Instantiates a PydanticAI Agent with a domain-specific system prompt and registers four tool functions (explain_confidence, search_attributions, suggest_correction, submit_feedback).

The agent model identifier is resolved lazily from application settings via _get_agent_model(). The agent is configured with retries=2 for transient LLM failures.

RETURNS DESCRIPTION
Agent[AgentDeps, str]

Configured PydanticAI Agent with four domain tools, typed to accept AgentDeps and return str responses.

Notes

Tool functions are registered using PydanticAI's @agent.tool decorator, which provides automatic parameter validation and dependency injection via ctx.deps.

Each tool updates ctx.deps.state to synchronise AG-UI state with the CopilotKit frontend (e.g. setting current_work_id after explaining a confidence score).

Source code in src/music_attribution/chat/agent.py
def create_attribution_agent() -> Agent[AgentDeps, str]:
    """Create and configure the PydanticAI attribution agent.

    Instantiates a PydanticAI ``Agent`` with a domain-specific system
    prompt and registers four tool functions (``explain_confidence``,
    ``search_attributions``, ``suggest_correction``, ``submit_feedback``).

    The agent model identifier is resolved lazily from application
    settings via ``_get_agent_model()``. The agent is configured with
    ``retries=2`` for transient LLM failures.

    Returns
    -------
    Agent[AgentDeps, str]
        Configured PydanticAI Agent with four domain tools, typed to
        accept ``AgentDeps`` and return ``str`` responses.

    Notes
    -----
    Tool functions are registered using PydanticAI's ``@agent.tool``
    decorator, which provides automatic parameter validation and
    dependency injection via ``ctx.deps``.

    Each tool updates ``ctx.deps.state`` to synchronise AG-UI state
    with the CopilotKit frontend (e.g. setting ``current_work_id``
    after explaining a confidence score).
    """
    agent = Agent(
        _get_agent_model(),
        system_prompt=SYSTEM_PROMPT,
        deps_type=AgentDeps,
        retries=2,
    )

    @agent.tool
    async def explain_confidence(ctx, work_id: str) -> str:
        """Explain the confidence score for a specific attribution record.

        Fetches the full ``AttributionRecord`` from the database and
        decomposes the confidence score into contributing factors:
        source agreement level, number of corroborating data sources,
        and the A0--A3 assurance level.

        Updates ``ctx.deps.state`` with the current work context so the
        CopilotKit frontend can highlight the relevant record.

        Parameters
        ----------
        ctx : RunContext[AgentDeps]
            PydanticAI run context providing access to ``AgentDeps``.
        work_id : str
            UUID string of the attribution record to explain.

        Returns
        -------
        str
            Human-readable explanation combining all confidence factors,
            or an error message if the record is not found.
        """
        import uuid as _uuid

        from music_attribution.attribution.persistence import AsyncAttributionRepository

        if not ctx.deps.session_factory:
            return "Database not available. Cannot look up attribution records."

        repo = AsyncAttributionRepository()
        async with ctx.deps.session_factory() as session:
            record = await repo.find_by_id(_uuid.UUID(work_id), session)
        if not record:
            return f"No attribution record found for ID: {work_id}"

        score = record.confidence_score
        sources: list[str] = []
        if record.credits:
            sources = [s.value for s in record.credits[0].sources]
        assurance = record.assurance_level.value
        agreement = record.source_agreement

        factors = []
        if agreement > AGREEMENT_HIGH_THRESHOLD:
            factors.append(f"High source agreement ({agreement:.0%})")
        elif agreement > AGREEMENT_MODERATE_THRESHOLD:
            factors.append(f"Moderate source agreement ({agreement:.0%})")
        else:
            factors.append(f"Low source agreement ({agreement:.0%})")

        factors.append(f"Data from {len(sources)} source(s): {', '.join(sources) if sources else 'none'}")
        factors.append(f"Assurance level: {assurance}")

        explanation = f"Confidence score: {score:.0%}. " + " ".join(factors)

        ctx.deps.state.current_work_id = work_id
        ctx.deps.state.confidence_score = score
        ctx.deps.state.explanation_text = explanation

        return explanation

    @agent.tool
    async def search_attributions(ctx, query: str) -> str:
        """Search attribution records by title, artist, or keyword.

        Delegates to ``HybridSearchService`` which fuses text search,
        vector similarity, and graph context via Reciprocal Rank Fusion
        (RRF). Results are formatted as a bullet list with title,
        artist, confidence, and assurance level.

        Updates ``ctx.deps.state`` with the search query and result
        count for the CopilotKit frontend to display.

        Parameters
        ----------
        ctx : RunContext[AgentDeps]
            PydanticAI run context providing access to ``AgentDeps``.
        query : str
            Free-text search query (title, artist name, or keyword).

        Returns
        -------
        str
            Formatted search results as a bullet list, or a "not found"
            message if no records match.
        """
        results: list[str] = []

        if not ctx.deps.session_factory:
            return "Database not available. Cannot search attribution records."

        from music_attribution.search.hybrid_search import HybridSearchService

        svc = HybridSearchService()
        async with ctx.deps.session_factory() as session:
            hits = await svc.search(query, session=session, limit=10)
        for hit in hits:
            rec = hit.record
            results.append(
                f"- {rec.work_title} by {rec.artist_name} "
                f"(confidence: {rec.confidence_score:.0%}, "
                f"assurance: {rec.assurance_level.value})"
            )

        ctx.deps.state.last_search_query = query
        ctx.deps.state.last_search_count = len(results)

        if not results:
            return f"No attributions found for '{query}'."
        return f"Found {len(results)} result(s):\n" + "\n".join(results)

    @agent.tool
    async def suggest_correction(
        ctx,
        work_id: str,
        field: str,
        current_value: str,
        suggested_value: str,
        reason: str,
    ) -> str:
        """Suggest a correction to an attribution record field.

        Creates a ``CorrectionPreview`` and stores it in
        ``ctx.deps.state.pending_correction`` so the CopilotKit
        frontend can render a before/after diff for user approval.
        The correction is not applied until the user confirms via
        ``submit_feedback``.

        Parameters
        ----------
        ctx : RunContext[AgentDeps]
            PydanticAI run context providing access to ``AgentDeps``.
        work_id : str
            UUID string of the attribution record to correct.
        field : str
            Name of the field being corrected (e.g. ``"role_detail"``).
        current_value : str
            Current value of the field in the attribution record.
        suggested_value : str
            Proposed replacement value.
        reason : str
            Justification for the correction.

        Returns
        -------
        str
            Confirmation message with a human-readable correction
            preview and instructions to finalise via ``submit_feedback``.
        """
        preview = CorrectionPreview(
            field=field,
            current_value=current_value,
            suggested_value=suggested_value,
            reason=reason,
            confidence_in_correction=0.8,
        )

        ctx.deps.state.pending_correction = preview
        ctx.deps.state.current_work_id = work_id

        return (
            f"Correction suggested for {work_id}:\n"
            f"  {field}: '{current_value}' → '{suggested_value}'\n"
            f"  Reason: {reason}\n"
            "Use submit_feedback to finalize."
        )

    @agent.tool
    async def submit_feedback(
        ctx,
        work_id: str,
        overall_assessment: float,
        free_text: str | None = None,
    ) -> str:
        """Submit structured feedback for an attribution record.

        Creates and persists a ``FeedbackCard`` (BO-4 boundary object)
        via ``AsyncFeedbackRepository``. Detects center-bias when the
        assessment falls in the 0.45--0.55 range and includes a warning
        in the response (see Teikari 2026, Section 5 on calibration).

        Clears ``ctx.deps.state.pending_correction`` after successful
        submission.

        Parameters
        ----------
        ctx : RunContext[AgentDeps]
            PydanticAI run context providing access to ``AgentDeps``.
        work_id : str
            UUID string of the attribution record to submit feedback for.
        overall_assessment : float
            Overall quality assessment on a 0.0--1.0 scale. Values in
            the 0.45--0.55 range trigger a center-bias warning.
        free_text : str | None, optional
            Optional free-text notes from the reviewer.

        Returns
        -------
        str
            Confirmation message including feedback ID, assessment
            value, and any bias warning.
        """
        import uuid as _uuid

        center_bias = CENTER_BIAS_LOW <= overall_assessment <= CENTER_BIAS_HIGH
        bias_warning = ""
        if center_bias:
            bias_warning = " (center bias detected — consider whether you can be more decisive)"

        if not ctx.deps.session_factory:
            return "Database not available. Cannot submit feedback."

        from datetime import UTC, datetime

        from music_attribution.feedback.persistence import AsyncFeedbackRepository
        from music_attribution.schemas.enums import EvidenceTypeEnum, ReviewerRoleEnum
        from music_attribution.schemas.feedback import FeedbackCard

        card = FeedbackCard(
            feedback_id=_uuid.uuid4(),
            attribution_id=_uuid.UUID(work_id),
            reviewer_id="agent-assisted",
            reviewer_role=ReviewerRoleEnum.MUSICOLOGIST,
            attribution_version=1,
            corrections=[],
            overall_assessment=overall_assessment,
            center_bias_flag=center_bias,
            free_text=free_text,
            evidence_type=EvidenceTypeEnum.OTHER,
            submitted_at=datetime.now(UTC),
        )
        repo = AsyncFeedbackRepository()
        async with ctx.deps.session_factory() as session:
            await repo.store(card, session)
            await session.commit()
        feedback_id = str(card.feedback_id)

        ctx.deps.state.pending_correction = None

        return (
            f"Feedback submitted (ID: {feedback_id}):\n"
            f"  Attribution: {work_id}\n"
            f"  Assessment: {overall_assessment:.0%}{bias_warning}\n"
            + (f"  Notes: {free_text}\n" if free_text else "")
            + "Thank you for your feedback."
        )

    return agent
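
The search_attributions docstring above mentions Reciprocal Rank Fusion (RRF). As a generic illustration of that technique (not the HybridSearchService implementation, whose internals are not shown here), each ranked list contributes 1 / (k + rank) to an item's score, and items are sorted by the summed score. The constant k = 60 is the value commonly used in the RRF literature; the function name and sample IDs are assumptions.

```python
from collections import defaultdict


def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60
) -> list[tuple[str, float]]:
    """Fuse several ranked ID lists into one list sorted by RRF score."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        # Ranks are 1-based: the top hit of each list contributes 1 / (k + 1).
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest score first; ties broken by ID for a deterministic order.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


# Hypothetical result lists from the three retrieval signals.
text_hits = ["a", "b", "c"]
vector_hits = ["b", "a", "d"]
graph_hits = ["b", "c"]
fused = reciprocal_rank_fusion([text_hits, vector_hits, graph_hits])
fused_ids = [doc_id for doc_id, _ in fused]
```

An item that appears near the top of several lists (here `b`) outranks one that tops a single list, which is why RRF suits combining signals whose raw scores are not comparable.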

Chat State

state

Shared state model for the attribution chat agent.

Defines the Pydantic models that represent the mutable conversation state shared between the PydanticAI agent (backend) and CopilotKit (frontend) via the AG-UI protocol. State changes are serialised as StateSnapshot and StateDelta Server-Sent Events.

The frontend observes this state through CopilotKit's useCopilotReadable hooks, enabling real-time UI updates when the agent sets current_work_id, confidence_score, or pending_correction.

CLASS DESCRIPTION
AttributionAgentState

Main state container with fields for the current work context, search state, and pending corrections.

CorrectionPreview

Preview of a proposed field correction awaiting user approval.

See Also

music_attribution.chat.agent : Agent that mutates this state.
music_attribution.chat.agui_endpoint : SSE endpoint that serialises this state.

AttributionAgentState

Bases: BaseModel

Shared state between the PydanticAI agent and CopilotKit frontend.

Every field in this model is serialised as an AG-UI StateSnapshot event after each agent turn. CopilotKit useCopilotReadable hooks on the frontend subscribe to individual fields for reactive UI updates (e.g. highlighting the current work, showing search counts).

Agent tools mutate this state via ctx.deps.state during execution. The AG-UI endpoint serialises the final state at the end of each run.

ATTRIBUTE DESCRIPTION
current_work_id

Attribution ID of the work currently being discussed. Set by explain_confidence and suggest_correction tools.

TYPE: str | None

current_work_title

Display title of the current work for the frontend header.

TYPE: str | None

confidence_score

Confidence score (0.0--1.0) currently under discussion. Constrained to [0.0, 1.0] via Pydantic validators.

TYPE: float | None

review_queue_size

Number of works pending human review. Non-negative.

TYPE: int

pending_correction

Preview of a correction the agent is suggesting. The frontend renders this as a before/after diff. Cleared after feedback submission.

TYPE: CorrectionPreview | None

explanation_text

Agent's current natural-language explanation of a confidence score, displayed in the chat sidebar.

TYPE: str | None

last_search_query

Most recent search query executed by the agent.

TYPE: str | None

last_search_count

Number of results from the most recent search. Non-negative.

TYPE: int
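
The relationship between a full snapshot and a delta can be sketched with plain dictionaries. `StateSketch` below is a dataclass stand-in with a subset of the fields listed above, and `flat_delta` is a hypothetical helper that emits JSON Patch style "replace" operations for changed top-level fields. It assumes, as an unverified simplification, that deltas are JSON Patch lists; nested fields such as pending_correction are not handled.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class StateSketch:
    """Stand-in for AttributionAgentState with a subset of its fields."""

    current_work_id: Optional[str] = None
    confidence_score: Optional[float] = None
    last_search_query: Optional[str] = None
    last_search_count: int = 0


def flat_delta(before: dict[str, Any], after: dict[str, Any]) -> list[dict[str, Any]]:
    """List a 'replace' operation for every top-level field that changed."""
    return [
        {"op": "replace", "path": f"/{key}", "value": value}
        for key, value in after.items()
        if before.get(key) != value
    ]


state = StateSketch()
before = asdict(state)            # snapshot before the tool runs
state.current_work_id = "work-123"
state.confidence_score = 0.85     # what explain_confidence would set
after = asdict(state)             # snapshot after the tool runs
delta = flat_delta(before, after)
```

A snapshot resends every field, while the delta carries only the two fields the tool touched.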

CorrectionPreview

Bases: BaseModel

Preview of a proposed field correction before user approval.

Generated by the suggest_correction agent tool and stored in AttributionAgentState.pending_correction. The CopilotKit frontend renders this as a diff view (current vs. suggested value) with the agent's reasoning and confidence level.

ATTRIBUTE DESCRIPTION
field

Name of the attribution record field being corrected (e.g. "role_detail", "artist_name").

TYPE: str

current_value

Current value of the field in the attribution record.

TYPE: str

suggested_value

Proposed replacement value.

TYPE: str

reason

Agent's justification for the correction, displayed to the reviewer for informed approval/rejection.

TYPE: str

confidence_in_correction

Agent's confidence in the suggested correction (0.0--1.0). Constrained to [0.0, 1.0] via Pydantic validators.

TYPE: float

AG-UI Endpoint

agui_endpoint

AG-UI protocol endpoint for CopilotKit integration.

Implements a simplified AG-UI (Agent-User Interaction) protocol adapter that streams Server-Sent Events (SSE) from the PydanticAI attribution agent to the CopilotKit frontend. The AG-UI protocol defines 31 event types; this adapter implements the subset needed for text-based conversation:

  • RunStarted / RunFinished -- lifecycle bookends
  • TextMessageStart / TextMessageContent / TextMessageEnd -- streamed assistant response
  • StateSnapshot -- full agent state for frontend synchronisation
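
The event subset listed above could be emitted by a generator along these lines. This is a sketch under stated assumptions, not the module's `_generate_sse_events`: the upper-case type strings and the `threadId`, `runId`, `messageId`, `delta`, and `snapshot` field names reflect one reading of the AG-UI wire format and should be checked against the protocol documentation, and placing the state snapshot after the message is also an assumption.

```python
import json
from typing import Any, Iterable, Iterator


def sse(event: dict[str, Any]) -> str:
    """Encode one event as an SSE frame: a data line plus a blank line."""
    return f"data: {json.dumps(event)}\n\n"


def event_stream(
    chunks: Iterable[str],
    state: dict[str, Any],
    thread_id: str = "thread-1",  # hypothetical identifiers
    run_id: str = "run-1",
    message_id: str = "msg-1",
) -> Iterator[str]:
    """Yield lifecycle, text, and state frames for a single agent run."""
    yield sse({"type": "RUN_STARTED", "threadId": thread_id, "runId": run_id})
    yield sse({"type": "TEXT_MESSAGE_START", "messageId": message_id, "role": "assistant"})
    for chunk in chunks:
        # Each chunk of the assistant reply becomes its own content event.
        yield sse({"type": "TEXT_MESSAGE_CONTENT", "messageId": message_id, "delta": chunk})
    yield sse({"type": "TEXT_MESSAGE_END", "messageId": message_id})
    yield sse({"type": "STATE_SNAPSHOT", "snapshot": state})
    yield sse({"type": "RUN_FINISHED", "threadId": thread_id, "runId": run_id})


frames = list(event_stream(["Hel", "lo"], {"last_search_count": 0}))
```

A generator like this can be passed directly to a streaming HTTP response, since each yielded string is already a complete SSE frame.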

In production, PydanticAI's AGUIAdapter would replace this simplified implementation to support the full event catalogue (tool calls, state deltas, action requests, etc.).

The endpoint is mounted at /api/v1/copilotkit and accepts POST requests with the CopilotKit message payload.

Notes

The agent instance is a lazy singleton (_get_agent) to avoid requiring an Anthropic API key at import time. Tests mock this function to inject a test agent.

See Also

music_attribution.chat.agent : Agent factory and tool definitions.
music_attribution.chat.state : Shared state model.

copilotkit_endpoint async

copilotkit_endpoint(request: Request) -> StreamingResponse

CopilotKit AG-UI endpoint for agent conversation.

Receives a CopilotKit message payload via POST, extracts the conversation history, and returns an SSE stream of AG-UI events generated by the PydanticAI attribution agent.

The endpoint extracts the async_session_factory from request.app.state (set during FastAPI startup) to give agent tools database access.

SSE response headers disable caching and buffering to ensure low-latency event delivery to the frontend.

PARAMETER DESCRIPTION
request

FastAPI request containing a JSON body with messages (list of {role, content} dicts) from CopilotKit.

TYPE: Request

RETURNS DESCRIPTION
StreamingResponse

SSE stream (text/event-stream) with AG-UI events.
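
On the consuming side, a text/event-stream body is a sequence of frames separated by blank lines, each carrying one or more `data:` lines. The parser below is a minimal sketch for inspecting such a stream in tests; `parse_sse` is a hypothetical helper, it assumes every frame's data is a JSON object, and it ignores other SSE fields such as `event:` or `id:`.

```python
import json
from typing import Any


def parse_sse(raw: str) -> list[dict[str, Any]]:
    """Decode the JSON payload of every data-bearing frame in an SSE body."""
    events: list[dict[str, Any]] = []
    for frame in raw.split("\n\n"):
        # Collect the payload of each "data:" line within the frame.
        data_lines = [
            line[len("data:"):].lstrip()
            for line in frame.splitlines()
            if line.startswith("data:")
        ]
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


# Example body with two frames; the type strings are illustrative.
raw_body = 'data: {"type": "RUN_STARTED"}\n\ndata: {"type": "RUN_FINISHED"}\n\n'
events = parse_sse(raw_body)
```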

Source code in src/music_attribution/chat/agui_endpoint.py
@router.post("/copilotkit")
async def copilotkit_endpoint(request: Request) -> StreamingResponse:
    """CopilotKit AG-UI endpoint for agent conversation.

    Receives a CopilotKit message payload via POST, extracts the
    conversation history, and returns an SSE stream of AG-UI events
    generated by the PydanticAI attribution agent.

    The endpoint extracts the ``async_session_factory`` from
    ``request.app.state`` (set during FastAPI startup) to give agent
    tools database access.

    SSE response headers disable caching and buffering to ensure
    low-latency event delivery to the frontend.

    Parameters
    ----------
    request : Request
        FastAPI request containing a JSON body with ``messages``
        (list of ``{role, content}`` dicts) from CopilotKit.

    Returns
    -------
    StreamingResponse
        SSE stream (``text/event-stream``) with AG-UI events.
    """
    body = await request.json()
    messages = body.get("messages", [])

    # Start every request with a fresh state object (state is not persisted between requests)
    state = AttributionAgentState()

    # Extract async session factory for DB access
    session_factory: Any = getattr(request.app.state, "async_session_factory", None)

    return StreamingResponse(
        _generate_sse_events(messages, state, session_factory),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )