Files
rfc-app/backend/app/api_prs.py
T
Ben Stull 4565a6cb95 Slice 4: super-draft body editing per §9.5 + §9.6
The §17 routing-collapse rule lands in api_branches.py and
api_prs.py — every branches/<branch>/... and prs/<n>/... route
dispatches on the entry's state to pick the right Gitea repo, and
the body extracted from the entry's frontmatter envelope is what
the editor and the diff see. The bot grows open_metadata_pr;
cache grows refresh_meta_branches. Two §17 routes added:
start-edit-branch and metadata. The §9.4 super-draft view replaces
RFCView.jsx's Slice 2 placeholder; a metadata pane modal opens
from the breadcrumb. Branch naming uses edit-<slug>-<6hex> to
dodge the §19.2 path-routing candidate while preserving §9.5's
structural shape.

Covered by tests/test_super_draft_vertical.py (10 tests). The
full Slices 1-4 suite is 35/35 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 15:43:21 -07:00

989 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Slice 3 API surface — the §10 PR flow's endpoints.
Owns every `prs/<pr_number>/...` route from §17, plus the branch-scoped
`pr-draft` and `open-pr` endpoints that compose the §10.2 modal. Read
paths fetch branch and main bodies live from Gitea; write paths funnel
through `bot.py` so the §1 chokepoint and the §6.5 trailer hold.
Visibility and the §11.3 universal-public rule fall out structurally:
opening a PR is the moment a private branch goes public, and the
confirmation lives on the §10.1 affordance rather than as a side-effect
of toggling visibility. The frontend confirms; the server is the
authority that flips the branch visibility row at open time.
Permission gates per §6.1 (app admin/owner), §6.3 (per-RFC owners and
arbiters), and §6.5 (every PR write carries an On-behalf-of trailer).
"""
from __future__ import annotations
import json
import logging
from typing import Any
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from . import auth, cache, chat as chat_layer, db, entry as entry_mod
from .bot import Bot
from .config import Config
from .gitea import Gitea, GiteaError
from .providers import BaseProvider
log = logging.getLogger(__name__)
RFC_FILE_PATH = "RFC.md"
# ---------------------------------------------------------------------------
# Request bodies
# ---------------------------------------------------------------------------
class OpenPRBody(BaseModel):
    """Request body accepted by the `open-pr` endpoint."""

    # PR title: between 1 and 240 characters.
    title: str = Field(min_length=1, max_length=240)
    # PR description: at most 8000 characters; no minimum length is set.
    description: str = Field(max_length=8000)
class PRDescriptionBody(BaseModel):
    """Request body accepted by the post-open title/description edit endpoint."""

    # Replacement title: between 1 and 240 characters.
    title: str = Field(min_length=1, max_length=240)
    # Replacement description: at most 8000 characters; no minimum length is set.
    description: str = Field(max_length=8000)
class PRSeenBody(BaseModel):
    """Request body accepted by the seen-cursor endpoint.

    Both fields default to None; the handler keeps the stored value for
    any cursor the caller leaves out.
    """

    # Commit sha the viewer has seen up to.
    last_seen_commit_sha: str | None = None
    # Highest message id the viewer has seen.
    last_seen_message_id: int | None = None
class PRReviewBody(BaseModel):
    """Request body accepted by the review-thread endpoint."""

    # Text of the first message in the new thread: 1 to 20,000 characters.
    text: str = Field(min_length=1, max_length=20_000)
    # Anchor description; the handler stores it JSON-encoded on the thread
    # row. Defaults to an empty dict.
    anchor_payload: dict = Field(default_factory=dict)
    # Optional quoted excerpt stored with the message, at most 2000 characters.
    quote: str | None = Field(default=None, max_length=2000)
def make_router(
    config: Config,
    gitea: Gitea,
    bot: Bot,
    providers: dict[str, BaseProvider],
) -> APIRouter:
    """Build the APIRouter that carries the PR-flow routes.

    The route handlers and helper functions defined below are closures
    over `config`, `gitea`, `bot`, and `providers`.
    """
    router = APIRouter()
    # The first provider key acts as the default model id; an empty string
    # stands in when no providers are configured.
    default_model = next(iter(providers)) if providers else ""
# -------------------------------------------------------------------
# §10.2: AI-drafted PR title and description.
# Returned ahead of submit so the modal renders with prefilled values
# the contributor can edit. The contributor's gesture is what
# produces the open-pr call; the draft is just a starting point.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/pr-draft")
async def draft_pr_text(slug: str, branch: str, request: Request) -> dict[str, Any]:
    """Return a drafted PR title and description for `branch` (§10.2).

    Responds 409 when the branch has no commits ahead of main and 404
    when the branch copy of the file cannot be read. A missing main copy
    is treated as an empty body.
    """
    # Contributor gate only: the viewer's identity is not used further.
    auth.require_contributor(request)
    rfc = _require_active_rfc(slug)
    owner, repo = _owner_repo(rfc)
    path = _file_path_for(rfc)

    if not _branch_has_commits_ahead(slug, branch):
        raise HTTPException(409, "Branch has no commits ahead of main")

    main_fetched = await gitea.read_file(owner, repo, path, ref="main")
    branch_fetched = await gitea.read_file(owner, repo, path, ref=branch)
    if not branch_fetched:
        raise HTTPException(404, f"Branch {path} not found")

    chat_messages = _branch_chat_excerpt(slug, branch)
    main_raw = main_fetched[0] if main_fetched else ""
    branch_raw = branch_fetched[0]

    drafted_title, drafted_description = _draft_with_provider(
        providers=providers,
        default_model=default_model,
        rfc_title=rfc["title"],
        main_body=_extract_body(rfc, main_raw),
        branch_body=_extract_body(rfc, branch_raw),
        chat_messages=chat_messages,
    )
    return {"title": drafted_title, "description": drafted_description}
# -------------------------------------------------------------------
# §10.1: open a PR. The §11.3 universal-public flip is server-side —
# the frontend confirms before calling; this endpoint flips the
# branch's read_public unconditionally.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/open-pr")
async def open_pr(slug: str, branch: str, body: OpenPRBody, request: Request) -> dict[str, Any]:
    """Open a PR from `branch` and make the branch publicly readable.

    Responds 409 when `branch` is main, when it has no commits ahead of
    main, or when it already has an open PR in the cache; responds 502
    when Gitea rejects the PR creation.

    The §11.3 visibility flip is written only after the PR was created,
    so a failed open never leaves a private branch public without a PR.
    """
    viewer = auth.require_contributor(request)
    rfc = _require_active_rfc(slug)
    if branch == "main":
        raise HTTPException(409, "PRs open from non-main branches")
    owner, repo = _owner_repo(rfc)

    # §10.1: branch must have commits ahead of main.
    if not _branch_has_commits_ahead(slug, branch):
        raise HTTPException(409, "Branch has no commits ahead of main")

    # §10.9: at most one open PR per branch.
    existing = db.conn().execute(
        """
