a2bf89e90b
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
560 lines
24 KiB
Python
560 lines
24 KiB
Python
"""End-to-end integration test for the Slice 1 vertical.
|
|
|
|
Stands up the FastAPI app against a mocked Gitea transport that
|
|
simulates the meta repo and the propose-to-merge lifecycle. The test
|
|
walks the same path a user would: sign in (a forged session cookie
|
|
substitutes for the OAuth round-trip, since OAuth itself is not in
|
|
scope to mock end-to-end), open a propose modal
|
|
(POST /api/rfcs/propose), exercise the bot wrapper through to the
|
|
Gitea HTTP layer, merge the PR as an owner, refresh the cache, and
|
|
verify the super-draft surfaces in GET /api/rfcs and
|
|
GET /api/rfcs/<slug>.
|
|
|
|
The mocked Gitea is intentionally narrow — it only honors the
|
|
endpoints the slice actually exercises. Adding routes to it as later
|
|
slices land is the right shape: the test surface tracks the production
|
|
surface.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fake Gitea
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class FakeGitea:
    """A narrow in-memory simulation of the Gitea API the slices exercise.

    Slice 2 extends the seam to cover per-RFC repos: PUT contents
    (update file), POST orgs/{org}/repos (create repo), and branch
    listing with commit timestamps. The simulator is intentionally
    minimal — only the routes the production paths actually call.

    Structure: ``handle`` is the ``httpx.MockTransport`` entry point. It
    decodes the request and delegates to ``_dispatch``, which walks the
    ``_ROUTES`` table and returns a plain ``(status, json_body)`` pair.
    Keeping the routing free of httpx types lets the fake be driven
    directly with a method, a path, a params mapping and a payload dict.

    Requests that name an unknown repo or branch are answered with 404
    rather than raising out of the transport.
    """

    # Prefix the Gitea REST API is mounted under; stripped before routing.
    _API_PREFIX = "/api/v1"
    # Fixed timestamps keep every response deterministic.
    _SEED_TS = "2026-05-23T00:00:00Z"
    _MERGE_TS = "2026-05-23T01:00:00Z"
    _CLOSE_TS = "2026-05-23T02:00:00Z"

    # (HTTP method, full-match path pattern, handler method name), tried in
    # order; the first entry whose method and pattern both match wins.
    _ROUTES = (
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)"), "_get_repo"),
        ("POST", re.compile(r"/orgs/([^/]+)/repos"), "_create_repo"),
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)/branches"), "_list_branches"),
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)/branches/([^/]+)"), "_get_branch"),
        ("POST", re.compile(r"/repos/([^/]+)/([^/]+)/branches"), "_create_branch"),
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)/contents/(.+)"), "_get_contents"),
        ("POST", re.compile(r"/repos/([^/]+)/([^/]+)/contents/(.+)"), "_create_file"),
        ("PUT", re.compile(r"/repos/([^/]+)/([^/]+)/contents/(.+)"), "_update_file"),
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)/pulls"), "_list_pulls"),
        ("POST", re.compile(r"/repos/([^/]+)/([^/]+)/pulls"), "_create_pull"),
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)/pulls/(\d+)"), "_get_pull"),
        ("POST", re.compile(r"/repos/([^/]+)/([^/]+)/pulls/(\d+)/merge"), "_merge_pull"),
        ("GET", re.compile(r"/repos/([^/]+)/([^/]+)/hooks"), "_list_hooks"),
        ("PATCH", re.compile(r"/repos/([^/]+)/([^/]+)/issues/(\d+)"), "_patch_issue"),
        ("POST", re.compile(r"/repos/([^/]+)/([^/]+)/issues/(\d+)/comments"), "_create_comment"),
    )

    def __init__(self):
        # files: (owner, repo, branch, path) -> {"content": str, "sha": str}
        self.files: dict[tuple[str, str, str, str], dict] = {}
        # branches: (owner, repo) -> {branch_name -> {"sha": str, "ts": str,
        #                                              "base_main_files": {path -> str}}}
        self.branches: dict[tuple[str, str], dict[str, dict]] = {}
        # pulls: (owner, repo) -> list[pull-dict]
        self.pulls: dict[tuple[str, str], list[dict]] = {}
        # repos: set of (owner, repo)
        self.repos: set[tuple[str, str]] = set()
        self._pr_counter = 0
        self._commit_counter = 0
        self._seed_repo("wiggleverse", "meta")

    # ------------------------------------------------------------------
    # State helpers
    # ------------------------------------------------------------------

    def _seed_repo(self, owner, repo):
        """Register ``owner/repo`` with a fresh ``main`` branch and no PRs.

        Any branches or pulls already recorded for the repo are replaced.
        """
        self.branches[(owner, repo)] = {"main": {"sha": "initial", "ts": self._SEED_TS}}
        self.pulls[(owner, repo)] = []
        self.repos.add((owner, repo))

    def seed_rfc_repo(self, owner, repo, *, rfc_md_body):
        """Convenience: seed a per-RFC repo with an RFC.md on main."""
        self._seed_repo(owner, repo)
        sha = self._next_sha()
        self.files[(owner, repo, "main", "RFC.md")] = {"content": rfc_md_body, "sha": sha}
        self.branches[(owner, repo)]["main"] = {"sha": sha, "ts": self._SEED_TS}

    def _next_sha(self):
        """Mint the next deterministic commit id (``sha0001``, ``sha0002``, ...)."""
        self._commit_counter += 1
        return f"sha{self._commit_counter:04d}"

    @staticmethod
    def _not_found():
        """Return a fresh generic 404 ``(status, body)`` pair."""
        return 404, {"message": "not found"}

    @classmethod
    def _strip_api_prefix(cls, path: str) -> str:
        """Drop a leading ``/api/v1`` from *path*; other paths pass through.

        Only a true prefix is removed, so a file path that happens to
        contain ``/api/v1`` further along is left intact.
        """
        if path.startswith(cls._API_PREFIX):
            return path[len(cls._API_PREFIX):]
        return path

    def _find_pull(self, owner, repo, number: int):
        """Return the stored PR dict with *number*, or ``None`` if absent."""
        for pr in self.pulls.get((owner, repo), []):
            if pr["number"] == number:
                return pr
        return None

    def _enrich_pr(self, owner: str, repo: str, pr: dict) -> dict:
        """Return a copy of the PR with mergeability fields filled in.

        Gitea's PR responses carry `mergeable` and `merge_commit_sha`
        plus the head sha; for the per-RFC repo paths in §10 we mirror
        that shape. The stored PR dict is not modified: ``head`` is
        copied, refreshed with the head branch's current tip sha when
        one is recorded, and ``mergeable`` is computed for open PRs
        (always ``False`` once the PR is no longer open).
        """
        out = dict(pr)
        head_branch = pr["head"]["ref"]
        head_sha = (self.branches.get((owner, repo)) or {}).get(head_branch, {}).get("sha")
        out["head"] = dict(pr["head"])
        if head_sha:
            out["head"]["sha"] = head_sha
        out["mergeable"] = pr["state"] == "open" and self._is_mergeable(owner, repo, pr)
        return out

    def _is_mergeable(self, owner: str, repo: str, pr: dict) -> bool:
        """Report whether main still matches the head branch's cut-time snapshot.

        The paths checked are the union of the snapshot's paths, the
        paths currently on the head branch, and the paths currently on
        main. For each one, main's present content must equal the
        snapshot's content (a path missing on either side counts as
        ``None``). Consequently any change to main since the branch was
        cut — an edit, a deletion, or a newly added file — makes the PR
        unmergeable, while files that exist only on the head branch do
        not. This is sufficient for the single-file RFC.md surface and
        the §10.9 conflict-replay test path.
        """
        head_branch = pr["head"]["ref"]
        branch_data = self.branches.get((owner, repo), {}).get(head_branch, {})
        base_snapshot: dict[str, str] = branch_data.get("base_main_files") or {}

        paths = set(base_snapshot)
        paths.update(
            p
            for (o, r, br, p) in self.files
            if (o, r) == (owner, repo) and br in (head_branch, "main")
        )
        return all(
            (self.files.get((owner, repo, "main", p)) or {}).get("content") == base_snapshot.get(p)
            for p in paths
        )

    def _write_file(self, owner, repo, fpath, payload):
        """Store the payload's base64 content at *fpath* on the payload's branch.

        Shared by the POST (create) and PUT (update) contents routes.
        Advances the branch tip to a freshly minted sha, creating the
        branch record if it does not exist yet. Returns the new sha, or
        ``None`` — without touching any state — when the repo is unknown.
        """
        repo_branches = self.branches.get((owner, repo))
        if repo_branches is None:
            return None
        branch = payload["branch"]
        content = base64.b64decode(payload["content"]).decode()
        sha = self._next_sha()
        self.files[(owner, repo, branch, fpath)] = {"content": content, "sha": sha}
        tip = repo_branches.setdefault(branch, {})
        tip["sha"] = sha
        tip["ts"] = self._SEED_TS
        return sha

    # ------------------------------------------------------------------
    # Transport entry point and routing
    # ------------------------------------------------------------------

    def handle(self, request: "httpx.Request") -> "httpx.Response":
        """``httpx.MockTransport`` handler: decode, route, wrap the result.

        The request body, when present, is parsed as JSON; an empty body
        is treated as ``{}``.
        """
        path = self._strip_api_prefix(request.url.path)
        raw = request.read().decode() if request.content else ""
        payload = json.loads(raw) if raw else {}
        status, body = self._dispatch(request.method, path, request.url.params, payload)
        return httpx.Response(status, json=body)

    def _dispatch(self, method, path, params, payload):
        """Route one decoded request and return ``(status, json_body)``.

        *path* must already have the API prefix removed; *params* only
        needs a ``.get(key, default)`` method. Unrouted requests get a
        404 whose message names the method and path, so a missing mock
        is easy to spot in a failing test.
        """
        for verb, pattern, handler_name in self._ROUTES:
            if verb != method:
                continue
            match = pattern.fullmatch(path)
            if match:
                return getattr(self, handler_name)(match.groups(), params, payload)
        return 404, {"message": f"unmocked {method} {path}"}

    # ------------------------------------------------------------------
    # Route handlers — each takes (regex groups, query params, payload)
    # ------------------------------------------------------------------

    def _get_repo(self, groups, params, payload):
        """GET /repos/{owner}/{repo}"""
        owner, repo = groups
        if (owner, repo) in self.repos:
            return 200, {"name": repo, "full_name": f"{owner}/{repo}"}
        return self._not_found()

    def _create_repo(self, groups, params, payload):
        """POST /orgs/{org}/repos"""
        (org,) = groups
        name = payload["name"]
        self._seed_repo(org, name)
        return 201, {"name": name, "full_name": f"{org}/{name}"}

    def _list_branches(self, groups, params, payload):
        """GET /repos/{owner}/{repo}/branches (list)"""
        owner, repo = groups
        items = [
            {"name": name, "commit": {"id": b["sha"], "timestamp": b.get("ts")}}
            for name, b in self.branches.get((owner, repo), {}).items()
        ]
        return 200, items

    def _get_branch(self, groups, params, payload):
        """GET /repos/{owner}/{repo}/branches/{branch}"""
        owner, repo, branch = groups
        b = self.branches.get((owner, repo), {}).get(branch)
        if not b:
            return self._not_found()
        return 200, {"name": branch, "commit": {"id": b["sha"]}}

    def _create_branch(self, groups, params, payload):
        """POST /repos/{owner}/{repo}/branches"""
        owner, repo = groups
        new = payload["new_branch_name"]
        old = payload["old_branch_name"]
        repo_branches = self.branches.get((owner, repo))
        if repo_branches is None or old not in repo_branches:
            return self._not_found()
        old_sha = repo_branches[old]["sha"]
        # Snapshot the parent branch's files at cut time so we can
        # surface §10.5 merge conflicts when main diverges later.
        snapshot: dict[str, str] = {}
        for (o, r, br, p), data in list(self.files.items()):
            if (o, r, br) == (owner, repo, old):
                self.files[(owner, repo, new, p)] = dict(data)
                snapshot[p] = data["content"]
        repo_branches[new] = {
            "sha": old_sha,
            "ts": self._SEED_TS,
            "base_main_files": snapshot,
        }
        return 201, {"name": new}

    def _get_contents(self, groups, params, payload):
        """GET /repos/{owner}/{repo}/contents/{path}?ref=...

        Serves a single file when the path names one; otherwise lists
        the files that sit directly under the path as a directory.
        """
        owner, repo, fpath = groups
        ref = params.get("ref", "main")
        key = (owner, repo, ref, fpath)
        if key in self.files:
            f = self.files[key]
            return 200, {
                "name": fpath.rsplit("/", 1)[-1],
                "path": fpath,
                "type": "file",
                "sha": f["sha"],
                "content": base64.b64encode(f["content"].encode()).decode(),
            }
        # Directory listing: immediate children only (no nested paths).
        prefix = fpath.rstrip("/") + "/"
        children = [
            {
                "name": p.rsplit("/", 1)[-1],
                "path": p,
                "type": "file",
                "sha": data["sha"],
            }
            for (o, r, br, p), data in self.files.items()
            if (o, r, br) == (owner, repo, ref)
            and p.startswith(prefix)
            and "/" not in p[len(prefix):]
        ]
        if children:
            return 200, children
        return self._not_found()

    def _create_file(self, groups, params, payload):
        """POST /repos/{owner}/{repo}/contents/{path}"""
        owner, repo, fpath = groups
        sha = self._write_file(owner, repo, fpath, payload)
        if sha is None:
            return self._not_found()
        return 201, {"commit": {"sha": sha}}

    def _update_file(self, groups, params, payload):
        """PUT /repos/{owner}/{repo}/contents/{path} — update_file"""
        owner, repo, fpath = groups
        sha = self._write_file(owner, repo, fpath, payload)
        if sha is None:
            return self._not_found()
        return 200, {"commit": {"sha": sha}, "content": {"sha": sha}}

    def _list_pulls(self, groups, params, payload):
        """GET /repos/{owner}/{repo}/pulls?state=..."""
        owner, repo = groups
        state = params.get("state", "open")
        items = self.pulls.get((owner, repo), [])
        filtered = [
            self._enrich_pr(owner, repo, p)
            for p in items
            if (state == "all") or (p["state"] == state)
        ]
        return 200, filtered

    def _create_pull(self, groups, params, payload):
        """POST /repos/{owner}/{repo}/pulls"""
        owner, repo = groups
        head_branch = payload["head"]
        head = self.branches.get((owner, repo), {}).get(head_branch)
        if head is None or (owner, repo) not in self.pulls:
            # Validate before consuming a PR number.
            return self._not_found()
        self._pr_counter += 1
        pr = {
            "number": self._pr_counter,
            "title": payload["title"],
            "body": payload["body"],
            "head": {"ref": head_branch, "sha": head["sha"]},
            "base": {"ref": payload["base"]},
            "state": "open",
            "merged": False,
            "merged_at": None,
            "closed_at": None,
            "created_at": self._SEED_TS,
            "user": {"login": "rfc-bot"},
        }
        self.pulls[(owner, repo)].append(pr)
        return 201, self._enrich_pr(owner, repo, pr)

    def _get_pull(self, groups, params, payload):
        """GET /repos/{owner}/{repo}/pulls/{number}"""
        owner, repo, num = groups
        pr = self._find_pull(owner, repo, int(num))
        if pr is None:
            return self._not_found()
        return 200, self._enrich_pr(owner, repo, pr)

    def _merge_pull(self, groups, params, payload):
        """POST /repos/{owner}/{repo}/pulls/{number}/merge"""
        owner, repo, num = groups
        pr = self._find_pull(owner, repo, int(num))
        if pr is None:
            return self._not_found()
        if pr["state"] != "open":
            return 409, {"message": "PR is not open"}
        if not self._is_mergeable(owner, repo, pr):
            return 409, {"message": "merge conflict with main"}
        head_branch = pr["head"]["ref"]
        # Land every file on the head branch onto main.
        for (o, r, br, p), data in list(self.files.items()):
            if (o, r, br) == (owner, repo, head_branch):
                self.files[(owner, repo, "main", p)] = dict(data)
        pr["state"] = "closed"
        pr["merged"] = True
        pr["merged_at"] = self._MERGE_TS
        pr["closed_at"] = self._MERGE_TS
        # Per §10.5: a no-fast-forward merge advances main
        # via a new merge commit SHA, not by reusing the
        # branch's tip. We mint a fresh sha to model that.
        merge_sha = self._next_sha()
        pr["merge_commit_sha"] = merge_sha
        self.branches[(owner, repo)]["main"]["sha"] = merge_sha
        self.branches[(owner, repo)]["main"]["ts"] = self._MERGE_TS
        return 200, {"merged": True, "merge_commit_sha": merge_sha}

    def _list_hooks(self, groups, params, payload):
        """GET /repos/{owner}/{repo}/hooks — always an empty list."""
        return 200, []

    def _patch_issue(self, groups, params, payload):
        """PATCH /repos/{owner}/{repo}/issues/{number} — Gitea close path.

        Only ``{"state": "closed"}`` on a known PR number has an effect;
        every other PATCH is acknowledged with an empty 200.
        """
        owner, repo, num = groups
        for pr in self.pulls.get((owner, repo), []):
            if pr["number"] == int(num) and payload.get("state") == "closed":
                pr["state"] = "closed"
                pr["closed_at"] = self._CLOSE_TS
                return 200, {"state": "closed"}
        return 200, {}

    def _create_comment(self, groups, params, payload):
        """POST /repos/{owner}/{repo}/issues/{number}/comments

        The comment is echoed back but not stored.
        """
        return 201, {"id": 1, "body": payload.get("body", "")}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session helpers — forge a SessionMiddleware cookie directly to skip OAuth.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _sign_session(session_data: dict, secret: str) -> str:
    """Encode *session_data* as a signed session-cookie value.

    The payload is JSON-serialised, base64-encoded, and then signed with
    an ``itsdangerous.TimestampSigner`` keyed by *secret*. The signed
    bytes are returned as text, ready to be set as a cookie.
    """
    from itsdangerous import TimestampSigner

    encoded = base64.b64encode(json.dumps(session_data).encode("utf-8"))
    signed = TimestampSigner(secret).sign(encoded)
    return signed.decode("utf-8")
|
|
|
|
|
|
def sign_in_as(client, *, user_id, gitea_login, display_name, role, email=""):
    """Attach a forged, signed session cookie for the given user to *client*.

    The session carries a single ``"user"`` entry; ``gitea_id`` mirrors
    ``user_id`` and ``avatar_url`` is left empty. The cookie is signed
    with the ``SECRET_KEY`` environment variable and stored under the
    name ``rfc_session``, replacing any session set by an earlier call.
    """
    user = {
        "user_id": user_id,
        "gitea_id": user_id,
        "gitea_login": gitea_login,
        "display_name": display_name,
        "email": email,
        "avatar_url": "",
        "role": role,
    }
    signed = _sign_session({"user": user}, os.environ["SECRET_KEY"])
    client.cookies.set("rfc_session", signed)
|
|
|
|
|
|
def provision_user_row(*, user_id: int, login: str, role: str) -> None:
    """Insert (or overwrite) a row in the ``users`` table for a test user.

    ``gitea_id`` is set to *user_id*, the email to ``<login>@test``, the
    display name to the capitalised login, and the avatar URL to an
    empty string. The statement runs on the connection returned by
    ``app.db.conn()``.
    """
    from app import db

    statement = """
        INSERT OR REPLACE INTO users (id, gitea_id, gitea_login, email, display_name, avatar_url, role)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """
    row = (user_id, user_id, login, f"{login}@test", login.capitalize(), "", role)
    db.conn().execute(statement, row)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def tmp_env(monkeypatch):
    """Point the app's environment at an isolated, throwaway configuration.

    Creates a fresh temporary directory, sets every environment variable
    in the mapping below via ``monkeypatch`` (so they are restored after
    the test), and yields that mapping. ``DATABASE_PATH`` points at
    ``test.db`` inside the temporary directory.

    On teardown the temporary directory is removed so test runs do not
    accumulate ``rfc-app-test-*`` directories. Removal is best-effort:
    errors (for example a database file still held open on platforms
    that forbid deleting open files) are ignored rather than failing the
    test.
    """
    import shutil

    tmpdir = tempfile.mkdtemp(prefix="rfc-app-test-")
    db_path = Path(tmpdir) / "test.db"
    env = {
        "GITEA_URL": "http://gitea.test",
        "GITEA_BOT_USER": "rfc-bot",
        "GITEA_BOT_TOKEN": "bot-token",
        "GITEA_ORG": "wiggleverse",
        "META_REPO": "meta",
        "OAUTH_CLIENT_ID": "cid",
        "OAUTH_CLIENT_SECRET": "csec",
        "APP_URL": "http://localhost:8000",
        "SECRET_KEY": "test-secret-key-for-cookies",
        "DATABASE_PATH": str(db_path),
        "OWNER_GITEA_LOGIN": "ben",
        "GITEA_WEBHOOK_SECRET": "",
        "ENABLED_MODELS": "claude",
    }
    for k, v in env.items():
        monkeypatch.setenv(k, v)
    try:
        yield env
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
|
|
@pytest.fixture
def app_with_fake_gitea(tmp_env, monkeypatch):
    """Yield ``(app, fake)``: the app wired to an in-memory ``FakeGitea``.

    ``httpx.AsyncClient`` as seen through ``app.gitea`` is replaced with
    a factory that forces a ``MockTransport`` backed by ``fake.handle``,
    so every Gitea call the app makes lands on the fake.

    The memoized database connection is closed and cleared both before
    the app is built (so the app opens the database this test's
    environment points at) and again on teardown (so the connection does
    not outlive the test that created it).
    """
    fake = FakeGitea()
    real_client_cls = httpx.AsyncClient

    def patched_client(*args, **kwargs):
        # Any transport the caller supplied is replaced by the fake.
        kwargs["transport"] = httpx.MockTransport(fake.handle)
        return real_client_cls(*args, **kwargs)

    # NOTE(review): if app.gitea does a plain `import httpx`, this target is
    # the shared httpx module, so the patch affects every AsyncClient created
    # during the test — confirm that is the intent.
    monkeypatch.setattr("app.gitea.httpx.AsyncClient", patched_client)

    from app import db

    def reset_db_connection():
        # The db module memoizes its connection — reset across tests so each
        # test gets the tmpdir db its env points at, not a previous test's.
        if db._CONN is not None:
            db._CONN.close()
            db._CONN = None

    reset_db_connection()

    from app.main import create_app

    app = create_app()
    yield app, fake
    reset_db_connection()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_propose_to_super_draft_vertical(app_with_fake_gitea):
    """Walk one RFC from proposal, through owner merge, to a listed super-draft."""
    from fastapi.testclient import TestClient
    from app import db

    app, _ = app_with_fake_gitea
    slug = "open-human-model"

    with TestClient(app) as client:
        # Nothing has been proposed yet, so the catalog starts empty.
        resp = client.get("/api/rfcs")
        assert resp.status_code == 200
        assert resp.json()["items"] == []

        # Alice (a contributor) files the proposal.
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        sign_in_as(
            client,
            user_id=2,
            gitea_login="alice",
            display_name="Alice",
            role="contributor",
            email="alice@test",
        )
        proposal_request = {
            "title": "Open Human Model",
            "slug": "open-human-model",
            "pitch": "A shared definition of what we mean by *human*.",
            "tags": ["identity", "schema"],
        }
        resp = client.post("/api/rfcs/propose", json=proposal_request)
        assert resp.status_code == 200, resp.text
        created = resp.json()
        pr_number = created["pr_number"]
        assert created["slug"] == slug

        # It now shows up among the pending ideas.
        pending = client.get("/api/proposals").json()["items"]
        assert len(pending) == 1
        assert pending[0]["slug"] == slug
        assert pending[0]["pr_number"] == pr_number

        # Merging is not available to a contributor.
        merge_url = f"/api/proposals/{pr_number}/merge"
        resp = client.post(merge_url)
        assert resp.status_code == 403

        # As the owner, the proposal view advertises the merge affordance.
        sign_in_as(
            client,
            user_id=1,
            gitea_login="ben",
            display_name="Ben",
            role="owner",
            email="ben@test",
        )
        resp = client.get(f"/api/proposals/{pr_number}")
        assert resp.status_code == 200, resp.text
        proposal = resp.json()
        assert proposal["entry"]["title"] == "Open Human Model"
        assert proposal["entry"]["state"] == "super-draft"
        assert proposal["affordances"]["merge"] is True

        # The owner merges it.
        resp = client.post(merge_url)
        assert resp.status_code == 200, resp.text
        assert resp.json()["slug"] == slug

        # The catalog now lists the new super-draft.
        catalog = client.get("/api/rfcs").json()["items"]
        assert len(catalog) == 1
        entry = catalog[0]
        assert entry["slug"] == slug
        assert entry["state"] == "super-draft"
        assert "identity" in entry["tags"]

        # The detail view carries the pitch text in its body.
        resp = client.get("/api/rfcs/open-human-model")
        assert resp.status_code == 200
        view = resp.json()
        assert view["state"] == "super-draft"
        assert "shared definition" in view["body"]

        # Once merged, the proposal leaves the pending list.
        resp = client.get("/api/proposals")
        assert resp.json()["items"] == []

        # Both bot actions appear in the audit log per §6.5.
        rows = db.conn().execute(
            "SELECT action_kind, on_behalf_of FROM actions ORDER BY id"
        ).fetchall()
        recorded = [(row["action_kind"], row["on_behalf_of"]) for row in rows]
        assert ("propose_rfc", "alice") in recorded
        assert ("merge_proposal", "ben") in recorded
|
|
|
|
|
|
def test_slug_uniqueness_enforced(app_with_fake_gitea):
    """A second proposal that reuses an already-proposed slug gets a 409."""
    from fastapi.testclient import TestClient

    app, _ = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=5, login="alice", role="contributor")
        sign_in_as(client, user_id=5, gitea_login="alice", display_name="Alice", role="contributor")

        first = client.post(
            "/api/rfcs/propose",
            json={"title": "First", "slug": "first", "pitch": "p", "tags": []},
        )
        assert first.status_code == 200, first.text

        duplicate = client.post(
            "/api/rfcs/propose",
            json={"title": "First Again", "slug": "first", "pitch": "p", "tags": []},
        )
        assert duplicate.status_code == 409
|
|
|
|
|
|
def test_invalid_slug_rejected(app_with_fake_gitea):
    """A slug containing uppercase, a space and punctuation is rejected with 422."""
    from fastapi.testclient import TestClient

    app, _ = app_with_fake_gitea
    bad_proposal = {"title": "Bad slug", "slug": "Bad Slug!", "pitch": "p", "tags": []}
    with TestClient(app) as client:
        provision_user_row(user_id=7, login="alice", role="contributor")
        sign_in_as(client, user_id=7, gitea_login="alice", display_name="Alice", role="contributor")
        response = client.post("/api/rfcs/propose", json=bad_proposal)
        assert response.status_code == 422
|
|
|
|
|
|
def test_anonymous_cannot_propose(app_with_fake_gitea):
    """Proposing without a session cookie is answered with 401."""
    from fastapi.testclient import TestClient

    app, _ = app_with_fake_gitea
    proposal = {"title": "A", "slug": "a", "pitch": "p", "tags": []}
    with TestClient(app) as client:
        # No sign_in_as call: the client carries no session.
        response = client.post("/api/rfcs/propose", json=proposal)
        assert response.status_code == 401
|
|
|
|
|
|
def test_withdraw_by_proposer_works(app_with_fake_gitea):
    """Only the proposer may withdraw; afterwards the pending list is empty."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=9, login="alice", role="contributor")
        provision_user_row(user_id=10, login="bob", role="contributor")
        sign_in_as(client, user_id=9, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/propose", json={
            "title": "X", "slug": "x", "pitch": "p", "tags": [],
        })
        # Fail here with the response text rather than with a KeyError on
        # "pr_number" below if the proposal itself was rejected.
        assert r.status_code == 200, r.text
        pr_number = r.json()["pr_number"]

        # A different contributor cannot withdraw someone else's proposal.
        sign_in_as(client, user_id=10, gitea_login="bob", display_name="Bob", role="contributor")
        r = client.post(f"/api/proposals/{pr_number}/withdraw")
        assert r.status_code == 403

        # The proposer can.
        sign_in_as(client, user_id=9, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post(f"/api/proposals/{pr_number}/withdraw")
        assert r.status_code == 200, r.text
        r = client.get("/api/proposals")
        assert r.json()["items"] == []
|