"""SSE-streaming chat layer — §18 carryover, adapted to the §5 schema. The prototype kept conversation state in an in-memory `RFCChat` keyed by a session_id. Here, history is the durable list of `thread_messages` rows on a `threads` row, scoped to one branch (or to a sub-thread anchored to a range or paragraph within it). The streaming response is parsed for `` blocks per §18; each `` becomes a `changes` row with `state='pending'` per §8.14 the moment it is parsed, regardless of mode. This module exposes two seams: - `build_history` and `build_system_prompt` — pure functions a caller can use to assemble the LLM request without owning a provider. - `stream_assistant_turn` — the orchestration that creates the assistant `thread_messages` row, runs the provider's streaming interface, parses `` blocks as they accumulate, materializes `changes` rows on completion, and yields SSE-shaped text chunks. Per the §1 invariant, no Git writes happen here — chat is app data; turning an accepted `` into a commit is a separate gesture that goes through `bot.py`. """ from __future__ import annotations import base64 import json import logging import re from dataclasses import dataclass from typing import AsyncIterator, Iterator from . import db from .providers import BaseProvider log = logging.getLogger(__name__) # The §18 system prompt, adapted from the prototype. The prototype's # version assumed one RFC document loaded as context; here the document # is the branch's RFC.md at its current tip. The selection-quote shape # (§8.12) is wired by the caller into the user message text — not the # system prompt — so the model sees it as part of the turn. SYSTEM_PROMPT = """You are a participant in the Wiggleverse RFC framework — a standardization process for natural-language vocabulary that humans and machines need to share. You are collaborating with a human contributor on the RFC titled "{title}". The contributor's gestures may be questions, objections, sketches of new framings, or direct edit requests. Your role is to translate them into concrete proposed edits where possible. The transcript of this conversation is the durable evidence the definition was earned, so be specific and stay close to the text. Format each proposed change as one block: exact text to replace, copied verbatim from the document replacement text why this change improves the document — one or two short sentences Rules: - The text must match the document character-for-character. Do not paraphrase, do not abbreviate. - One block per distinct edit. Multiple blocks are encouraged when the contributor's input touches several passages. - If the contributor is asking a general question or exploring an idea not yet ready to become an edit, respond in plain prose. When in doubt, lean toward proposing an edit. - After your blocks you may add a brief conversational note. Keep it short. --- The current document: {body} """ # --------------------------------------------------------------------------- # History / prompt assembly # --------------------------------------------------------------------------- def build_history(thread_id: int) -> list[dict]: """Pull the thread's messages in chronological order, in the {role, content} shape every provider's `send` interface expects. System-author rows are excluded — the prompt template carries the standing instructions; system-author messages are inline narrative that doesn't change the model's behavior. """ rows = db.conn().execute( """ SELECT role, text FROM thread_messages WHERE thread_id = ? AND role IN ('user', 'assistant') ORDER BY id """, (thread_id,), ).fetchall() return [{"role": r["role"], "content": r["text"]} for r in rows] def build_system_prompt(*, title: str, body: str) -> str: return SYSTEM_PROMPT.format(title=title, body=body) # --------------------------------------------------------------------------- # parsing # --------------------------------------------------------------------------- _CHANGE_RE = re.compile( r"\s*([\s\S]*?)\s*([\s\S]*?)\s*([\s\S]*?)\s*", re.MULTILINE, ) @dataclass(frozen=True) class ParsedChange: original: str proposed: str reason: str def parse_changes(text: str) -> list[ParsedChange]: """Per §18: pull every well-formed block out of an assistant message. Mid-stream partials are simply not matched yet; the parser runs once on completion.""" return [ ParsedChange(m.group(1).strip(), m.group(2).strip(), m.group(3).strip()) for m in _CHANGE_RE.finditer(text) ] # --------------------------------------------------------------------------- # Persistence — turn boundaries # --------------------------------------------------------------------------- def append_user_message( *, thread_id: int, author_user_id: int, text: str, quote: str | None, ) -> int: cur = db.conn().execute( """ INSERT INTO thread_messages (thread_id, role, author_user_id, text, quote) VALUES (?, 'user', ?, ?, ?) """, (thread_id, author_user_id, text, quote), ) return cur.lastrowid def append_assistant_placeholder(*, thread_id: int, model_id: str) -> int: cur = db.conn().execute( """ INSERT INTO thread_messages (thread_id, role, model_id, text) VALUES (?, 'assistant', ?, '') """, (thread_id, model_id), ) return cur.lastrowid def finalize_assistant_message(*, message_id: int, text: str) -> None: db.conn().execute( "UPDATE thread_messages SET text = ? WHERE id = ?", (text, message_id), ) def append_system_message(*, thread_id: int, text: str) -> int: """Used by §10.6 (manual-edit-flush markers), §9.3 (decline-comment record), and any other system-narrated event that needs to live inline in chat. role='system', author_user_id=NULL.""" cur = db.conn().execute( """ INSERT INTO thread_messages (thread_id, role, text) VALUES (?, 'system', ?) """, (thread_id, text), ) return cur.lastrowid def materialize_changes( *, rfc_slug: str, branch_name: str, thread_id: int, source_message_id: int, parsed: list[ParsedChange], ) -> list[int]: """Per §8.14: every block becomes a `changes` row with state='pending' immediately, regardless of mode. Returns the new row ids in source order.""" ids: list[int] = [] for ch in parsed: cur = db.conn().execute( """ INSERT INTO changes (rfc_slug, branch_name, thread_id, source_message_id, kind, state, original, proposed, reason) VALUES (?, ?, ?, ?, 'ai', 'pending', ?, ?, ?) """, (rfc_slug, branch_name, thread_id, source_message_id, ch.original, ch.proposed, ch.reason), ) ids.append(cur.lastrowid) return ids def mark_stale_overlapping( *, rfc_slug: str, branch_name: str, new_body: str, ) -> int: """Per §8.11: when a manual edit changes the document such that a pending AI proposal's `original` no longer locates, set its `stale_since`. The contributor's action stays gated on the stale card; state stays `pending`. Returns the number of rows marked stale on this call (idempotent on re-entry — already-stale rows aren't touched twice). """ rows = db.conn().execute( """ SELECT id, original FROM changes WHERE rfc_slug = ? AND branch_name = ? AND kind = 'ai' AND state = 'pending' AND stale_since IS NULL """, (rfc_slug, branch_name), ).fetchall() marked = 0 for r in rows: original = (r["original"] or "").strip() if original and original not in new_body: db.conn().execute( "UPDATE changes SET stale_since = datetime('now') WHERE id = ?", (r["id"],), ) marked += 1 return marked # --------------------------------------------------------------------------- # SSE shape — base64 chunks for binary-safe transport # --------------------------------------------------------------------------- def sse_chunk(text: str) -> str: encoded = base64.b64encode(text.encode("utf-8")).decode("ascii") return f"data: {encoded}\n\n" def sse_event(name: str, payload: dict) -> str: return f"event: {name}\ndata: {json.dumps(payload)}\n\n" # --------------------------------------------------------------------------- # Orchestration # --------------------------------------------------------------------------- async def stream_assistant_turn( *, provider: BaseProvider, system_prompt: str, history: list[dict], user_message: str, thread_id: int, rfc_slug: str, branch_name: str, assistant_message_id: int, ) -> AsyncIterator[str]: """Run the provider's streaming interface, yielding SSE-encoded chunks. On completion, materializes `changes` rows from any `` blocks in the assembled text and emits a trailing `changes` event listing the new change ids. The user's message must already have been persisted by the caller before this is invoked; the placeholder assistant row whose id is `assistant_message_id` must exist too. This module's job is to populate the assistant row's text and materialize the changes; the caller wires it into a FastAPI StreamingResponse. Provider streaming is synchronous (an `Iterator[str]`) per §18; we drain it eagerly into chunks and yield them as async strings. This is sufficient at single-process scale (§4.2) and the streaming impl is what the prototype shipped — re-wrapping it in a worker thread for a future deployment shape is a one-liner if it matters. """ full_text_chunks: list[str] = [] # Hand the provider the user turn appended to history. history_for_call = list(history) + [{"role": "user", "content": user_message}] def _drain() -> Iterator[str]: try: yield from provider.send_streaming(system_prompt, history_for_call) except Exception as e: log.exception("provider stream failed") yield f"\n\n[Provider error: {e}]" for chunk in _drain(): if not chunk: continue full_text_chunks.append(chunk) yield sse_chunk(chunk) full_text = "".join(full_text_chunks) finalize_assistant_message(message_id=assistant_message_id, text=full_text) parsed = parse_changes(full_text) new_ids = materialize_changes( rfc_slug=rfc_slug, branch_name=branch_name, thread_id=thread_id, source_message_id=assistant_message_id, parsed=parsed, ) yield sse_event( "changes", { "message_id": assistant_message_id, "change_ids": new_ids, "count": len(new_ids), }, ) yield "data: DONE\n\n"