SELECT pr_number FROM cached_prs
WHERE rfc_slug = ? AND head_branch = ? AND state = 'open'
""",
        (slug, branch),
    ).fetchone()
    if existing:
        raise HTTPException(409, "This branch already has an open PR")

    # §10.9: when the branch is a resolution branch, the open carries
    # a `Supersedes:` trailer naming the original PR so the cache
    # closes it on the resolution PR's merge.
    supersedes = _resolution_origin(slug, branch)
    try:
        pr = await bot.open_branch_pr(
            viewer.as_actor(),
            owner=owner,
            repo=repo,
            head_branch=branch,
            title=body.title.strip(),
            description=body.description.strip(),
            slug=slug,
            supersedes_pr_number=supersedes,
        )
    except GiteaError as e:
        raise HTTPException(502, f"Gitea: {e.detail}") from e

    # §11.3: opening a PR makes the branch publicly readable. Done after
    # the PR exists so a Gitea failure above leaves visibility untouched.
    db.conn().execute(
        """
INSERT INTO branch_visibility (rfc_slug, branch_name, read_public, contribute_mode)
VALUES (?, ?, 1, 'just-me')
ON CONFLICT(rfc_slug, branch_name) DO UPDATE SET read_public = 1
""",
        (slug, branch),
    )

    await _refresh_after_pr_write(rfc)
    return {"pr_number": pr["number"], "slug": slug, "branch": branch}
# -------------------------------------------------------------------
# §10.3: the PR review page data.
# -------------------------------------------------------------------
@router.get("/api/rfcs/{slug}/prs/{pr_number}")
async def get_pr(slug: str, pr_number: int, request: Request) -> dict[str, Any]:
    """Assemble the §10.3 PR review page payload.

    Combines the cached PR row, the main and branch bodies read from
    Gitea, the head branch's threads and their messages, the viewer's
    seen cursor, live mergeability (open PRs only), open-thread counts,
    the supersession links, and the viewer's capabilities.

    The viewer may be anonymous (`None`); in that case no seen cursor is
    looked up.
    """
    viewer = auth.current_user(request)
    rfc = _require_active_rfc(slug)
    pr_row = _require_pr(slug, pr_number)
    owner, repo = _owner_repo(rfc)
    path = _file_path_for(rfc)
    head_branch = pr_row["head_branch"]
    # §11.3: PRs are always public; no visibility check.
    main_fetched = await gitea.read_file(owner, repo, path, ref="main")
    # A missing main copy is treated as an empty body.
    main_body = _extract_body(rfc, (main_fetched or ("", ""))[0])
    merge_sha = pr_row["merge_commit_sha"] if "merge_commit_sha" in pr_row.keys() else None
    # For a merged PR with a recorded merge commit, read the file at that
    # commit; otherwise read it at the head branch.
    branch_ref = merge_sha if pr_row["state"] == "merged" and merge_sha else head_branch
    branch_fetched = await gitea.read_file(owner, repo, path, ref=branch_ref) if branch_ref else None
    if branch_fetched is None:
        # Fall back to head_branch if the merge commit is gone.
        branch_fetched = await gitea.read_file(owner, repo, path, ref=head_branch) or ("", "")
    branch_body = _extract_body(rfc, branch_fetched[0])
    # Threads + messages — the branch chat is the PR's conversation
    # surface per §10.4. Both `chat`/`flag` and `review` kinds
    # surface here; the frontend renders them inline with visual
    # distinction.
    thread_rows = db.conn().execute(
        """
SELECT id, anchor_kind, anchor_payload, thread_kind, label, state,
created_by, created_at, resolved_at, resolved_by
FROM threads
WHERE rfc_slug = ? AND branch_name = ?
ORDER BY id
""",
        (slug, head_branch),
    ).fetchall()
    threads = [_serialize_thread(r) for r in thread_rows]
    thread_ids = [t["id"] for t in threads]
    # Every thread gets an entry, even when it has no messages.
    messages_by_thread: dict[int, list[dict]] = {tid: [] for tid in thread_ids}
    if thread_ids:
        # Only `?` placeholders are interpolated into the SQL text; the
        # ids themselves are bound as parameters.
        placeholders = ",".join("?" * len(thread_ids))
        msg_rows = db.conn().execute(
            f"""
