"""End-to-end integration test for the Slice 1 vertical. Stands up the FastAPI app against a mocked Gitea transport that simulates the meta repo and the propose-to-merge lifecycle. The test walks the same path a user would: sign in (a forged session cookie substitutes for the OAuth round-trip, since OAuth itself is not in scope to mock end-to-end), open a propose modal (POST /api/rfcs/propose), exercise the bot wrapper through to the Gitea HTTP layer, merge the PR as an owner, refresh the cache, and verify the super-draft surfaces in GET /api/rfcs and GET /api/rfcs/. The mocked Gitea is intentionally narrow — it only honors the endpoints the slice actually exercises. Adding routes to it as later slices land is the right shape: the test surface tracks the production surface. """ from __future__ import annotations import base64 import json import os import re import tempfile from pathlib import Path import httpx import pytest # --------------------------------------------------------------------------- # Fake Gitea # --------------------------------------------------------------------------- class FakeGitea: """A narrow in-memory simulation of the Gitea API the slices exercise. Slice 2 extends the seam to cover per-RFC repos: PUT contents (update file), POST orgs/{org}/repos (create repo), and branch listing with commit timestamps. The simulator is intentionally minimal — only the routes the production paths actually call. 
""" def __init__(self): # files: (owner, repo, branch, path) -> {"content": str, "sha": str} self.files: dict[tuple[str, str, str, str], dict] = {} # branches: (owner, repo) -> {branch_name -> {"sha": str, "ts": str, # "base_main_files": {path -> str}}} self.branches: dict[tuple[str, str], dict[str, dict]] = {} # pulls: (owner, repo) -> list[pull-dict] self.pulls: dict[tuple[str, str], list[dict]] = {} # repos: set of (owner, repo) self.repos: set[tuple[str, str]] = set() self._pr_counter = 0 self._commit_counter = 0 self._seed_repo("wiggleverse", "meta") def _seed_repo(self, owner, repo): self.branches[(owner, repo)] = {"main": {"sha": "initial", "ts": "2026-05-23T00:00:00Z"}} self.pulls[(owner, repo)] = [] self.repos.add((owner, repo)) def seed_rfc_repo(self, owner, repo, *, rfc_md_body): """Convenience: seed a per-RFC repo with an RFC.md on main.""" self._seed_repo(owner, repo) sha = self._next_sha() self.files[(owner, repo, "main", "RFC.md")] = {"content": rfc_md_body, "sha": sha} self.branches[(owner, repo)]["main"] = {"sha": sha, "ts": "2026-05-23T00:00:00Z"} def _next_sha(self): self._commit_counter += 1 return f"sha{self._commit_counter:04d}" def _enrich_pr(self, owner: str, repo: str, pr: dict) -> dict: """Return the PR with mergeability fields filled in. Gitea's PR responses carry `mergeable` and `merge_commit_sha` plus the head sha; for the per-RFC repo paths in §10 we mirror that shape. """ out = dict(pr) head_branch = pr["head"]["ref"] head_sha = (self.branches.get((owner, repo)) or {}).get(head_branch, {}).get("sha") out["head"] = dict(pr["head"]) if head_sha: out["head"]["sha"] = head_sha out["mergeable"] = self._is_mergeable(owner, repo, pr) if pr["state"] == "open" else False return out def _is_mergeable(self, owner: str, repo: str, pr: dict) -> bool: """A PR is mergeable when the file content under main matches the branch's snapshot of main at cut-time on every path the branch either inherited or touched. 
This collapses to "no path on the branch has diverged from main since cut" — sufficient for the single-file RFC.md surface and the §10.9 conflict-replay test path. """ head_branch = pr["head"]["ref"] branch_data = self.branches.get((owner, repo), {}).get(head_branch, {}) base_snapshot: dict[str, str] = branch_data.get("base_main_files") or {} # Touch every path the branch tracks plus every path on main, so a # file deleted on main also surfaces. paths = set(base_snapshot.keys()) for (o, r, br, p) in self.files.keys(): if (o, r, br) == (owner, repo, head_branch): paths.add(p) if (o, r, br) == (owner, repo, "main"): paths.add(p) for p in paths: main_content = (self.files.get((owner, repo, "main", p)) or {}).get("content") base_content = base_snapshot.get(p) if main_content != base_content: return False return True def handle(self, request: httpx.Request) -> httpx.Response: path = request.url.path.replace("/api/v1", "", 1) method = request.method body = request.read().decode() if request.content else "" payload = json.loads(body) if body else {} # GET /repos/{owner}/{repo} m_repo = re.fullmatch(r"/repos/([^/]+)/([^/]+)", path) if method == "GET" and m_repo: owner, repo = m_repo.groups() if (owner, repo) in self.repos: return httpx.Response(200, json={"name": repo, "full_name": f"{owner}/{repo}"}) return httpx.Response(404, json={"message": "not found"}) # DELETE /repos/{owner}/{repo} — Slice 5 graduation rollback uses # this to undo step 1 (repo create). The FakeGitea drops every # file, branch, and PR tied to the repo so a subsequent retry # graduation can re-create the repo cleanly. 
if method == "DELETE" and m_repo: owner, repo = m_repo.groups() self.repos.discard((owner, repo)) self.branches.pop((owner, repo), None) self.pulls.pop((owner, repo), None) self.files = {k: v for k, v in self.files.items() if (k[0], k[1]) != (owner, repo)} return httpx.Response(204, json={}) # POST /orgs/{org}/repos m = re.fullmatch(r"/orgs/([^/]+)/repos", path) if method == "POST" and m: org = m.group(1) name = payload["name"] self._seed_repo(org, name) return httpx.Response(201, json={"name": name, "full_name": f"{org}/{name}"}) # GET /repos/{owner}/{repo}/branches (list) m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/branches", path) if method == "GET" and m: owner, repo = m.groups() items = [] for name, b in self.branches.get((owner, repo), {}).items(): items.append({"name": name, "commit": {"id": b["sha"], "timestamp": b.get("ts")}}) return httpx.Response(200, json=items) # GET /repos/{owner}/{repo}/branches/{branch} m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/branches/([^/]+)", path) if method == "GET" and m: owner, repo, branch = m.groups() b = self.branches.get((owner, repo), {}).get(branch) if not b: return httpx.Response(404, json={"message": "not found"}) return httpx.Response(200, json={"name": branch, "commit": {"id": b["sha"]}}) # POST /repos/{owner}/{repo}/branches m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/branches", path) if method == "POST" and m: owner, repo = m.groups() new = payload["new_branch_name"] old = payload["old_branch_name"] old_sha = self.branches[(owner, repo)][old]["sha"] # Snapshot the parent branch's files at cut time so we can # surface §10.5 merge conflicts when main diverges later. 
snapshot: dict[str, str] = {} for (o, r, br, p), data in list(self.files.items()): if (o, r, br) == (owner, repo, old): self.files[(owner, repo, new, p)] = dict(data) snapshot[p] = data["content"] self.branches[(owner, repo)][new] = { "sha": old_sha, "ts": "2026-05-23T00:00:00Z", "base_main_files": snapshot, } return httpx.Response(201, json={"name": new}) # GET /repos/{owner}/{repo}/contents/{path}?ref=... m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/contents/(.+)", path) if method == "GET" and m: owner, repo, fpath = m.groups() ref = request.url.params.get("ref", "main") key = (owner, repo, ref, fpath) if key in self.files: f = self.files[key] return httpx.Response(200, json={ "name": fpath.rsplit("/", 1)[-1], "path": fpath, "type": "file", "sha": f["sha"], "content": base64.b64encode(f["content"].encode()).decode(), }) # Directory listing prefix = fpath.rstrip("/") + "/" children = [] for (o, r, br, p), data in self.files.items(): if (o, r, br) == (owner, repo, ref) and p.startswith(prefix) and "/" not in p[len(prefix):]: children.append({ "name": p.rsplit("/", 1)[-1], "path": p, "type": "file", "sha": data["sha"], }) if children: return httpx.Response(200, json=children) return httpx.Response(404, json={"message": "not found"}) # POST /repos/{owner}/{repo}/contents/{path} m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/contents/(.+)", path) if method == "POST" and m: owner, repo, fpath = m.groups() branch = payload["branch"] content = base64.b64decode(payload["content"]).decode() sha = self._next_sha() self.files[(owner, repo, branch, fpath)] = {"content": content, "sha": sha} br = self.branches[(owner, repo)].setdefault(branch, {}) br["sha"] = sha br["ts"] = "2026-05-23T00:00:00Z" return httpx.Response(201, json={"commit": {"sha": sha}}) # PUT /repos/{owner}/{repo}/contents/{path} — update_file m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/contents/(.+)", path) if method == "PUT" and m: owner, repo, fpath = m.groups() branch = payload["branch"] content = 
base64.b64decode(payload["content"]).decode() sha = self._next_sha() self.files[(owner, repo, branch, fpath)] = {"content": content, "sha": sha} br = self.branches[(owner, repo)].setdefault(branch, {}) br["sha"] = sha br["ts"] = "2026-05-23T00:00:00Z" return httpx.Response(200, json={"commit": {"sha": sha}, "content": {"sha": sha}}) # GET /repos/{owner}/{repo}/pulls?state=... m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/pulls", path) if method == "GET" and m: owner, repo = m.groups() state = request.url.params.get("state", "open") items = self.pulls.get((owner, repo), []) filtered = [self._enrich_pr(owner, repo, p) for p in items if (state == "all") or (p["state"] == state)] return httpx.Response(200, json=filtered) # POST /repos/{owner}/{repo}/pulls if method == "POST" and m: owner, repo = m.groups() self._pr_counter += 1 head_branch = payload["head"] pr = { "number": self._pr_counter, "title": payload["title"], "body": payload["body"], "head": {"ref": head_branch, "sha": self.branches[(owner, repo)][head_branch]["sha"]}, "base": {"ref": payload["base"]}, "state": "open", "merged": False, "merged_at": None, "closed_at": None, "created_at": "2026-05-23T00:00:00Z", "user": {"login": "rfc-bot"}, } self.pulls[(owner, repo)].append(pr) return httpx.Response(201, json=self._enrich_pr(owner, repo, pr)) # GET /repos/{owner}/{repo}/pulls/{number} m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/pulls/(\d+)", path) if method == "GET" and m: owner, repo, num = m.groups() for pr in self.pulls.get((owner, repo), []): if pr["number"] == int(num): return httpx.Response(200, json=self._enrich_pr(owner, repo, pr)) return httpx.Response(404, json={"message": "not found"}) # POST /repos/{owner}/{repo}/pulls/{number}/merge m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/pulls/(\d+)/merge", path) if method == "POST" and m: owner, repo, num = m.groups() for pr in self.pulls[(owner, repo)]: if pr["number"] == int(num): if pr["state"] != "open": return httpx.Response(409, json={"message": "PR is not 
open"}) if not self._is_mergeable(owner, repo, pr): return httpx.Response(409, json={"message": "merge conflict with main"}) head_branch = pr["head"]["ref"] for (o, r, br, p), data in list(self.files.items()): if (o, r, br) == (owner, repo, head_branch): self.files[(owner, repo, "main", p)] = dict(data) pr["state"] = "closed" pr["merged"] = True pr["merged_at"] = "2026-05-23T01:00:00Z" pr["closed_at"] = "2026-05-23T01:00:00Z" # Per §10.5: a no-fast-forward merge advances main # via a new merge commit SHA, not by reusing the # branch's tip. We mint a fresh sha to model that. merge_sha = self._next_sha() pr["merge_commit_sha"] = merge_sha self.branches[(owner, repo)]["main"]["sha"] = merge_sha self.branches[(owner, repo)]["main"]["ts"] = "2026-05-23T01:00:00Z" return httpx.Response(200, json={"merged": True, "merge_commit_sha": merge_sha}) return httpx.Response(404, json={"message": "not found"}) # GET /repos/{owner}/{repo}/hooks m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/hooks", path) if method == "GET" and m: return httpx.Response(200, json=[]) # PATCH /repos/{owner}/{repo}/issues/{number} — Gitea close path. m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/issues/(\d+)", path) if method == "PATCH" and m: owner, repo, num = m.groups() for pr in self.pulls.get((owner, repo), []): if pr["number"] == int(num) and payload.get("state") == "closed": pr["state"] = "closed" pr["closed_at"] = "2026-05-23T02:00:00Z" return httpx.Response(200, json={"state": "closed"}) return httpx.Response(200, json={}) # POST /repos/{owner}/{repo}/issues/{number}/comments m = re.fullmatch(r"/repos/([^/]+)/([^/]+)/issues/(\d+)/comments", path) if method == "POST" and m: return httpx.Response(201, json={"id": 1, "body": payload.get("body", "")}) return httpx.Response(404, json={"message": f"unmocked {method} {path}"}) # --------------------------------------------------------------------------- # Session helpers — forge a SessionMiddleware cookie directly to skip OAuth. 
# ---------------------------------------------------------------------------


def _sign_session(session_data: dict, secret: str) -> str:
    """Serialize and sign ``session_data`` into a session-cookie value.

    The payload is JSON-encoded, base64-encoded, then signed with
    ``itsdangerous.TimestampSigner`` keyed by ``secret``.
    NOTE(review): presumably this mirrors what the app's session
    middleware expects — confirm against the middleware configuration.
    """
    from itsdangerous import TimestampSigner

    data = base64.b64encode(json.dumps(session_data).encode("utf-8"))
    signer = TimestampSigner(secret)
    return signer.sign(data).decode("utf-8")


def sign_in_as(client, *, user_id, gitea_login, display_name, role, email=""):
    """Attach a forged ``rfc_session`` cookie to ``client`` for the given user.

    The cookie is signed with ``SECRET_KEY`` from the environment, so the
    ``tmp_env`` fixture must already be active. ``gitea_id`` is set to the
    same value as ``user_id``.
    """
    payload = {
        "user": {
            "user_id": user_id,
            "gitea_id": user_id,
            "gitea_login": gitea_login,
            "display_name": display_name,
            "email": email,
            "avatar_url": "",
            "role": role,
        }
    }
    cookie = _sign_session(payload, os.environ["SECRET_KEY"])
    client.cookies.set("rfc_session", cookie)


def provision_user_row(*, user_id: int, login: str, role: str) -> None:
    """Insert (or replace) a ``users`` row so the forged session has a match.

    Email is synthesized as ``{login}@test`` and the display name as the
    capitalized login.
    NOTE(review): no explicit commit is issued here; this presumably
    relies on how ``db.conn()`` configures the connection — confirm.
    """
    from app import db

    db.conn().execute(
        """
        INSERT OR REPLACE INTO users
            (id, gitea_id, gitea_login, email, display_name, avatar_url, role)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (user_id, user_id, login, f"{login}@test", login.capitalize(), "", role),
    )


