Files
Ben Stull ee6e3491e7 Drop "prototype/carryover" framing now that v1 is shipped
SPEC, DEV docs, and code comments still talked about the codebase as
a rewrite-in-progress against an external prototype. With v1 shipped
the framing reads oddly — it implies code is provisional when it's
the production thing. Recast §18 as "the technical stack," strip
"carryover from the prototype" comments across backend (api.py,
chat.py, providers.py) and frontend (DiffView, PromptBar,
SelectionTooltip, modelStyles), and rework SPEC §1 / §18 to introduce
OHM up front rather than as a follow-on to a prototype reference.

Also:
- RUNBOOK: lower the Python prereq to 3.11+ to match the production VM
  (was 3.13).
- Remove IMPLEMENTATION-PROMPT.md — the original implementation brief
  is no longer load-bearing.
- Add deploy/DEPLOY-NEW-SESSION-PROMPT.md as the durable
  deploy-handoff prompt for new sessions.
2026-05-25 10:32:46 -07:00

358 lines
12 KiB
Python

"""SSE-streaming chat layer — §18 stack, applied to the §5 schema.
Conversation history is the durable list of `thread_messages` rows on
a `threads` row, scoped to one branch (or to a sub-thread anchored to
a range or paragraph within it). The streaming response is parsed for
`<change>` blocks per §18; each `<change>` becomes a `changes` row
with `state='pending'` per §8.14 the moment it is parsed, regardless
of mode.
This module exposes two seams:
- `build_history` and `build_system_prompt` — helpers a caller can use
to assemble the LLM request without owning a provider
(`build_history` reads the thread's rows from the database).
- `stream_assistant_turn` — the orchestration that runs the provider's
streaming interface against an assistant `thread_messages` row the
caller has already created, yields SSE-shaped text chunks, and, once
the stream ends, writes the full text to that row, parses its
`<change>` blocks, and materializes them as `changes` rows.
Per the §1 invariant, no Git writes happen here — chat is app data;
turning an accepted `<change>` into a commit is a separate gesture
that goes through `bot.py`.
"""
from __future__ import annotations
import base64
import json
import logging
import re
from dataclasses import dataclass
from typing import AsyncIterator, Iterator
from . import db
from .providers import BaseProvider
log = logging.getLogger(__name__)  # module logger; records provider stream failures
# The §18 system prompt. `build_system_prompt` fills it in with
# `str.format`, supplying two placeholders: `{title}` (the RFC's title)
# and `{body}` (the document text). The document loaded as context is
# the branch's RFC.md at its current tip. The selection-quote shape
# (§8.12) is wired by the caller into the user message text — not the
# system prompt — so the model sees it as part of the turn.
#
# The <change>/<original>/<proposed>/<reason> format described below is
# the contract that `_CHANGE_RE` parses; change the two together.
# Because the template goes through `str.format`, any literal brace
# added to this text must be doubled (`{{` / `}}`).
SYSTEM_PROMPT = """You are a participant in the Wiggleverse RFC framework — a standardization process for natural-language vocabulary that humans and machines need to share. You are collaborating with a human contributor on the RFC titled "{title}".
The contributor's gestures may be questions, objections, sketches of new framings, or direct edit requests. Your role is to translate them into concrete proposed edits where possible. The transcript of this conversation is the durable evidence the definition was earned, so be specific and stay close to the text.
Format each proposed change as one <change> block:
<change>
<original>exact text to replace, copied verbatim from the document</original>
<proposed>replacement text</proposed>
<reason>why this change improves the document — one or two short sentences</reason>
</change>
Rules:
- The <original> text must match the document character-for-character. Do not paraphrase, do not abbreviate.
- One <change> block per distinct edit. Multiple blocks are encouraged when the contributor's input touches several passages.
- If the contributor is asking a general question or exploring an idea not yet ready to become an edit, respond in plain prose. When in doubt, lean toward proposing an edit.
- After your <change> blocks you may add a brief conversational note. Keep it short.
---
The current document:
{body}
"""
# ---------------------------------------------------------------------------
# History / prompt assembly
# ---------------------------------------------------------------------------
def build_history(thread_id: int) -> list[dict]:
    """Return a thread's user and assistant turns in chronological order.

    Rows are read in ascending ``id`` order and converted to the
    ``{"role": ..., "content": ...}`` dicts that every provider's
    ``send`` interface takes. Rows with any other role (system-authored
    narrative) are skipped: the prompt template already carries the
    standing instructions, and those inline notes do not change the
    model's behavior.
    """
    cursor = db.conn().execute(
        """
SELECT role, text FROM thread_messages
WHERE thread_id = ? AND role IN ('user', 'assistant')
ORDER BY id
""",
        (thread_id,),
    )
    history: list[dict] = []
    for row in cursor.fetchall():
        history.append({"role": row["role"], "content": row["text"]})
    return history
