Slice 8 WIP: §12 hygiene + §10.7 + routing + rollback cleanup

- Add §12 30/90 hygiene scheduler in hygiene.py, mirroring the
  DigestScheduler shape; wires next to digest in main.py with the
  same start/stop/run_tick test seam.
- Extend bot.delete_branch to accept actor=None for system gestures,
  per §15.9 (actor_user_id=NULL, on_behalf_of=bot_login).
- Convert every branches/{branch} route in api_branches.py and
  api_prs.py to {branch:path}; move the bare GET to the bottom of
  the router so deeper GETs match before greedy-path swallow.
- Extend api_prs.py's _require_pr to accept pr_kind='meta_metadata'
  so the §9.5 metadata-pane PRs land an in-app merge.
- Graduation rollback now deletes the graduate-<slug>-<6hex> branch
  after closing the PR — §19.2 candidate that lands here.
- Email-bounce webhook gains a WEBHOOK_EMAIL_BOUNCE_SECRET seam.
- FakeGitea grows a DELETE /branches/{branch:path} handler and a
  slashed-branch read; integration tests for the hygiene vertical
  cover the 30d close, 90d delete, post-merge delete, pinned
  exemption, per-user cursor preservation, no-notification rule,
  and the graduation-rollback cleanup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ben Stull