def _reset_db_connection() -> None:
    """Close and forget the db module's memoized connection, if any."""
    from app import db

    if db._CONN is not None:
        db._CONN.close()
        db._CONN = None


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def tmp_env(monkeypatch):
    """Point the app's environment at a throwaway directory and database.

    Yields the dict of environment variables that were set. The temporary
    directory is removed on teardown so repeated runs do not accumulate
    ``rfc-app-test-*`` directories.
    """
    import shutil

    tmpdir = tempfile.mkdtemp(prefix="rfc-app-test-")
    db_path = Path(tmpdir) / "test.db"
    env = {
        "GITEA_URL": "http://gitea.test",
        "GITEA_BOT_USER": "rfc-bot",
        "GITEA_BOT_TOKEN": "bot-token",
        "GITEA_ORG": "wiggleverse",
        "META_REPO": "meta",
        "OAUTH_CLIENT_ID": "cid",
        "OAUTH_CLIENT_SECRET": "csec",
        "APP_URL": "http://localhost:8000",
        "SECRET_KEY": "test-secret-key-for-cookies",
        "DATABASE_PATH": str(db_path),
        "OWNER_GITEA_LOGIN": "ben",
        "GITEA_WEBHOOK_SECRET": "",
        "ENABLED_MODELS": "claude",
    }
    for k, v in env.items():
        monkeypatch.setenv(k, v)
    try:
        yield env
    finally:
        # Best-effort: a handle that is still open must not fail the test.
        shutil.rmtree(tmpdir, ignore_errors=True)