def build_system_prompt(*, title: str, body: str) -> str:
    """Render ``SYSTEM_PROMPT`` for one RFC.

    ``title`` fills the ``{title}`` placeholder and ``body`` (the
    document text) fills ``{body}``. Substituted values are inserted
    verbatim; braces inside them are not re-interpreted.
    """
    replacements = {"title": title, "body": body}
    return SYSTEM_PROMPT.format_map(replacements)
# ---------------------------------------------------------------------------
# <change> parsing
# ---------------------------------------------------------------------------
# Contents of one tag inside a <change> block: any text (newlines
# included) that does not run into a `<change>` or `</change>` tag.
# Capping every field at the enclosing block stops the lazy groups from
# backtracking across blocks. Without it, a malformed block (missing
# <reason> or </change>) gets glued onto the next well-formed one and
# produces a corrupt proposal. Trade-off: a field whose text literally
# contains `<change>` / `</change>` is not parsed. The tag names are
# already reserved (a literal `</original>` inside <original> never
# parsed either).
_CHANGE_FIELD = r"((?:(?!</?change>)[\s\S])*?)"
_CHANGE_RE = re.compile(
    r"<change>\s*"
    rf"<original>{_CHANGE_FIELD}</original>\s*"
    rf"<proposed>{_CHANGE_FIELD}</proposed>\s*"
    rf"<reason>{_CHANGE_FIELD}</reason>\s*"
    r"</change>"
)


@dataclass(frozen=True)
class ParsedChange:
    """One well-formed <change> block; each field has outer whitespace stripped."""

    original: str  # text the model proposes to replace, as quoted by it
    proposed: str  # replacement text
    reason: str  # the model's short justification


def parse_changes(text: str) -> list[ParsedChange]:
    """Per §18: pull every well-formed <change> block out of an assistant message.

    Blocks are returned in the order they appear. A block that is
    incomplete (e.g. a mid-stream partial, or one missing a tag) is
    skipped without affecting the blocks around it. The parser runs
    once on the completed text.

    Args:
        text: The assistant message text; may be empty.

    Returns:
        One ``ParsedChange`` per well-formed block; empty if there are none.
    """
    return [
        ParsedChange(m.group(1).strip(), m.group(2).strip(), m.group(3).strip())
        for m in _CHANGE_RE.finditer(text)
    ]
# ---------------------------------------------------------------------------
# Persistence — turn boundaries
# ---------------------------------------------------------------------------
def append_user_message(
    *,
    thread_id: int,
    author_user_id: int,
    text: str,
    quote: str | None,
) -> int:
    """Store a contributor's chat message, then fan out notifications.

    Inserts a ``role='user'`` row into ``thread_messages`` carrying the
    author, the message text and the optional ``quote`` value. It then
    calls ``_fan_out_chat`` for that row. Returns the new row id.
    """
    params = (thread_id, author_user_id, text, quote)
    new_id = db.conn().execute(
        """
INSERT INTO thread_messages (thread_id, role, author_user_id, text, quote)
VALUES (?, 'user', ?, ?, ?)
""",
        params,
    ).lastrowid
    # §15 chokepoint per Slice 6. Chat messages never pass through the bot
    # wrapper (there is no Git write), so notification fan-out is anchored
    # here instead:
    #   - prior thread authors get personal-direct chat_reply_to_my_message;
    #   - RFC watchers get churn-class chat_message_in_participated_thread.
    # The notify module resolves the thread's RFC/branch context internally.
    _fan_out_chat(thread_id, author_user_id, new_id)
    return new_id
