"""The §4 metadata cache and its two writers. Per §4: Gitea is truth. The cache mirrors only what the left pane and the read surfaces need, and it is rebuildable from Gitea at any time. Per §4.1: two writers — the webhook handler and the periodic reconciler — both read from Gitea and write to the cache. User actions never write to the cache directly; they trigger Git operations through the bot (`bot.py`), and the resulting webhook (or the next reconciler sweep) is what updates the cache. This module provides: - `refresh_meta_repo()` — reads rfcs/ on the meta repo and reconciles cached_rfcs against what's there. Used by both the webhook handler (on meta-repo merge events) and the reconciler. - `refresh_meta_pulls()` — reads open meta-repo PRs and reconciles cached_prs for pr_kind='idea' and friends. Backs the §7.3 pending-ideas disclosure. Per §4.2's "single SQLite file colocated with the FastAPI process," the cache writes happen on the same process that serves reads; lock contention is bounded by the small mutation surface (a few hundred rows at most for v1) and SQLite's WAL mode. """ from __future__ import annotations import asyncio import json import logging from . import db, entry as entry_mod from .config import Config from .gitea import Gitea, GiteaError log = logging.getLogger(__name__) async def refresh_meta_repo(config: Config, gitea: Gitea) -> None: """Re-read rfcs/ on the meta repo and reconcile cached_rfcs. Idempotent. Safe to call on every meta-repo webhook and on every reconciler sweep. """ org, repo = config.gitea_org, config.meta_repo try: files = await gitea.list_dir(org, repo, "rfcs", ref="main") except GiteaError as e: log.warning("refresh_meta_repo: cannot list rfcs/: %s", e) return seen_slugs: set[str] = set() for f in files: if f.get("type") != "file" or not f.get("name", "").endswith(".md"): continue result = await gitea.read_file(org, repo, f["path"], ref="main") if not result: continue text, sha = result try: entry = entry_mod.parse(text) except Exception as parse_err: log.warning("refresh_meta_repo: skipping %s: %s", f["path"], parse_err) continue if not entry.slug: log.warning("refresh_meta_repo: skipping %s: missing slug", f["path"]) continue seen_slugs.add(entry.slug) _upsert_cached_rfc(entry, body_sha=sha) # Mark entries removed from the meta repo as withdrawn-without-trace. # In practice the spec keeps withdrawn entries in rfcs/ as historical # record (§3), so this branch fires only for entries deleted out of # band. We leave the row but flag it for reconciler attention. existing = {row["slug"] for row in db.conn().execute("SELECT slug FROM cached_rfcs")} for missing in existing - seen_slugs: log.info("refresh_meta_repo: %s no longer in rfcs/ — leaving cache row in place", missing) def _upsert_cached_rfc(entry: entry_mod.Entry, body_sha: str) -> None: db.conn().execute( """ INSERT INTO cached_rfcs (slug, title, state, rfc_id, repo, proposed_by, proposed_at, graduated_at, graduated_by, owners_json, arbiters_json, tags_json, body, body_sha, last_entry_commit_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(slug) DO UPDATE SET title = excluded.title, state = excluded.state, rfc_id = excluded.rfc_id, repo = excluded.repo, proposed_by = excluded.proposed_by, proposed_at = excluded.proposed_at, graduated_at = excluded.graduated_at, graduated_by = excluded.graduated_by, owners_json = excluded.owners_json, arbiters_json = excluded.arbiters_json, tags_json = excluded.tags_json, body = excluded.body, body_sha = excluded.body_sha, last_entry_commit_at = datetime('now'), updated_at = datetime('now') """, ( entry.slug, entry.title, entry.state, entry.id, entry.repo, entry.proposed_by, entry.proposed_at, entry.graduated_at, entry.graduated_by, json.dumps(entry.owners), json.dumps(entry.arbiters), json.dumps(entry.tags), entry.body, body_sha, ), ) async def refresh_meta_pulls(config: Config, gitea: Gitea) -> None: """Reconcile open meta-repo PRs into cached_prs. For Slice 1 we care about pr_kind='idea' (proposing a new entry). Other meta-repo PR kinds (body edits, metadata edits, claims) will be wired in their respective slices. `opened_by` is the **underlying actor**, not the bot login Gitea reports — per §15.9's framing for notifications and per §6.5's On-behalf-of accountability shape. We recover the actor by joining against the `actions` audit log; if no row matches (cache rebuilt from scratch on a deployment that pre-dates the actions log, or a pull we did not author), we fall back to parsing the `On-behalf-of:` trailer from the PR body, then to the raw Gitea login as last resort. """ org, repo = config.gitea_org, config.meta_repo repo_full = f"{org}/{repo}" try: open_pulls = await gitea.list_pulls(org, repo, state="open") closed_pulls = await gitea.list_pulls(org, repo, state="closed") except GiteaError as e: log.warning("refresh_meta_pulls: %s", e) return bot_login = config.gitea_bot_user for pull in open_pulls + closed_pulls: head_branch = pull.get("head", {}).get("ref", "") slug = _slug_from_head_branch(head_branch) if slug is None: continue pr_kind = _kind_from_branch(head_branch) state = _state_from_pull(pull) gitea_opener = (pull.get("user") or {}).get("login") or "" opened_by = _resolve_actor( gitea_opener, bot_login, slug, pull["number"], pull.get("body") or "", ) db.conn().execute( """ INSERT INTO cached_prs (rfc_slug, pr_kind, repo, pr_number, title, description, state, opened_by, opened_at, merged_at, closed_at, head_branch, base_branch, head_sha) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(repo, pr_number) DO UPDATE SET title = excluded.title, description = excluded.description, state = excluded.state, opened_by = excluded.opened_by, merged_at = excluded.merged_at, closed_at = excluded.closed_at, head_sha = excluded.head_sha """, ( slug, pr_kind, repo_full, pull["number"], pull.get("title") or "", pull.get("body") or "", state, opened_by, pull.get("created_at"), pull.get("merged_at"), pull.get("closed_at"), head_branch, (pull.get("base") or {}).get("ref") or "main", (pull.get("head") or {}).get("sha"), ), ) _TRAILER_RE = None def _resolve_actor(gitea_opener: str, bot_login: str, slug: str, pr_number: int, body: str) -> str: """Best effort: collapse the bot's authorship to the underlying actor.""" if gitea_opener and gitea_opener != bot_login: return gitea_opener # Prefer the audit log. row = db.conn().execute( """ SELECT on_behalf_of FROM actions WHERE action_kind IN ('propose_rfc', 'open_body_edit_pr', 'open_claim_pr', 'open_metadata_pr') AND rfc_slug = ? AND pr_number = ? ORDER BY id LIMIT 1 """, (slug, pr_number), ).fetchone() if row and row["on_behalf_of"]: return row["on_behalf_of"] # Fall back to parsing the On-behalf-of trailer. import re as _re global _TRAILER_RE if _TRAILER_RE is None: _TRAILER_RE = _re.compile(r"On-behalf-of:\s+.*?<([^>]+)>", _re.MULTILINE) m = _TRAILER_RE.search(body) if m: return m.group(1) return gitea_opener or bot_login def _slug_from_head_branch(head_branch: str) -> str | None: if head_branch.startswith("propose/"): return head_branch[len("propose/") :] if head_branch.startswith("edit/"): parts = head_branch.split("/", 2) if len(parts) >= 2: return parts[1] if head_branch.startswith("claim/"): return head_branch[len("claim/") :] if head_branch.startswith("metadata/"): return head_branch[len("metadata/") :] return None def _kind_from_branch(head_branch: str) -> str: if head_branch.startswith("propose/"): return "idea" if head_branch.startswith("edit/"): return "meta_body_edit" if head_branch.startswith("claim/"): return "meta_claim" if head_branch.startswith("metadata/"): return "meta_metadata" return "idea" # fallback def _state_from_pull(pull: dict) -> str: if pull.get("merged"): return "merged" if pull.get("state") == "closed": return "closed" return "open" # ----- Reconciler ----- class Reconciler: """Per §4.1: periodic safety-net sweep. Runs in the background, every five minutes by default. Catches up on any webhook the bot missed (downtime, network failure, Gitea flake). If the cache is corrupted, the reconciler rebuilds from scratch — that's the contract. """ def __init__(self, config: Config, gitea: Gitea, interval_seconds: int = 300): self._config = config self._gitea = gitea self._interval = interval_seconds self._task: asyncio.Task | None = None self._stop = asyncio.Event() async def _loop(self) -> None: # One sweep at startup, then on the interval. The startup sweep # is what brings a fresh cache to life on first boot. await self.sweep() while not self._stop.is_set(): try: await asyncio.wait_for(self._stop.wait(), timeout=self._interval) except asyncio.TimeoutError: pass if self._stop.is_set(): break await self.sweep() async def sweep(self) -> None: log.info("reconciler: starting sweep") try: await refresh_meta_repo(self._config, self._gitea) await refresh_meta_pulls(self._config, self._gitea) except Exception: log.exception("reconciler: sweep failed") else: log.info("reconciler: sweep complete") def start(self) -> None: if self._task is None: self._task = asyncio.create_task(self._loop()) async def stop(self) -> None: self._stop.set() if self._task is not None: await self._task self._task = None