@pytest.fixture
def app_with_fake_gitea(tmp_env, monkeypatch):
    """Build the app with every Gitea HTTP call routed to a ``FakeGitea``.

    Yields ``(app, fake)``. The memoized db connection is reset both before
    the app is created and again on teardown, so no connection outlives the
    test that opened it.
    """
    fake = FakeGitea()
    real_client_cls = httpx.AsyncClient

    def patched_client(*args, **kwargs):
        # Force the mock transport regardless of what the caller passed.
        kwargs["transport"] = httpx.MockTransport(fake.handle)
        return real_client_cls(*args, **kwargs)

    monkeypatch.setattr("app.gitea.httpx.AsyncClient", patched_client)

    # The db module memoizes its connection — reset across tests so each
    # test gets the tmpdir db its env points at, not a previous test's.
    _reset_db_connection()

    from app.main import create_app

    app = create_app()
    try:
        yield app, fake
    finally:
        _reset_db_connection()


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


def test_propose_to_super_draft_vertical(app_with_fake_gitea):
    """Walk propose -> pending list -> owner merge -> catalog -> audit log."""
    from fastapi.testclient import TestClient

    from app import db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        # The catalog is empty before anything happens.
        r = client.get("/api/rfcs")
        assert r.status_code == 200
        assert r.json()["items"] == []

        # A contributor proposes a new RFC.
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice",
                   role="contributor", email="alice@test")
        r = client.post("/api/rfcs/propose", json={
            "title": "Open Human Model",
            "slug": "open-human-model",
            "pitch": "A shared definition of what we mean by *human*.",
            "tags": ["identity", "schema"],
        })
        assert r.status_code == 200, r.text
        pr_number = r.json()["pr_number"]
        assert r.json()["slug"] == "open-human-model"

        # The proposal surfaces on the pending-ideas list.
        r = client.get("/api/proposals")
        items = r.json()["items"]
        assert len(items) == 1
        assert items[0]["slug"] == "open-human-model"
        assert items[0]["pr_number"] == pr_number

        # A contributor cannot merge.
        r = client.post(f"/api/proposals/{pr_number}/merge")
        assert r.status_code == 403

        # Switch to the owner. The pending-idea view exposes the merge affordance.
        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben",
                   role="owner", email="ben@test")
        r = client.get(f"/api/proposals/{pr_number}")
        assert r.status_code == 200, r.text
        proposal = r.json()
        assert proposal["entry"]["title"] == "Open Human Model"
        assert proposal["entry"]["state"] == "super-draft"
        assert proposal["affordances"]["merge"] is True

        # Owner merges. The catalog picks up the new super-draft.
        r = client.post(f"/api/proposals/{pr_number}/merge")
        assert r.status_code == 200, r.text
        assert r.json()["slug"] == "open-human-model"

        r = client.get("/api/rfcs")
        items = r.json()["items"]
        assert len(items) == 1
        assert items[0]["slug"] == "open-human-model"
        assert items[0]["state"] == "super-draft"
        assert "identity" in items[0]["tags"]

        # The super-draft view renders the body.
        r = client.get("/api/rfcs/open-human-model")
        assert r.status_code == 200
        view = r.json()
        assert view["state"] == "super-draft"
        assert "shared definition" in view["body"]

        # The pending-ideas list no longer carries the merged proposal.
        r = client.get("/api/proposals")
        assert r.json()["items"] == []

        # The bot's actions are recorded in the audit log per §6.5.
        actions = db.conn().execute(
            "SELECT action_kind, on_behalf_of FROM actions ORDER BY id"
        ).fetchall()
        kinds = [(a["action_kind"], a["on_behalf_of"]) for a in actions]
        assert ("propose_rfc", "alice") in kinds
        assert ("merge_proposal", "ben") in kinds