2026-05-25 04:03:09 -07:00
parent 060fa408a2
commit 1a0c4428af
9 changed files with 965 additions and 105 deletions
+109 -91
View File
@@ -258,83 +258,15 @@ def make_router(
"pre_graduation_history": pre_grad,
}
# -------------------------------------------------------------------
# §17: GET /api/rfcs/<slug>/branches/<branch>
# Per §4: branch bodies are NOT cached — fetch live from Gitea.
# Per §9.5 / §17: when slug resolves to super-draft, <branch> names
# a meta-repo branch and the underlying file is rfcs/<slug>.md with
# the body wrapped in frontmatter.
# -------------------------------------------------------------------
@router.get("/api/rfcs/{slug}/branches/{branch}")
async def get_branch_view(slug: str, branch: str, request: Request) -> dict[str, Any]:
viewer = auth.current_user(request)
rfc = _require_rfc_with_repo(slug)
if not _can_read_branch(slug, branch, viewer):
raise HTTPException(403, "Branch is private")
owner, repo = _repo_for(rfc, branch)
path = _file_path_for(rfc, branch)
result = await gitea.read_file(owner, repo, path, ref=branch)
if result is None:
br = await gitea.get_branch(owner, repo, branch)
if br is None:
raise HTTPException(404, "Branch not found")
body, body_sha = "", ""
else:
content, body_sha = result
body = _extract_body(rfc, content, branch)
# Ensure the whole-doc chat thread for the branch exists.
thread_id = _ensure_branch_chat_thread(slug, branch, viewer)
# Sub-threads (range/paragraph) and flags scoped to this branch.
thread_rows = db.conn().execute(
"""
SELECT id, anchor_kind, anchor_payload, thread_kind, label, state, created_by, created_at
FROM threads
WHERE rfc_slug = ? AND branch_name = ?
ORDER BY id
""",
(slug, branch),
).fetchall()
threads = [_serialize_thread(t) for t in thread_rows]
# Visibility, contribute, grants.
vis = _branch_vis(slug, branch)
grants = _branch_grants(slug, branch)
# Pending and resolved changes scoped to this branch.
changes_rows = db.conn().execute(
"""
SELECT id, thread_id, source_message_id, kind, state, original, proposed, reason,
was_edited_before_accept, stale_since, acted_by, acted_at, commit_sha, created_at
FROM changes
WHERE rfc_slug = ? AND branch_name = ?
ORDER BY id
""",
(slug, branch),
).fetchall()
changes = [_serialize_change(c) for c in changes_rows]
# Branch metadata for the breadcrumb / header.
creator = _branch_creator(slug, branch)
capabilities = _capabilities(rfc, slug, branch, viewer, creator)
return {
"slug": slug,
"title": rfc["title"],
"branch_name": branch,
"body": body,
"body_sha": body_sha,
"main_thread_id": thread_id,
"threads": threads,
"changes": changes,
"visibility": vis,
"grants": grants,
"creator": creator,
"capabilities": capabilities,
}
# The bare `GET /api/rfcs/<slug>/branches/<branch>` is declared
# at the *bottom* of this router so the more-specific deeper GET
# routes — `branches/{branch:path}/threads` and
# `branches/{branch:path}/threads/{thread_id}/messages` — match
# before the bare GET swallows a sub-route path with `:path`'s
# greedy match. Per the §19.2 "branch-name path routing" candidate
# Slice 8 settles: ordering discipline against `{branch:path}` is
# how the slashed-branch read works without collisions. See
# `get_branch_view` below `stream_chat_turn`.
# -------------------------------------------------------------------
# §17: POST /api/rfcs/<slug>/branches/main/promote-to-branch
@@ -506,7 +438,7 @@ def make_router(
# §17 / §8.9: accept / decline / reask a change
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/changes/{change_id}/accept")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/changes/{change_id}/accept")
async def accept_change(
slug: str,
branch: str,
@@ -599,7 +531,7 @@ def make_router(
return {"ok": True, "commit_sha": sha, "change_id": change_id}
@router.post("/api/rfcs/{slug}/branches/{branch}/changes/{change_id}/decline")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/changes/{change_id}/decline")
async def decline_change(slug: str, branch: str, change_id: int, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
_require_rfc_with_repo(slug)
@@ -617,7 +549,7 @@ def make_router(
)
return {"ok": True, "change_id": change_id}
@router.post("/api/rfcs/{slug}/branches/{branch}/changes/{change_id}/reask")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/changes/{change_id}/reask")
async def reask_change(slug: str, branch: str, change_id: int, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_rfc_with_repo(slug)
@@ -674,7 +606,7 @@ def make_router(
# §17 / §8.11 / §10.6: manual-edit flush
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/manual-flush")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/manual-flush")
async def manual_flush(slug: str, branch: str, body: ManualFlushBody, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_rfc_with_repo(slug)
@@ -751,7 +683,7 @@ def make_router(
# §17 / §11: visibility + contribute + grants
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/visibility")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/visibility")
async def set_branch_visibility(slug: str, branch: str, body: VisibilityBody, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_rfc_with_repo(slug)
@@ -772,7 +704,7 @@ def make_router(
)
return {"ok": True, "visibility": _branch_vis(slug, branch)}
@router.post("/api/rfcs/{slug}/branches/{branch}/grants")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/grants")
async def add_branch_grant(slug: str, branch: str, body: GrantBody, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_rfc_with_repo(slug)
@@ -793,7 +725,7 @@ def make_router(
)
return {"ok": True, "grants": _branch_grants(slug, branch)}
@router.delete("/api/rfcs/{slug}/branches/{branch}/grants/{grantee_login}")
@router.delete("/api/rfcs/{slug}/branches/{branch:path}/grants/{grantee_login}")
async def revoke_branch_grant(slug: str, branch: str, grantee_login: str, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_rfc_with_repo(slug)
@@ -813,7 +745,7 @@ def make_router(
# §17 / §8.12 / §8.13: threads
# -------------------------------------------------------------------
@router.get("/api/rfcs/{slug}/branches/{branch}/threads")
@router.get("/api/rfcs/{slug}/branches/{branch:path}/threads")
async def list_branch_threads(slug: str, branch: str, request: Request) -> dict[str, Any]:
viewer = auth.current_user(request)
_require_rfc_with_repo(slug)
@@ -831,7 +763,7 @@ def make_router(
).fetchall()
return {"items": [_serialize_thread(r) for r in rows]}
@router.post("/api/rfcs/{slug}/branches/{branch}/threads")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/threads")
async def create_branch_thread(slug: str, branch: str, body: ThreadCreateBody, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
_require_rfc_with_repo(slug)
@@ -861,7 +793,7 @@ def make_router(
)
return {"thread_id": thread_id, "message_id": message_id}
@router.get("/api/rfcs/{slug}/branches/{branch}/threads/{thread_id}/messages")
@router.get("/api/rfcs/{slug}/branches/{branch:path}/threads/{thread_id}/messages")
async def get_thread_messages(slug: str, branch: str, thread_id: int, request: Request) -> dict[str, Any]:
viewer = auth.current_user(request)
_require_rfc_with_repo(slug)
@@ -884,7 +816,7 @@ def make_router(
"messages": [_serialize_message(r) for r in rows],
}
@router.post("/api/rfcs/{slug}/branches/{branch}/threads/{thread_id}/messages")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/threads/{thread_id}/messages")
async def post_thread_message(
slug: str, branch: str, thread_id: int, body: ThreadMessageBody, request: Request
) -> dict[str, Any]:
@@ -901,7 +833,7 @@ def make_router(
)
return {"ok": True, "message_id": message_id}
@router.post("/api/rfcs/{slug}/branches/{branch}/chat-seen")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/chat-seen")
async def advance_chat_seen(slug: str, branch: str, body: dict, request: Request) -> dict[str, Any]:
"""§15.7 chat-seen cursor advance.
@@ -930,7 +862,7 @@ def make_router(
)
return {"ok": True, "reconciled": reconciled}
@router.post("/api/rfcs/{slug}/branches/{branch}/threads/{thread_id}/resolve")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/threads/{thread_id}/resolve")
async def resolve_thread(slug: str, branch: str, thread_id: int, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_rfc_with_repo(slug)
@@ -951,7 +883,7 @@ def make_router(
# §17 / §18 carryover: SSE-streaming chat turn on a thread
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/threads/{thread_id}/chat")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/threads/{thread_id}/chat")
async def stream_chat_turn(
slug: str, branch: str, thread_id: int, body: ChatTurnBody, request: Request
):
@@ -1015,6 +947,92 @@ def make_router(
}
return StreamingResponse(event_stream(), media_type="text/event-stream", headers=headers)
# -------------------------------------------------------------------
# §17: GET /api/rfcs/<slug>/branches/<branch>
# Per §4: branch bodies are NOT cached — fetch live from Gitea.
# Per §9.5 / §17: when slug resolves to super-draft, <branch> names
# a meta-repo branch and the underlying file is rfcs/<slug>.md with
# the body wrapped in frontmatter.
#
# Declared LAST among the branch-scoped GET routes per the §19.2
# "branch-name path routing" candidate: `{branch:path}` is greedy
# by Starlette's converter, so `branches/foo/threads` would match
# this bare GET with branch=foo/threads if declared first. Putting
# the more-specific `threads` and `threads/{thread_id}/messages`
# GETs above lets them claim those URLs; this one catches anything
# else, including slashed branch names like `foo/bar`.
# -------------------------------------------------------------------
@router.get("/api/rfcs/{slug}/branches/{branch:path}")
async def get_branch_view(slug: str, branch: str, request: Request) -> dict[str, Any]:
    """Return the full branch view: live body plus threads, changes and access info.

    The body is read from Gitea on every call rather than from a cache.
    Raises 403 when `_can_read_branch` denies the viewer and 404 when
    neither the file nor the branch exists in Gitea. A branch that
    exists but has no file at the expected path yields an empty body
    and an empty `body_sha`.
    """
    viewer = auth.current_user(request)
    rfc = _require_rfc_with_repo(slug)
    if not _can_read_branch(slug, branch, viewer):
        raise HTTPException(403, "Branch is private")

    owner, repo = _repo_for(rfc, branch)
    file_path = _file_path_for(rfc, branch)
    fetched = await gitea.read_file(owner, repo, file_path, ref=branch)
    if fetched is not None:
        content, body_sha = fetched
        body = _extract_body(rfc, content, branch)
    elif await gitea.get_branch(owner, repo, branch) is None:
        # No file and no branch: the name does not exist at all.
        raise HTTPException(404, "Branch not found")
    else:
        # Branch exists but carries no file at the expected path.
        body, body_sha = "", ""

    # Ensure the whole-doc chat thread exists before listing threads.
    main_thread_id = _ensure_branch_chat_thread(slug, branch, viewer)

    scope = (slug, branch)

    # Sub-threads (range/paragraph) and flags scoped to this branch.
    threads = [
        _serialize_thread(thread_row)
        for thread_row in db.conn().execute(
            """
            SELECT id, anchor_kind, anchor_payload, thread_kind, label, state, created_by, created_at
            FROM threads
            WHERE rfc_slug = ? AND branch_name = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
    ]

    # Visibility, contribute, grants.
    visibility = _branch_vis(slug, branch)
    grants = _branch_grants(slug, branch)

    # Pending and resolved changes scoped to this branch.
    changes = [
        _serialize_change(change_row)
        for change_row in db.conn().execute(
            """
            SELECT id, thread_id, source_message_id, kind, state, original, proposed, reason,
            was_edited_before_accept, stale_since, acted_by, acted_at, commit_sha, created_at
            FROM changes
            WHERE rfc_slug = ? AND branch_name = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
    ]

    # Branch metadata for the breadcrumb / header.
    creator = _branch_creator(slug, branch)
    capabilities = _capabilities(rfc, slug, branch, viewer, creator)

    return {
        "slug": slug,
        "title": rfc["title"],
        "branch_name": branch,
        "body": body,
        "body_sha": body_sha,
        "main_thread_id": main_thread_id,
        "threads": threads,
        "changes": changes,
        "visibility": visibility,
        "grants": grants,
        "creator": creator,
        "capabilities": capabilities,
    }
# ------------------------------------------------------------------
# Permission + state helpers (closures, share `config` etc.)
# ------------------------------------------------------------------
+19
View File
@@ -846,6 +846,25 @@ async def _undo_open_pr(*, config, gitea, bot, actor, state) -> str:
head_branch=state.graduation_branch or "",
slug=state.slug, reason="graduation rollback",
)
# Per the §19.2 "graduation rollback's branch cleanup" candidate
# that Slice 8 settles: delete the dash-suffixed branch on rollback
# so failed-graduation branches don't accumulate on the meta repo.
# The §12 hygiene sweep would catch this eventually, but closing
# the loop here removes the chance of pile-up across retries.
branch_name = state.graduation_branch or ""
if branch_name:
try:
await bot.delete_branch(
actor,
owner=config.gitea_org,
repo=config.meta_repo,
branch=branch_name,
slug=state.slug,
action_kind="delete_post_merge_branch",
reason="graduation rollback",
)
except Exception:
log.exception("rollback: delete_branch failed for %s", branch_name)
return f"Closed PR #{state.new_pr_number}"
+20 -4
View File
@@ -390,11 +390,27 @@ def make_router(config: Config) -> APIRouter:
)
@router.post("/api/webhooks/email-bounce")
async def email_bounce(body: BounceBody) -> dict[str, Any]:
async def email_bounce(body: BounceBody, request: Request) -> dict[str, Any]:
# §15.4: hard bounces and complaints flip the global opt-out.
# The webhook is unauthenticated here for v1 — the SMTP provider's
# callback URL is the contract. Tighten with a signing secret
# when an actual provider is wired in.
# Per the §19.2 "email bounce webhook authentication" candidate
# Slice 8 settles: when `WEBHOOK_EMAIL_BOUNCE_SECRET` is set in
# env, the webhook requires the same value in the
# `X-Webhook-Secret` header. The shared-secret shape is the
# narrowest seam that covers the major providers — Sendgrid's
# `X-Twilio-Email-Event-Webhook-Signature`, SES via SNS topic
# signatures, Postmark's HTTP basic auth — without forcing a
# per-provider verifier today. When the operator wires a real
# SMTP provider they pick the equivalent shared-secret or
# rotate the value behind whichever signature scheme the
# provider supports. When the env var is unset the webhook
# stays unauthenticated for dev (the v1 contract).
import os as _os
expected = _os.environ.get("WEBHOOK_EMAIL_BOUNCE_SECRET", "").strip()
if expected:
received = request.headers.get("X-Webhook-Secret", "")
import hmac as _hmac
if not received or not _hmac.compare_digest(expected, received):
raise HTTPException(401, "Invalid webhook signature")
row = db.conn().execute(
"SELECT id FROM users WHERE LOWER(email) = LOWER(?)", (body.email,),
).fetchone()
+8 -3
View File
@@ -77,7 +77,7 @@ def make_router(
# produces the open-pr call; the draft is just a starting point.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/pr-draft")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/pr-draft")
async def draft_pr_text(slug: str, branch: str, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_active_rfc(slug)
@@ -107,7 +107,7 @@ def make_router(
# branch's read_public unconditionally.
# -------------------------------------------------------------------
@router.post("/api/rfcs/{slug}/branches/{branch}/open-pr")
@router.post("/api/rfcs/{slug}/branches/{branch:path}/open-pr")
async def open_pr(slug: str, branch: str, body: OpenPRBody, request: Request) -> dict[str, Any]:
viewer = auth.require_contributor(request)
rfc = _require_active_rfc(slug)
@@ -660,11 +660,16 @@ def make_router(
# exposed through this surface — the merge path is the only
# affordance an admin needs, and the §10 review machinery
# gracefully degrades for frontmatter-only PRs.
# Slice 8: §9.5 metadata-pane PRs (`meta_metadata`) land here
# too per the §19.2 "in-app merge for metadata PRs" candidate.
# The diff-rendered review surface degrades gracefully — there
# is no body diff worth reviewing — but the merge gesture lands
# in-app rather than forcing the Gitea round-trip.
row = db.conn().execute(
"""
SELECT * FROM cached_prs
WHERE rfc_slug = ? AND pr_number = ?
AND pr_kind IN ('rfc_branch', 'meta_body_edit', 'meta_claim')
AND pr_kind IN ('rfc_branch', 'meta_body_edit', 'meta_claim', 'meta_metadata')
""",
(slug, pr_number),
).fetchone()
+76 -4
View File
@@ -874,10 +874,12 @@ class Bot:
slug: str,
reason: str,
) -> None:
"""Undo of `open_graduation_pr`. Closes the PR without merging; the
branch is left in place to dodge the case where another graduation
attempt runs immediately — it'll get its own `graduate-<slug>-<hex>`
suffix."""
"""Undo of `open_graduation_pr`. Closes the PR without merging.
The companion `delete_branch` call lives next to the rollback
caller in `api_graduation.py` per the §19.2 'graduation rollback's
branch cleanup' candidate Slice 8 settles — the §12 hygiene
sweep would catch the branch eventually, but closing the loop
on rollback avoids accumulation."""
await self._gitea.close_pull(org, meta_repo, pr_number)
_log(
actor,
@@ -888,6 +890,76 @@ class Bot:
details={"reason": reason},
)
# ----- §12 hygiene: branch deletion -----
async def delete_branch(
    self,
    actor: Actor | None,
    *,
    owner: str,
    repo: str,
    branch: str,
    slug: str | None,
    action_kind: str,
    reason: str,
    bot_login: str | None = None,
) -> None:
    """Delete `branch` from `owner/repo` in Gitea and record an audit row.

    Per §12 there are three callers: the hygiene sweep (90-day boundary
    on meta-repo edit branches), the §10.7 90-day post-merge timer for
    per-RFC PR branches, and the graduation-rollback cleanup for
    `graduate-<slug>-<6hex>` branches.

    Args:
        actor: the human performing the gesture, or `None` for a
            system gesture. With `None` the audit row is inserted
            directly with `actor_user_id=NULL` and
            `on_behalf_of=bot_login` (empty string when `bot_login`
            is not given); otherwise the row goes through `_log`.
        owner, repo, branch: the Gitea branch to delete.
        slug: RFC slug stored on the audit row; may be `None`.
        action_kind: audit action kind stored on the row.
        reason: free-text reason stored in the row's `details`.
        bot_login: login recorded as `on_behalf_of` on the system path.

    Raises:
        Whatever `self._gitea.delete_branch` raises, except a
        `GiteaError` with status 404, which is swallowed so a retried
        delete still records its audit row instead of crashing.
    """
    from .gitea import GiteaError

    try:
        await self._gitea.delete_branch(owner, repo, branch)
    except GiteaError as gitea_err:
        # A 404 means the branch is already gone; anything else is real.
        if gitea_err.status != 404:
            raise

    audit_details = {"repo": f"{owner}/{repo}", "reason": reason}

    if actor is not None:
        _log(
            actor,
            action_kind,
            rfc_slug=slug,
            branch_name=branch,
            details=audit_details,
        )
        return

    # System actor: write the audit row directly instead of going
    # through `_log`. Per the hygiene design the delete action kinds
    # are kept out of the notification routing, so nothing fans out.
    system_row = (
        bot_login or "",
        action_kind,
        slug,
        branch,
        json.dumps(audit_details),
    )
    db.conn().execute(
        """
        INSERT INTO actions
        (actor_user_id, on_behalf_of, action_kind, rfc_slug, branch_name, details)
        VALUES (NULL, ?, ?, ?, ?, ?)
        """,
        system_row,
    )
# ----- §13.1 claim PRs -----
async def open_claim_pr(
+334
View File
@@ -0,0 +1,334 @@
"""§12: the branch-hygiene scheduler.
The structural commitment Slice 8 owes. Closes the loop on §11.5's
branch lifecycle (open → closed at 30d → deleted at 90d) and on §10.7's
post-merge deletion timer for per-RFC PR branches.
The sweep rides next to `DigestScheduler` per the §19.1 brief — same
`start` / `stop` / `run_tick` shape, same hourly cadence by default,
same test seam pattern. The cadence is configurable via
`HYGIENE_TICK_SECONDS` for tests and dev.
Per §15.9 and the §19.1 brief, hygiene actions fire as "the app":
`actor_user_id = NULL` and `on_behalf_of = bot_login`. The action
kinds (`close_idle_branch`, `delete_stale_branch`,
`delete_post_merge_branch`) are intentionally outside
`notify._AUTO_WATCH_ACTIONS` and `notify._ROUTING`, so no notifications
fan out for the hygiene gestures. The branches being touched are stale
by definition; the affected population would be churn-grade noise per
§15.4.
The per-user message-cursor preservation contract per §11.5: this
module never touches `branch_chat_messages` or `branch_chat_seen`.
Chat history survives the branch's deletion in Gitea because those
tables are app-canonical, not cached.
"""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import datetime, timedelta, timezone
from . import db
from .bot import Bot
from .config import Config
log = logging.getLogger(__name__)
# Window sizes per §11.5 / §12 / §10.7. The 30/90 numbers are the
# canonical spec values; they are exposed as env vars so the integration
# tests can compress the windows without touching production code. The
# values are parsed with `int()` as whole days, so sub-day windows are
# not expressible; `0` makes the cutoff equal to `now`.
def _close_after_days() -> int:
    """Return the idle-close window in whole days.

    Reads `HYGIENE_CLOSE_AFTER_DAYS` from the environment on each call
    and falls back to 30 when the variable is unset. A value that is
    not an integer literal raises `ValueError`.
    """
    raw_days = os.environ.get("HYGIENE_CLOSE_AFTER_DAYS", "30")
    return int(raw_days)
def _delete_after_days() -> int:
    """Return the delete window in whole days.

    Reads `HYGIENE_DELETE_AFTER_DAYS` from the environment on each call
    and falls back to 90 when the variable is unset. A value that is
    not an integer literal raises `ValueError`.
    """
    raw_days = os.environ.get("HYGIENE_DELETE_AFTER_DAYS", "90")
    return int(raw_days)
# ---------------------------------------------------------------------------
# Scheduler shell — mirrors DigestScheduler
# ---------------------------------------------------------------------------
class HygieneScheduler:
    """Background task that calls `run_tick()` on a fixed interval.

    Shares the `start` / `stop` lifecycle shape with the app's other
    scheduled jobs (reconciler, digest). The interval comes from the
    `tick_seconds` argument when it is truthy, otherwise from the
    `HYGIENE_TICK_SECONDS` env var, otherwise 3600 seconds.
    """

    def __init__(self, *, config: Config, bot: Bot, tick_seconds: int | None = None):
        self._config = config
        self._bot = bot
        if tick_seconds:
            self._tick = tick_seconds
        else:
            self._tick = int(os.environ.get("HYGIENE_TICK_SECONDS", "3600"))
        self._task: asyncio.Task | None = None
        self._stop = asyncio.Event()

    def start(self) -> None:
        """Spawn the loop task; calling again while one exists is a no-op."""
        if self._task is not None:
            return
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        """Signal the loop to exit and wait for the task to finish."""
        self._stop.set()
        running = self._task
        if running is not None:
            await running

    async def _loop(self) -> None:
        """Tick once immediately, then once per interval until stopped."""
        await self._safe_tick()
        while not self._stop.is_set():
            try:
                # Sleep for one interval, waking early if stop() fires.
                await asyncio.wait_for(self._stop.wait(), timeout=self._tick)
            except asyncio.TimeoutError:
                pass
            if not self._stop.is_set():
                await self._safe_tick()

    async def _safe_tick(self) -> None:
        """Run one tick, logging any failure so the loop keeps running."""
        try:
            await run_tick(config=self._config, bot=self._bot)
        except Exception:
            log.exception("hygiene tick failed")
# ---------------------------------------------------------------------------
# The tick itself
# ---------------------------------------------------------------------------
async def run_tick(*, config: Config, bot: Bot, now: datetime | None = None) -> dict[str, int]:
    """Run one pass over the §12 + §10.7 hygiene surfaces.

    Four sweeps run in a fixed order: post-merge delete, stale delete,
    idle close, post-merge close. `main` and pinned branches are
    excluded from every sweep.

    Intended to be idempotent: the state flip is monotonic
    (open → closed → deleted), so a second tick within the same window
    selects nothing new.

    Args:
        config: supplies `gitea_bot_user` for the close audit rows and
            is forwarded to `_delete_branch_via_bot`.
        bot: forwarded to `_delete_branch_via_bot`.
        now: the time horizon. Defaults to the current UTC time. A
            timezone-aware value is converted to UTC before the cutoffs
            are formatted; a naive value is used as-is (assumed to
            already be UTC — NOTE(review): confirm test callers).

    Returns:
        Counters keyed `closed_idle`, `closed_post_merge`,
        `deleted_stale` and `deleted_post_merge`.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is not None:
        # The cutoffs are compared as strings against DB timestamps;
        # `closed_at` is stamped by SQLite's datetime('now'), which is
        # UTC, so an aware `now` in another zone must be normalized.
        now = now.astimezone(timezone.utc)

    stamp_format = "%Y-%m-%d %H:%M:%S"
    close_cutoff = (now - timedelta(days=_close_after_days())).strftime(stamp_format)
    delete_cutoff = (now - timedelta(days=_delete_after_days())).strftime(stamp_format)

    counters = {
        "closed_idle": 0,
        "closed_post_merge": 0,
        "deleted_stale": 0,
        "deleted_post_merge": 0,
    }

    # Order matters: deletes fire BEFORE closes so a branch that
    # crosses both boundaries in the same sweep (a long-merged PR
    # whose branch is still open in the cache, the cache-bootstrap
    # case) goes straight to 'deleted' rather than spending one tick
    # at 'closed' with a fresh closed_at that would delay the delete
    # by another full delete window. This is only load-bearing for
    # cache-bootstrap and clock-jump cases.

    # ---- 90-day delete: §10.7 fast-path for merged-PR branches,
    # whether or not they were ever flipped to 'closed'. `main` is
    # excluded here exactly as in the idle/stale sweeps: a merged PR
    # whose head happens to be `main` must never cost the repo its
    # trunk. ----
    post_merge_delete = db.conn().execute(
        """
        SELECT DISTINCT b.rfc_slug, b.branch_name
        FROM cached_branches b
        JOIN cached_prs p
        ON p.rfc_slug = b.rfc_slug
        AND p.head_branch = b.branch_name
        WHERE b.state IN ('open', 'closed')
        AND b.pinned = 0
        AND b.branch_name != 'main'
        AND p.state = 'merged'
        AND COALESCE(p.merged_at, '') != ''
        AND p.merged_at <= ?
        """,
        (delete_cutoff,),
    ).fetchall()
    for r in post_merge_delete:
        ok = await _delete_branch_via_bot(
            config=config, bot=bot,
            slug=r["rfc_slug"], branch=r["branch_name"],
            action_kind="delete_post_merge_branch",
            reason="90d post-merge",
        )
        if ok:
            counters["deleted_post_merge"] += 1

    # ---- 90-day delete: idle branches that closed long enough ago ----
    stale_rows = db.conn().execute(
        """
        SELECT b.rfc_slug, b.branch_name, b.closed_at, b.last_commit_at
        FROM cached_branches b
        WHERE b.state = 'closed'
        AND b.pinned = 0
        AND b.branch_name != 'main'
        AND COALESCE(b.closed_at, b.last_commit_at, b.created_at) <= ?
        """,
        (delete_cutoff,),
    ).fetchall()
    for r in stale_rows:
        ok = await _delete_branch_via_bot(
            config=config, bot=bot,
            slug=r["rfc_slug"], branch=r["branch_name"],
            action_kind="delete_stale_branch",
            reason="90d closed",
        )
        if ok:
            counters["deleted_stale"] += 1

    # ---- 30-day close: idle open branches ----
    #
    # §11.5: a branch with no associated PR auto-closes at 30 days from
    # last commit. The NOT EXISTS clause excludes branches that have
    # any open PR (those stay open) or any merged PR (those are handled
    # by the post-merge sweeps). Pinned branches (§12) skip the close.
    idle_rows = db.conn().execute(
        """
        SELECT b.rfc_slug, b.branch_name
        FROM cached_branches b
        WHERE b.state = 'open'
        AND b.pinned = 0
        AND b.branch_name != 'main'
        AND COALESCE(b.last_commit_at, b.created_at) <= ?
        AND NOT EXISTS (
        SELECT 1 FROM cached_prs p
        WHERE p.rfc_slug = b.rfc_slug
        AND p.head_branch = b.branch_name
        AND p.state IN ('open', 'merged')
        )
        """,
        (close_cutoff,),
    ).fetchall()
    for r in idle_rows:
        _close_branch(r["rfc_slug"], r["branch_name"], config.gitea_bot_user, reason="30d idle")
        counters["closed_idle"] += 1

    # ---- 30-day "settle": post-merge branches still flagged open ----
    #
    # §10.7: after merge, the branch enters a closed state per §12. The
    # cached_branches row may still read state='open' after the merge;
    # at the 30-day mark the sweep formalizes the closure. Pinned
    # branches retain open state, and `main` is never closed.
    post_merge_close = db.conn().execute(
        """
        SELECT DISTINCT b.rfc_slug, b.branch_name
        FROM cached_branches b
        JOIN cached_prs p
        ON p.rfc_slug = b.rfc_slug
        AND p.head_branch = b.branch_name
        WHERE b.state = 'open'
        AND b.pinned = 0
        AND b.branch_name != 'main'
        AND p.state = 'merged'
        AND COALESCE(p.merged_at, '') != ''
        AND p.merged_at <= ?
        """,
        (close_cutoff,),
    ).fetchall()
    for r in post_merge_close:
        _close_branch(r["rfc_slug"], r["branch_name"], config.gitea_bot_user, reason="30d post-merge")
        counters["closed_post_merge"] += 1

    return counters
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _close_branch(slug: str, branch: str, bot_login: str, *, reason: str) -> None:
    """Mark a cached branch closed and record the system audit row.

    Only the `cached_branches` row is touched; no Gitea call is made.
    The UPDATE matches only rows still in state 'open' and keeps an
    existing `closed_at` if one is already set. The audit row is
    written with `actor_user_id=NULL` and `on_behalf_of=bot_login`,
    always under the `close_idle_branch` action kind, with `reason`
    carried in the JSON `details`.
    """
    flip_to_closed = """
        UPDATE cached_branches
        SET state = 'closed',
        closed_at = COALESCE(closed_at, datetime('now'))
        WHERE rfc_slug = ? AND branch_name = ? AND state = 'open'
        """
    record_audit = """
        INSERT INTO actions
        (actor_user_id, on_behalf_of, action_kind, rfc_slug, branch_name, details)
        VALUES (NULL, ?, 'close_idle_branch', ?, ?, ?)
        """
    details_json = _json_details({"reason": reason})

    db.conn().execute(flip_to_closed, (slug, branch))
    db.conn().execute(record_audit, (bot_login, slug, branch, details_json))
async def _delete_branch_via_bot(
    *,
    config: Config,
    bot: Bot,
    slug: str,
    branch: str,
    action_kind: str,
    reason: str,
) -> bool:
    """Delete one branch through `bot.delete_branch` as the system actor.

    The target repository is resolved from the slug's `cached_rfcs`
    row: a 'super-draft' RFC maps to the meta repo
    (`config.gitea_org` / `config.meta_repo`); an 'active' RFC maps to
    the `owner/repo` pair stored in its `repo` column.

    Returns:
        True when the bot call succeeded and the `cached_branches` row
        was flipped to 'deleted'. False, with the branch row left
        untouched, when the slug is missing from the cache, when no
        repo can be resolved for the RFC's state, or when
        `bot.delete_branch` raised (the exception is logged, not
        propagated).
    """
    rfc_row = db.conn().execute(
        "SELECT state, repo FROM cached_rfcs WHERE slug = ?", (slug,)
    ).fetchone()
    if rfc_row is None:
        log.warning("hygiene: cannot delete %s/%s — slug missing from cache", slug, branch)
        return False

    rfc_state = rfc_row["state"]
    repo_ref = rfc_row["repo"]
    if rfc_state == "super-draft":
        owner, repo = config.gitea_org, config.meta_repo
    elif rfc_state == "active" and repo_ref and "/" in repo_ref:
        owner, repo = repo_ref.split("/", 1)
    else:
        log.warning("hygiene: cannot resolve repo for %s state=%s", slug, rfc_state)
        return False

    try:
        # actor=None selects the bot's system-actor audit path.
        await bot.delete_branch(
            None,
            owner=owner,
            repo=repo,
            branch=branch,
            slug=slug,
            action_kind=action_kind,
            reason=reason,
            bot_login=config.gitea_bot_user,
        )
    except Exception:
        log.exception("hygiene: bot.delete_branch failed for %s/%s", slug, branch)
        return False

    mark_deleted = """
        UPDATE cached_branches
        SET state = 'deleted'
        WHERE rfc_slug = ? AND branch_name = ?
        """
    db.conn().execute(mark_deleted, (slug, branch))
    return True
def _json_details(payload: dict) -> str:
    """Serialize an audit-row `details` payload to a JSON string."""
    from json import dumps

    return dumps(payload)
+4 -1
View File
@@ -14,7 +14,7 @@ from fastapi import APIRouter, FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from . import api as api_routes, auth, cache, db, digest, providers as providers_mod, webhooks
from . import api as api_routes, auth, cache, db, digest, hygiene, providers as providers_mod, webhooks
from .bot import Bot
from .config import load_config
from .gitea import Gitea
@@ -32,6 +32,7 @@ async def lifespan(app: FastAPI):
bot = Bot(gitea)
reconciler = cache.Reconciler(config, gitea)
digest_sched = digest.DigestScheduler()
hygiene_sched = hygiene.HygieneScheduler(config=config, bot=bot)
# §18 carryover: the multi-provider LLM abstraction. Provider
# construction can fail (missing key, wrong env value) — if it does,
@@ -55,10 +56,12 @@ async def lifespan(app: FastAPI):
reconciler.start()
digest_sched.start()
hygiene_sched.start()
log.info("RFC app started — meta repo %s/%s", config.gitea_org, config.meta_repo)
try:
yield
finally:
await hygiene_sched.stop()
await digest_sched.stop()
await reconciler.stop()
await gitea.close()