SELECT m.id, m.thread_id, m.role, m.author_user_id,
u.gitea_login AS author_login, u.display_name AS author_display,
m.model_id, m.text, m.quote, m.created_at
FROM thread_messages m
LEFT JOIN users u ON u.id = m.author_user_id
WHERE m.thread_id IN ({placeholders})
ORDER BY m.id
""",
            tuple(thread_ids),
        ).fetchall()
        for r in msg_rows:
            messages_by_thread.setdefault(r["thread_id"], []).append(_serialize_message(r))
    # Per-user seen cursor per §10.3. Anonymous viewers get no
    # cursor — they always see "everything new" but cannot advance
    # the cursor (no row to write to).
    seen = None
    if viewer is not None:
        seen_row = db.conn().execute(
            """
SELECT last_seen_commit_sha, last_seen_message_id, seen_at
FROM pr_seen
WHERE user_id = ? AND rfc_slug = ? AND pr_number = ?
""",
            (viewer.user_id, slug, pr_number),
        ).fetchone()
        if seen_row:
            seen = {
                "last_seen_commit_sha": seen_row["last_seen_commit_sha"],
                "last_seen_message_id": seen_row["last_seen_message_id"],
                "seen_at": seen_row["seen_at"],
            }
    # Live Gitea pull for mergeability per §10.5 / §10.9.
    # `mergeable` stays None when the PR is not open or the live pull
    # could not be fetched.
    mergeable = None
    conflict_files: list[str] = []
    if pr_row["state"] == "open":
        try:
            live = await gitea.get_pull(owner, repo, pr_number)
        except GiteaError:
            live = None
        if live is not None:
            mergeable = bool(live.get("mergeable"))
            if not mergeable:
                # The single tracked file is reported as the conflict.
                conflict_files = [path]
    # Aggregate counts the header strip surfaces per §10.3.
    open_review = sum(1 for t in threads if t["thread_kind"] == "review" and t["state"] == "open")
    # Whole-doc chat threads are excluded from the open-chat count.
    open_chat = sum(1 for t in threads if t["thread_kind"] == "chat" and t["state"] == "open" and t["anchor_kind"] != "whole-doc")
    open_flags = sum(1 for t in threads if t["thread_kind"] == "flag" and t["state"] == "open")
    # §10.9: surface the supersession relationship in both
    # directions. `superseded_by` carries the resolution PR that
    # closed this one (set by the cache on the resolution merge).
    # `supersedes` is parsed from this PR's body trailer so it
    # surfaces immediately on open — without waiting for the
    # original to close.
    superseded_by = pr_row["superseded_by_pr_number"]
    from .cache import _parse_supersedes
    supersedes = _parse_supersedes(pr_row["description"] or "")
    if supersedes is None:
        # No trailer in the description: fall back to the recorded
        # resolution-branch parentage for this head branch.
        row = db.conn().execute(
            """
SELECT original_pr_number FROM pr_resolution_branches
WHERE rfc_slug = ? AND resolution_branch = ?
""",
            (slug, head_branch),
        ).fetchone()
        if row:
            supersedes = row["original_pr_number"]
    capabilities = _pr_capabilities(rfc, pr_row, viewer)
    return {
        "slug": slug,
        "rfc_title": rfc["title"],
        "rfc_id": rfc["rfc_id"],
        "pr_number": pr_number,
        "title": pr_row["title"],
        "description": pr_row["description"],
        "state": pr_row["state"],
        "opened_by": pr_row["opened_by"],
        "opened_at": pr_row["opened_at"],
        "merged_at": pr_row["merged_at"],
        "closed_at": pr_row["closed_at"],
        "merge_commit_sha": pr_row["merge_commit_sha"],
        "head_branch": head_branch,
        "head_sha": pr_row["head_sha"],
        "base_branch": pr_row["base_branch"],
        "superseded_by_pr_number": superseded_by,
        "supersedes_pr_number": supersedes,
        "main_body": main_body or "",
        "branch_body": branch_body or "",
        "threads": threads,
        "messages_by_thread": messages_by_thread,
        "seen": seen,
        "mergeable": mergeable,
        "conflict_files": conflict_files,
        "counts": {
            "open_review_threads": open_review,
            "open_chat_threads": open_chat,
            "open_flags": open_flags,
        },
        "capabilities": capabilities,
    }
# -------------------------------------------------------------------
# §10.3: advance the per-user seen cursor.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/prs/{pr_number}/seen")
async def advance_seen(slug: str, pr_number: int, body: PRSeenBody, request: Request) -> dict[str, Any]:
    """Advance the caller's seen cursor for a PR (§10.3).

    The message cursor only moves forward: the stored and incoming ids
    are merged with `max`, so a stale tab cannot roll it back. The commit
    sha has no ordering here; an incoming sha replaces the stored one and
    a missing incoming sha keeps it.
    """
    viewer = auth.require_contributor(request)
    _require_active_rfc(slug)
    _require_pr(slug, pr_number)

    cursor_key = (viewer.user_id, slug, pr_number)
    stored = db.conn().execute(
        """
SELECT last_seen_commit_sha, last_seen_message_id
FROM pr_seen
WHERE user_id = ? AND rfc_slug = ? AND pr_number = ?
""",
        cursor_key,
    ).fetchone()
    stored_sha = stored["last_seen_commit_sha"] if stored else None
    stored_msg = stored["last_seen_message_id"] if stored else None

    next_sha = body.last_seen_commit_sha or stored_sha
    # Whichever message ids are known; the largest wins.
    known_ids = [m for m in (stored_msg, body.last_seen_message_id) if m is not None]
    next_msg = max(known_ids) if known_ids else None

    db.conn().execute(
        """