def _fan_out_chat(thread_id: int, author_user_id: int, message_id: int) -> None:
    """Send chat notifications for a newly inserted message.

    Looks up the thread's RFC slug, branch and kind. Nothing is sent
    when the thread row is missing or has no ``rfc_slug``. For
    ``review`` threads, the open cached PR with the highest number
    whose ``head_branch`` is the thread's branch is passed as
    ``pr_number``; otherwise ``pr_number`` is ``None``. A missing
    branch name is reported as ``"main"``.
    """
    from . import notify

    thread = db.conn().execute(
        "SELECT rfc_slug, branch_name, thread_kind FROM threads WHERE id = ?",
        (thread_id,),
    ).fetchone()
    if thread is None or not thread["rfc_slug"]:
        return

    is_review = thread["thread_kind"] == "review"
    pr_number = None
    if is_review:
        open_pr = db.conn().execute(
            """
SELECT pr_number FROM cached_prs
WHERE rfc_slug = ? AND head_branch = ? AND state = 'open'
ORDER BY pr_number DESC LIMIT 1
""",
            (thread["rfc_slug"], thread["branch_name"]),
        ).fetchone()
        pr_number = open_pr["pr_number"] if open_pr else None

    notify.fan_out_chat_message(
        actor_user_id=author_user_id,
        rfc_slug=thread["rfc_slug"],
        branch_name=thread["branch_name"] or "main",
        thread_id=thread_id,
        message_id=message_id,
        is_review_thread=is_review,
        pr_number=pr_number,
    )
def append_assistant_placeholder(*, thread_id: int, model_id: str) -> int:
    """Insert an empty assistant row that a streamed reply will later fill.

    The row is created with ``role='assistant'``, the given ``model_id``
    and ``text = ''``. Returns the new row id.
    """
    insert_sql = """
INSERT INTO thread_messages (thread_id, role, model_id, text)
VALUES (?, 'assistant', ?, '')
"""
    return db.conn().execute(insert_sql, (thread_id, model_id)).lastrowid
def finalize_assistant_message(*, message_id: int, text: str) -> None:
    """Overwrite the text of ``thread_messages`` row ``message_id`` with ``text``."""
    statement = "UPDATE thread_messages SET text = ? WHERE id = ?"
    db.conn().execute(statement, (text, message_id))
def append_system_message(*, thread_id: int, text: str) -> int:
    """Insert a system-narrated message inline in a thread.

    Used by §10.6 (manual-edit-flush markers), §9.3 (decline-comment
    record), and any other system-narrated event that needs to live
    inline in chat. The row has ``role='system'`` and no
    ``author_user_id`` is supplied (so it is NULL unless the column
    declares another default). Returns the new row id.
    """
    insert_sql = """
INSERT INTO thread_messages (thread_id, role, text)
VALUES (?, 'system', ?)
"""
    return db.conn().execute(insert_sql, (thread_id, text)).lastrowid
def materialize_changes(
    *,
    rfc_slug: str,
    branch_name: str,
    thread_id: int,
    source_message_id: int,
    parsed: list[ParsedChange],
) -> list[int]:
    """Insert one ``changes`` row per parsed <change> block (§8.14).

    Every row is created with ``kind='ai'`` and ``state='pending'``
    straight away, regardless of mode. Each row is tied to the branch,
    the thread and the assistant message it came from. Returns the new
    row ids in the same order as ``parsed``; returns an empty list when
    ``parsed`` is empty.
    """
    insert_sql = """
INSERT INTO changes
(rfc_slug, branch_name, thread_id, source_message_id,
kind, state, original, proposed, reason)
VALUES (?, ?, ?, ?, 'ai', 'pending', ?, ?, ?)
"""
    shared = (rfc_slug, branch_name, thread_id, source_message_id)
    new_ids: list[int] = []
    for change in parsed:
        row_params = shared + (change.original, change.proposed, change.reason)
        new_ids.append(db.conn().execute(insert_sql, row_params).lastrowid)
    return new_ids
def mark_stale_overlapping(
    *,
    rfc_slug: str,
    branch_name: str,
    new_body: str,
) -> int:
    """Per §8.11: flag pending AI proposals that no longer locate in the document.

    Candidates are this branch's ``kind='ai'``, ``state='pending'`` rows
    that are not yet stale. A candidate is marked by setting
    ``stale_since`` to the current time when its whitespace-stripped
    ``original`` is non-empty and is not a substring of ``new_body``.
    ``state`` is left as ``pending``; the contributor's action stays
    gated on the stale card.

    Returns the number of rows marked on this call. Rows that are
    already stale are excluded from the candidates, so repeat calls do
    not touch them again. Nothing here clears ``stale_since``.
    """
    candidates = db.conn().execute(
        """
SELECT id, original FROM changes
WHERE rfc_slug = ? AND branch_name = ?
AND kind = 'ai' AND state = 'pending' AND stale_since IS NULL
""",
        (rfc_slug, branch_name),
    ).fetchall()

    def _no_longer_locates(row) -> bool:
        needle = (row["original"] or "").strip()
        return bool(needle) and needle not in new_body

    stale_ids = [row["id"] for row in candidates if _no_longer_locates(row)]
    for change_id in stale_ids:
        db.conn().execute(
            "UPDATE changes SET stale_since = datetime('now') WHERE id = ?",
            (change_id,),
        )
    return len(stale_ids)