def test_slug_uniqueness_enforced(app_with_fake_gitea):
    """A second proposal reusing a pending slug is rejected with 409."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=5, login="alice", role="contributor")
        sign_in_as(client, user_id=5, gitea_login="alice", display_name="Alice",
                   role="contributor")
        r = client.post("/api/rfcs/propose", json={
            "title": "First", "slug": "first", "pitch": "p", "tags": [],
        })
        assert r.status_code == 200, r.text
        r = client.post("/api/rfcs/propose", json={
            "title": "First Again", "slug": "first", "pitch": "p", "tags": [],
        })
        assert r.status_code == 409


def test_invalid_slug_rejected(app_with_fake_gitea):
    """A slug with spaces, capitals and punctuation fails validation (422)."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=7, login="alice", role="contributor")
        sign_in_as(client, user_id=7, gitea_login="alice", display_name="Alice",
                   role="contributor")
        r = client.post("/api/rfcs/propose", json={
            "title": "Bad slug", "slug": "Bad Slug!", "pitch": "p", "tags": [],
        })
        assert r.status_code == 422


def test_anonymous_cannot_propose(app_with_fake_gitea):
    """Proposing without a session cookie is rejected with 401."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        r = client.post("/api/rfcs/propose", json={
            "title": "A", "slug": "a", "pitch": "p", "tags": [],
        })
        assert r.status_code == 401


def test_withdraw_by_proposer_works(app_with_fake_gitea):
    """Only the original proposer may withdraw a pending proposal."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=9, login="alice", role="contributor")
        provision_user_row(user_id=10, login="bob", role="contributor")
        sign_in_as(client, user_id=9, gitea_login="alice", display_name="Alice",
                   role="contributor")
        r = client.post("/api/rfcs/propose", json={
            "title": "X", "slug": "x", "pitch": "p", "tags": [],
        })
        # Fail here with the response text rather than on a KeyError below.
        assert r.status_code == 200, r.text
        pr_number = r.json()["pr_number"]

        # A different contributor cannot withdraw someone else's proposal.
        sign_in_as(client, user_id=10, gitea_login="bob", display_name="Bob",
                   role="contributor")
        r = client.post(f"/api/proposals/{pr_number}/withdraw")
        assert r.status_code == 403

        # The proposer can.
        sign_in_as(client, user_id=9, gitea_login="alice", display_name="Alice",
                   role="contributor")
        r = client.post(f"/api/proposals/{pr_number}/withdraw")
        assert r.status_code == 200, r.text
        r = client.get("/api/proposals")
        assert r.json()["items"] == []