INSERT INTO pr_seen
(user_id, rfc_slug, pr_number, last_seen_commit_sha, last_seen_message_id, seen_at)
VALUES (?, ?, ?, ?, ?, datetime('now'))
ON CONFLICT(user_id, rfc_slug, pr_number) DO UPDATE SET
last_seen_commit_sha = excluded.last_seen_commit_sha,
last_seen_message_id = excluded.last_seen_message_id,
seen_at = excluded.seen_at
""",
        (*cursor_key, next_sha, next_msg),
    )
    return {"ok": True}
# -------------------------------------------------------------------
# §10.4: post a review-kind thread anchored to a diff range.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/prs/{pr_number}/review")
async def post_review_thread(slug: str, pr_number: int, body: PRReviewBody, request: Request) -> dict[str, Any]:
    """Create a review-kind thread on the PR's head branch (§10.4).

    Inserts a `range`-anchored thread with the JSON-encoded anchor
    payload, then appends the caller's first message to it.

    Responds 409 when the PR is not open, matching the `can_post_review`
    capability, which is only granted on open PRs.
    """
    viewer = auth.require_contributor(request)
    _require_active_rfc(slug)
    pr_row = _require_pr(slug, pr_number)
    # Enforce server-side what the capability flags advertise.
    if pr_row["state"] != "open":
        raise HTTPException(409, f"PR is {pr_row['state']}, not open")
    head_branch = pr_row["head_branch"]
    cur = db.conn().execute(
        """
INSERT INTO threads (rfc_slug, branch_name, anchor_kind, anchor_payload,
thread_kind, label, created_by)
VALUES (?, ?, 'range', ?, 'review', NULL, ?)
""",
        (slug, head_branch, json.dumps(body.anchor_payload), viewer.user_id),
    )
    thread_id = cur.lastrowid
    message_id = chat_layer.append_user_message(
        thread_id=thread_id,
        author_user_id=viewer.user_id,
        text=body.text,
        quote=body.quote,
    )
    return {"thread_id": thread_id, "message_id": message_id}
# -------------------------------------------------------------------
# §10.5: merge.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/prs/{pr_number}/merge")
async def merge_pr(slug: str, pr_number: int, request: Request) -> dict[str, Any]:
    """Merge an open PR through the bot (§10.5).

    Responds 403 when the caller may not merge, 409 when the PR is not
    open or Gitea reports a 409 (surfaced as a merge conflict), and 502
    for any other Gitea failure.
    """
    viewer = auth.require_contributor(request)
    rfc = _require_active_rfc(slug)
    pr_row = _require_pr(slug, pr_number)

    if not _can_merge(rfc, viewer):
        raise HTTPException(403, "Only arbiters, RFC owners, and app admins/owners may merge")
    pr_state = pr_row["state"]
    if pr_state != "open":
        raise HTTPException(409, f"PR is {pr_state}, not open")

    owner, repo = _owner_repo(rfc)
    try:
        await bot.merge_branch_pr(
            viewer.as_actor(),
            owner=owner,
            repo=repo,
            pr_number=pr_number,
            head_branch=pr_row["head_branch"],
            slug=slug,
        )
    except GiteaError as err:
        # A 409 from Gitea is reported as the §10.9 conflict signal
        # instead of a generic upstream failure.
        if err.status == 409:
            raise HTTPException(409, "Merge conflict with main — use Start resolution branch")
        raise HTTPException(502, f"Gitea: {err.detail}")

    await _refresh_after_pr_write(rfc)
    return {"ok": True, "pr_number": pr_number}
# -------------------------------------------------------------------
# §10.8: withdraw.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/prs/{pr_number}/withdraw")
async def withdraw_pr(slug: str, pr_number: int, request: Request) -> dict[str, Any]:
    """Withdraw an open PR through the bot (§10.8).

    Responds 403 when the caller may not withdraw, 409 when the PR is
    not open, and 502 when Gitea rejects the operation.
    """
    viewer = auth.require_contributor(request)
    rfc = _require_active_rfc(slug)
    pr_row = _require_pr(slug, pr_number)

    if not _can_withdraw(rfc, pr_row, viewer):
        raise HTTPException(403, "Only the contributor or an RFC owner/arbiter (or app admin/owner) may withdraw")
    pr_state = pr_row["state"]
    if pr_state != "open":
        raise HTTPException(409, f"PR is {pr_state}, not open")

    owner, repo = _owner_repo(rfc)
    try:
        await bot.withdraw_branch_pr(
            viewer.as_actor(),
            owner=owner,
            repo=repo,
            pr_number=pr_number,
            head_branch=pr_row["head_branch"],
            slug=slug,
            reason="withdraw",
        )
    except GiteaError as err:
        raise HTTPException(502, f"Gitea: {err.detail}")

    await _refresh_after_pr_write(rfc)
    return {"ok": True, "pr_number": pr_number}
# -------------------------------------------------------------------
# §10.2: post-open title/description edits.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/prs/{pr_number}/description")
async def edit_pr_description(
    slug: str, pr_number: int, body: PRDescriptionBody, request: Request
) -> dict[str, Any]:
    """Replace a PR's title and description after it was opened (§10.2).

    Responds 403 when the caller may not edit the text and 409 when the
    PR is not open, matching the `can_edit_text` capability, which is
    only granted on open PRs.
    """
    viewer = auth.require_contributor(request)
    rfc = _require_active_rfc(slug)
    pr_row = _require_pr(slug, pr_number)
    if not _can_edit_pr_text(rfc, pr_row, viewer):
        raise HTTPException(403, "Only the contributor or an RFC owner/arbiter (or admin/owner) may edit")
    # Enforce server-side what the capability flags advertise.
    if pr_row["state"] != "open":
        raise HTTPException(409, f"PR is {pr_row['state']}, not open")
    # Per §10.2: title and description stay editable. For now we
    # mutate the cache directly; the underlying Gitea PR could be
    # updated too via the issues endpoint, but the cache is the
    # source of truth for the surface so this is the relevant write.
    # NOTE(review): presumably a later cache refresh from Gitea could
    # overwrite this row with the unedited text — confirm the refresh
    # preserves local title/description edits.
    db.conn().execute(
        """