# ---------------------------------------------------------------------------
# SSE shape — base64 chunks for binary-safe transport
# ---------------------------------------------------------------------------
def sse_chunk(text: str) -> str:
    """Wrap ``text`` as a single SSE ``data:`` frame with a base64 payload.

    The text is UTF-8 encoded and then base64'd, so newlines or other
    characters in model output cannot split the ``data:`` line. The
    frame ends with the blank line that terminates an SSE event.
    """
    payload = base64.b64encode(text.encode("utf-8"))
    return "data: " + payload.decode("ascii") + "\n\n"
def sse_event(name: str, payload: dict) -> str:
    """Format a named SSE event whose single data line is ``payload`` as JSON."""
    lines = [f"event: {name}", f"data: {json.dumps(payload)}"]
    return "\n".join(lines) + "\n\n"
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
async def stream_assistant_turn(
    *,
    provider: BaseProvider,
    system_prompt: str,
    history: list[dict],
    user_message: str,
    thread_id: int,
    rfc_slug: str,
    branch_name: str,
    assistant_message_id: int,
) -> AsyncIterator[str]:
    """Run the provider's streaming interface, yielding SSE-encoded chunks.

    Each non-empty provider chunk is yielded as an ``sse_chunk`` frame.
    When the stream ends, the assembled text is written to the assistant
    row and ``changes`` rows are materialized from its ``<change>``
    blocks. A trailing ``changes`` event listing the new change ids is
    then emitted, followed by ``data: DONE``.

    The user's message must already have been persisted by the caller
    before this is invoked; the placeholder assistant row whose id is
    ``assistant_message_id`` must exist too. This module's job is to
    populate the assistant row's text and materialize the changes; the
    caller wires it into a FastAPI StreamingResponse.

    If the consumer stops early (the generator is closed, e.g. after the
    response is abandoned on a client disconnect), the text streamed so
    far is still written to the assistant row. Any complete ``<change>``
    blocks in it are still materialized. Only the trailing SSE events
    are skipped.

    A provider exception is logged and surfaced in-band as a final
    ``[Provider error: ...]`` chunk, which is persisted with the text.

    Provider streaming is synchronous (an ``Iterator[str]``) per §18.
    Each chunk is pulled from it directly on the calling thread, so the
    event loop is blocked while the provider waits for its next chunk.
    This is sufficient at single-process scale (§4.2). Moving the pulls
    onto a worker thread is the change to make if it starts to matter.
    """
    full_text_chunks: list[str] = []
    # Hand the provider the user turn appended to history.
    history_for_call = list(history) + [{"role": "user", "content": user_message}]

    def _drain() -> Iterator[str]:
        # Report provider failures in-band instead of aborting the stream,
        # so the client still gets a reply and the row still stores one.
        try:
            yield from provider.send_streaming(system_prompt, history_for_call)
        except Exception as e:
            log.exception("provider stream failed")
            yield f"\n\n[Provider error: {e}]"

    try:
        for chunk in _drain():
            if not chunk:
                continue
            full_text_chunks.append(chunk)
            yield sse_chunk(chunk)
    finally:
        # Runs on normal completion and also when this generator is closed
        # early (GeneratorExit raised at the `yield` above). Without it the
        # placeholder row would keep its initial empty text, and the
        # streamed <change> blocks would never become `changes` rows.
        full_text = "".join(full_text_chunks)
        finalize_assistant_message(message_id=assistant_message_id, text=full_text)
        new_ids = materialize_changes(
            rfc_slug=rfc_slug,
            branch_name=branch_name,
            thread_id=thread_id,
            source_message_id=assistant_message_id,
            parsed=parse_changes(full_text),
        )

    # Only reached when the stream ran to completion.
    yield sse_event(
        "changes",
        {
            "message_id": assistant_message_id,
            "change_ids": new_ids,
            "count": len(new_ids),
        },
    )
    yield "data: DONE\n\n"