UPDATE cached_prs
SET title = ?, description = ?
WHERE rfc_slug = ? AND pr_number = ?
""",
        (body.title.strip(), body.description.strip(), slug, pr_number),
    )
    return {"ok": True}
# -------------------------------------------------------------------
# §10.9: cut a resolution branch and replay.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/prs/{pr_number}/resolution-branch")
async def start_resolution_branch(slug: str, pr_number: int, request: Request) -> dict[str, Any]:
    """Cut a resolution branch for a conflicted PR and replay changes (§10.9).

    Steps, in order: verify the PR is open and not mergeable on Gitea,
    cut the branch through the bot, record its parentage and public
    visibility, replay the original branch's accepted AI changes, seed
    the new branch's whole-doc chat thread with a link back, and insert
    the changes that could not be replayed as pending stale rows.

    Responds 409 when the PR is not open or is mergeable, 404 when Gitea
    has no such pull, and 502 on Gitea failures.
    """
    viewer = auth.require_contributor(request)
    rfc = _require_active_rfc(slug)
    pr_row = _require_pr(slug, pr_number)
    if pr_row["state"] != "open":
        raise HTTPException(409, f"PR is {pr_row['state']}, not open")
    owner, repo = _owner_repo(rfc)
    original_branch = pr_row["head_branch"]
    # Confirm there is in fact a conflict — refusing this on a
    # mergeable PR keeps the surface honest.
    try:
        live = await gitea.get_pull(owner, repo, pr_number)
    except GiteaError as e:
        raise HTTPException(502, f"Gitea: {e.detail}")
    if live is None:
        raise HTTPException(404, "PR not found on Gitea")
    # Only an explicit True refuses; a missing or falsy `mergeable`
    # lets the resolution branch proceed.
    if live.get("mergeable") is True:
        raise HTTPException(409, "PR is mergeable; no resolution branch needed")
    resolution_branch = _resolution_branch_name(original_branch)
    try:
        await bot.cut_resolution_branch(
            viewer.as_actor(),
            owner=owner,
            repo=repo,
            original_branch=original_branch,
            resolution_branch=resolution_branch,
            slug=slug,
        )
    except GiteaError as e:
        raise HTTPException(502, f"Gitea: {e.detail}")
    # Record the parentage so subsequent open-pr on the resolution
    # branch knows which original PR to supersede.
    db.conn().execute(
        """
INSERT INTO pr_resolution_branches
(rfc_slug, original_pr_number, original_branch, resolution_branch)
VALUES (?, ?, ?, ?)
""",
        (slug, pr_number, original_branch, resolution_branch),
    )
    # Default the resolution branch's visibility to public — it
    # exists to land in a PR, and §11.3 will flip it anyway.
    db.conn().execute(
        """
INSERT OR IGNORE INTO branch_visibility (rfc_slug, branch_name, read_public, contribute_mode)
VALUES (?, ?, 1, 'just-me')
""",
        (slug, resolution_branch),
    )
    # Replay the original branch's accepted AI changes onto the
    # resolution branch, one commit at a time. Per §10.9: the AI
    # participant handles unambiguous conflicts; the rest surface
    # to the contributor. For Slice 3 we apply changes whose
    # `original` text still locates in the resolution branch's
    # current RFC.md (the "unambiguous" case) and surface the
    # rest as stale-pending changes on the resolution branch's
    # chat, ready for the contributor to re-anchor.
    unambiguous, ambiguous = await _replay_changes(
        gitea=gitea,
        bot=bot,
        actor=viewer.as_actor(),
        owner=owner,
        repo=repo,
        slug=slug,
        file_path=_file_path_for(rfc),
        is_super_draft=_is_super_draft(rfc),
        original_branch=original_branch,
        resolution_branch=resolution_branch,
    )
    # Seed the resolution branch's chat with a system-author
    # message linking back to the original branch's chat per §10.9.
    # The earliest whole-doc chat thread on the original branch is the
    # one referenced; it may not exist.
    original_thread = db.conn().execute(
        """
SELECT id FROM threads
WHERE rfc_slug = ? AND branch_name = ?
AND anchor_kind = 'whole-doc' AND thread_kind = 'chat'
ORDER BY id LIMIT 1
""",
        (slug, original_branch),
    ).fetchone()
    # Materialize the resolution branch's whole-doc chat thread.
    cur = db.conn().execute(
        """
INSERT INTO threads (rfc_slug, branch_name, anchor_kind, thread_kind, label, created_by)
VALUES (?, ?, 'whole-doc', 'chat', NULL, ?)
""",
        (slug, resolution_branch, viewer.user_id),
    )
    new_thread_id = cur.lastrowid
    link = (
        f"Forked from this conversation → /rfc/{slug}?branch={original_branch} "
        f"(thread id {original_thread['id'] if original_thread else 'n/a'}). "
        f"Replayed {len(unambiguous)} change(s) cleanly; {len(ambiguous)} require manual re-anchoring."
    )
    chat_layer.append_system_message(thread_id=new_thread_id, text=link)
    # Surface ambiguous changes as fresh pending stale rows on the
    # resolution branch so the change panel offers the re-anchoring
    # affordance immediately.
    for ch in ambiguous:
        db.conn().execute(
            """
INSERT INTO changes
(rfc_slug, branch_name, thread_id, kind, state,
original, proposed, reason, stale_since)
VALUES (?, ?, ?, 'ai', 'pending', ?, ?, ?, datetime('now'))
""",
            (slug, resolution_branch, new_thread_id, ch["original"], ch["proposed"], ch["reason"]),
        )
    await _refresh_after_pr_write(rfc)
    return {
        "ok": True,
        "resolution_branch": resolution_branch,
        "replayed_clean": len(unambiguous),
        "replayed_ambiguous": len(ambiguous),
    }
# ------------------------------------------------------------------
# Helpers (closures over config/gitea/etc.)
# ------------------------------------------------------------------
def _require_rfc(slug: str):
    """Return the cached RFC row for `slug`, or respond 404 when absent."""
    rfc_row = db.conn().execute("SELECT * FROM cached_rfcs WHERE slug = ?", (slug,)).fetchone()
    if rfc_row is not None:
        return rfc_row
    raise HTTPException(404, "RFC not found")
def _require_active_rfc(slug: str):
    """Return the RFC row when the PR flow may serve it.

    Accepts RFCs in state `active` or `super-draft` (per §17's routing
    collapse, super-draft body-edit PRs share this API surface). Responds
    409 for any other state, and for an active RFC with no repo recorded.
    """
    rfc_row = _require_rfc(slug)
    state = rfc_row["state"]
    if state not in ("active", "super-draft"):
        raise HTTPException(409, f"RFC is {state}")
    needs_repo = state == "active"
    if needs_repo and not rfc_row["repo"]:
        raise HTTPException(409, "RFC has no repo")
    return rfc_row
def _is_super_draft(rfc) -> bool:
return rfc["state"] == "super-draft"
def _owner_repo(rfc) -> tuple[str, str]:
    """Pick the Gitea (owner, repo) pair that hosts this RFC's file.

    Super-drafts live in the configured meta repo; any other RFC uses its
    own `owner/repo` value, split on the first slash.
    """
    if not _is_super_draft(rfc):
        pieces = rfc["repo"].split("/", 1)
        owner, repo = pieces
        return owner, repo
    return config.gitea_org, config.meta_repo
def _file_path_for(rfc) -> str:
    """Return the repo-relative path of the RFC's document file."""
    # Super-drafts are stored per slug; other RFCs use the fixed file name.
    return f"rfcs/{rfc['slug']}.md" if _is_super_draft(rfc) else RFC_FILE_PATH
def _extract_body(rfc, file_contents: str) -> str:
    """Return the editable body held in `file_contents`.

    A super-draft file is a frontmatter+body envelope, so the body is the
    parsed entry's `body`. Any other RFC's file, or a super-draft file
    that fails to parse, is returned unchanged.
    """
    if _is_super_draft(rfc):
        try:
            parsed = entry_mod.parse(file_contents)
        except Exception:
            return file_contents
        return parsed.body
    return file_contents
def _require_pr(slug: str, pr_number: int):
    """Return the cached PR row for (`slug`, `pr_number`), or respond 404.

    Only the two PR kinds this surface serves are matched: `rfc_branch`
    (per-RFC repo) and `meta_body_edit` (super-draft body edits on the
    meta repo). Both get the same §10 treatment.
    """
    pr_row = db.conn().execute(
        """
SELECT * FROM cached_prs
WHERE rfc_slug = ? AND pr_number = ?
AND pr_kind IN ('rfc_branch', 'meta_body_edit')
""",
        (slug, pr_number),
    ).fetchone()
    if pr_row:
        return pr_row
    raise HTTPException(404, "PR not found")
def _branch_has_commits_ahead(slug: str, branch: str) -> bool:
    """Heuristically tell whether `branch` carries commits main lacks.

    Compares the cached head sha of the branch with the cached head sha
    of the slug's `main` row. Returns False when the branch has no cached
    row or no head sha; otherwise True exactly when the two shas differ.
    """
    shas = db.conn().execute(
        """
SELECT b.head_sha AS branch_sha,
(SELECT head_sha FROM cached_branches
WHERE rfc_slug = ? AND branch_name = 'main') AS main_sha
FROM cached_branches b
WHERE b.rfc_slug = ? AND b.branch_name = ?
""",
        (slug, slug, branch),
    ).fetchone()
    if not shas:
        return False
    branch_sha = shas["branch_sha"]
    return bool(branch_sha) and branch_sha != shas["main_sha"]
def _resolution_origin(slug: str, branch: str) -> int | None:
    """Return the original PR number when `branch` is a recorded
    resolution branch for `slug`, else None."""
    parentage = db.conn().execute(
        """
SELECT original_pr_number FROM pr_resolution_branches
WHERE rfc_slug = ? AND resolution_branch = ?
""",
        (slug, branch),
    ).fetchone()
    if not parentage:
        return None
    return parentage["original_pr_number"]
async def _refresh_after_pr_write(rfc) -> None:
    """Refresh the cache slices affected by a PR write on this RFC."""
    if not _is_super_draft(rfc):
        await cache.refresh_rfc_repo(config, gitea, rfc["slug"])
        return
    # Super-drafts live on the meta repo: refresh its entries, branches,
    # and pulls, in that order.
    await cache.refresh_meta_repo(config, gitea)
    await cache.refresh_meta_branches(config, gitea)
    await cache.refresh_meta_pulls(config, gitea)
return router
# ---------------------------------------------------------------------------
# Capability helpers (module-level, since they don't need closure)
# ---------------------------------------------------------------------------
def _can_merge(rfc, viewer) -> bool:
"""§6.1 admin/owner OR §6.3 RFC owners/arbiters."""
if viewer is None:
return False
if viewer.role in ("owner", "admin"):
return True
owners = json.loads(rfc["owners_json"] or "[]")
arbiters = json.loads(rfc["arbiters_json"] or "[]")
return viewer.gitea_login in owners or viewer.gitea_login in arbiters
def _can_withdraw(rfc, pr_row, viewer) -> bool:
"""§10.8: the contributor OR any arbiter / RFC owner / app admin/owner."""
if viewer is None:
return False
if _can_merge(rfc, viewer):
return True
return pr_row["opened_by"] == viewer.gitea_login
def _can_edit_pr_text(rfc, pr_row, viewer) -> bool:
    """Tell whether `viewer` may edit the PR's title and description.

    Per the last paragraph of §10.2 this is the same set of people who
    may withdraw the PR, so the decision is delegated.
    """
    allowed = _can_withdraw(rfc, pr_row, viewer)
    return allowed
def _pr_capabilities(rfc, pr_row, viewer) -> dict:
    """Compute the capability flags the review page renders for `viewer`.

    Every action flag additionally requires the PR to be open;
    `is_anonymous` only reports whether a viewer is signed in.
    """
    is_open = pr_row["state"] == "open"
    signed_in = viewer is not None
    return {
        "can_merge": _can_merge(rfc, viewer) and is_open,
        "can_withdraw": _can_withdraw(rfc, pr_row, viewer) and is_open,
        "can_edit_text": _can_edit_pr_text(rfc, pr_row, viewer) and is_open,
        "can_post_review": signed_in and is_open,
        "can_resolve_conflict": signed_in and is_open,
        "is_anonymous": not signed_in,
    }
# ---------------------------------------------------------------------------
# AI-drafted title and description (§10.2)
# ---------------------------------------------------------------------------
def _branch_chat_excerpt(slug: str, branch: str, limit: int = 40) -> list[dict]:
    """Return the newest `limit` chat/review messages of a branch, oldest first.

    Only `user` and `assistant` messages are included. The query selects
    newest-first so LIMIT keeps the most recent ones; the list is then
    flipped back into chronological order.
    """
    newest_first = db.conn().execute(
        """
SELECT m.role, m.text
FROM thread_messages m
JOIN threads t ON t.id = m.thread_id
WHERE t.rfc_slug = ? AND t.branch_name = ?
AND t.thread_kind IN ('chat', 'review')
AND m.role IN ('user', 'assistant')
ORDER BY m.id DESC
LIMIT ?
""",
        (slug, branch, limit),
    ).fetchall()
    excerpt: list[dict] = []
    for message in newest_first[::-1]:
        excerpt.append({"role": message["role"], "text": message["text"]})
    return excerpt
def _draft_with_provider(
    *,
    providers: dict[str, BaseProvider],
    default_model: str,
    rfc_title: str,
    main_body: str,
    branch_body: str,
    chat_messages: list[dict],
) -> tuple[str, str]:
    """Draft a PR title and description with an AI provider (§10.2).

    The prompt asks for a one-line, spec-voice title and a description of
    two to four sentences drawn from the branch chat. Each body is cut to
    6000 characters, and only the last 20 chat messages (600 characters
    each) are sent.

    Falls back to the deterministic stub draft when no provider is
    configured or the provider call raises. When the reply carries no
    parseable title, only the title comes from the stub and the parsed
    description is kept.
    """

    def _fallback() -> tuple[str, str]:
        return _stub_draft(rfc_title=rfc_title, main_body=main_body, branch_body=branch_body)

    if not providers:
        return _fallback()

    # Prefer the default model's provider; otherwise take the first one.
    provider = providers.get(default_model)
    if not provider:
        provider = next(iter(providers.values()))

    system = (
        "You are summarizing a contributor's proposed change to an RFC for an arbiter audience. "
        "Output exactly two sections in this order: 'TITLE: <one line, structural, spec-voice>' "
        "then 'DESCRIPTION: <two to four sentences pulling what was argued and what shifted>'. "
        "No prelude, no closing."
    )
    recent_messages = chat_messages[-20:]
    chat_lines = [f"- {m['role']}: {m['text'][:600]}" for m in recent_messages]
    chat_dump = "\n".join(chat_lines)
    user_msg = (
        f"RFC: {rfc_title}\n\n"
        f"--- main RFC.md ---\n{main_body[:6000]}\n\n"
        f"--- branch RFC.md ---\n{branch_body[:6000]}\n\n"
        f"--- recent branch chat ---\n{chat_dump or '(empty)'}\n"
    )

    try:
        text = provider.send(system, [{"role": "user", "content": user_msg}])
    except Exception as exc:
        log.warning("pr-draft provider failed: %s", exc)
        return _fallback()

    title, description = _split_title_description(text)
    if title:
        return title, description
    stub_title, _ = _fallback()
    return stub_title, description
def _split_title_description(text: str) -> tuple[str, str]:
"""Parse the `TITLE: ... DESCRIPTION: ...` shape the prompt asks for.
Tolerant of variations the model might emit — leading/trailing
whitespace, the model adding markdown emphasis around the labels —
and falls back to the whole text as description if the title
label isn't present."""
title = ""
description = text.strip()
lower = text.lower()
title_idx = lower.find("title:")
desc_idx = lower.find("description:")
if title_idx >= 0:
end = desc_idx if desc_idx > title_idx else len(text)
title_line = text[title_idx + len("title:") : end].strip()
title = title_line.split("\n", 1)[0].strip().strip("*_`")
if desc_idx >= 0:
description = text[desc_idx + len("description:") :].strip().strip("*_`")
return title[:240], description[:8000]
def _stub_draft(*, rfc_title: str, main_body: str, branch_body: str) -> tuple[str, str]:
delta = abs(len(branch_body) - len(main_body))
title = f"Edits to {rfc_title}"
description = (
f"Proposed revisions to {rfc_title}. The branch's RFC.md differs from main "
f"by {delta} characters. Arbiters: please review the diff inline and the "
f"branch chat for the argument."
)
return title, description
# ---------------------------------------------------------------------------
# §10.9 replay
# ---------------------------------------------------------------------------
async def _replay_changes(
    *,
    gitea: Gitea,
    bot: Bot,
    actor,
    owner: str,
    repo: str,
    slug: str,
    file_path: str,
    is_super_draft: bool,
    original_branch: str,
    resolution_branch: str,
) -> tuple[list[dict], list[dict]]:
    """Replay the original branch's accepted AI changes onto the resolution branch.

    Changes are walked in creation order. One is applied, as a commit of
    its own through the bot, only when its `original` text occurs exactly
    once in the resolution branch's current body. Everything else is
    reported as ambiguous: unreadable file, empty or non-unique
    `original`, a failed re-wrap, or a failed commit.

    For super-draft body edits the file is rfcs/<slug>.md and the body
    lives inside the frontmatter envelope, so the body is extracted for
    the match and re-wrapped before committing.

    Returns `(unambiguous_changes_applied, ambiguous_changes_skipped)`;
    each item is a dict with `original`, `proposed`, and `reason`.
    """
    rows = db.conn().execute(
        """
SELECT id, kind, original, proposed, reason
FROM changes
WHERE rfc_slug = ? AND branch_name = ? AND state = 'accepted' AND kind = 'ai'
ORDER BY id
""",
        (slug, original_branch),
    ).fetchall()
    unambiguous: list[dict] = []
    ambiguous: list[dict] = []
    for r in rows:
        summary = {"original": r["original"], "proposed": r["proposed"], "reason": r["reason"] or ""}
        original_text = r["original"] or ""
        # A NULL `proposed` is treated as empty text, the same value the
        # commit call below passes; str.replace would raise on None.
        proposed_text = r["proposed"] or ""

        # Re-read on every iteration: each successful commit moves the
        # branch, so both content and sha change between changes.
        fetched = await gitea.read_file(owner, repo, file_path, ref=resolution_branch)
        if fetched is None:
            ambiguous.append(summary)
            continue
        current_content, current_sha = fetched
        current_body = _extract_body_for_replay(is_super_draft, current_content)
        if not original_text or current_body.count(original_text) != 1:
            ambiguous.append(summary)
            continue

        new_body = current_body.replace(original_text, proposed_text, 1)
        try:
            new_content = _wrap_body_for_replay(is_super_draft, current_content, new_body)
        except Exception as e:
            # A file that cannot be re-wrapped is left for manual
            # re-anchoring instead of aborting the replay midway.
            log.warning("replay change %d could not be re-wrapped: %s", r["id"], e)
            ambiguous.append(summary)
            continue

        try:
            await bot.commit_replay_change(
                actor,
                owner=owner,
                repo=repo,
                branch=resolution_branch,
                file_path=file_path,
                new_content=new_content,
                prior_sha=current_sha,
                original_change_id=r["id"],
                original=original_text,
                proposed=proposed_text,
                reason=r["reason"] or "",
                slug=slug,
            )
        except GiteaError as e:
            log.warning("replay change %d failed: %s", r["id"], e)
            ambiguous.append(summary)
        else:
            unambiguous.append(summary)
    return unambiguous, ambiguous
def _extract_body_for_replay(is_super_draft: bool, content: str) -> str:
if not is_super_draft:
return content
try:
return entry_mod.parse(content).body
except Exception:
return content
def _wrap_body_for_replay(is_super_draft: bool, prior_content: str, new_body: str) -> str:
if not is_super_draft:
return new_body
entry = entry_mod.parse(prior_content)
entry.body = new_body if new_body.endswith("\n") else new_body + "\n"
return entry_mod.serialize(entry)
def _resolution_branch_name(original_branch: str) -> str:
"""Per §10.9: a fresh branch name derived from the original.
Slice 3 picks `<original>-resolved-<hex>`. Exact format is an
implementation detail per §8.14's voice — kept short, ref-safe,
and traceable to the parent."""
import secrets
suffix = secrets.token_hex(3)
base = original_branch
if len(base) > 80:
base = base[:80]
return f"{base}-resolved-{suffix}"
# ---------------------------------------------------------------------------
# Serialization helpers — mirror api_branches.py shape
# ---------------------------------------------------------------------------
def _serialize_thread(row) -> dict[str, Any]:
payload = row["anchor_payload"]
try:
anchor = json.loads(payload) if payload else None
except Exception:
anchor = None
return {
"id": row["id"],
"anchor_kind": row["anchor_kind"],
"anchor_payload": anchor,
"thread_kind": row["thread_kind"],
"label": row["label"],
"state": row["state"],
"created_by": row["created_by"],
"created_at": row["created_at"],
"resolved_at": row["resolved_at"],
"resolved_by": row["resolved_by"],
}
def _serialize_message(row) -> dict[str, Any]:
return {
"id": row["id"],
"thread_id": row["thread_id"],
"role": row["role"],
"author_user_id": row["author_user_id"],
"author_login": row["author_login"],
"author_display": row["author_display"],
"model_id": row["model_id"],
"text": row["text"],
"quote": row["quote"],
"created_at": row["created_at"],
}