"""§17 admin endpoints + Slice 7 user search. The admin's repertoire grew across the prior slices without earning a centralized home: role grants (§6.1), the §6.2 app-wide write-mute, the §6.5 / §5 audit logs, and the §13 graduation-readiness queue. Slice 7 consolidates them behind `/api/admin/*` so the chrome can hold them in one tabbed surface (`Admin.jsx`). The endpoints in this module: - `GET /api/admin/users` — list users with role + mute - `POST /api/admin/users//role` — set role per §6.1 - `POST /api/admin/users//mute` — set the §6.2 write-mute - `GET /api/admin/audit` — paged `actions` log - `GET /api/admin/permission-events` — paged `permission_events` log - `GET /api/admin/graduation-queue` — super-drafts ready to graduate - `GET /api/users/search?q=…` — typeahead for §15.8 mute add Permission gates: every `/api/admin/*` endpoint requires `require_admin` (owner or admin); the role-change endpoint additionally refuses any non-owner caller granting `owner`, since owner-zero is the only owner bootstrap path per §6.1 and a downgrade from owner needs the sitting owner's hand. The user-search endpoint is open to any authenticated viewer — it powers the §15.8 mute typeahead and contains no privileged information beyond what every authenticated viewer already sees in the catalog and chat-author surfaces. """ from __future__ import annotations import json from typing import Any from fastapi import APIRouter, HTTPException, Query, Request from pydantic import BaseModel, Field from . import auth, db from .config import Config # --------------------------------------------------------------------------- # Pydantic bodies # --------------------------------------------------------------------------- class RoleBody(BaseModel): role: str = Field(pattern="^(owner|admin|contributor)$") class MuteBody(BaseModel): muted: bool # --------------------------------------------------------------------------- # Router # --------------------------------------------------------------------------- def make_router(config: Config) -> APIRouter: del config # unused for now; reserved for future per-deployment knobs router = APIRouter() # ----- User listing ----- @router.get("/api/admin/users") async def list_users(request: Request) -> dict[str, Any]: auth.require_admin(request) rows = db.conn().execute( """ SELECT id, gitea_login, display_name, email, role, muted, created_at, last_seen_at FROM users ORDER BY role = 'owner' DESC, role = 'admin' DESC, display_name COLLATE NOCASE """ ).fetchall() return { "items": [ { "id": r["id"], "gitea_login": r["gitea_login"], "display_name": r["display_name"], "email": r["email"] or "", "role": r["role"], "muted": bool(r["muted"]), "created_at": r["created_at"], "last_seen_at": r["last_seen_at"], } for r in rows ] } # ----- Role change (§6.1) ----- @router.post("/api/admin/users/{user_id}/role") async def set_role(user_id: int, body: RoleBody, request: Request) -> dict[str, Any]: viewer = auth.require_admin(request) target = db.conn().execute( "SELECT id, role, gitea_login FROM users WHERE id = ?", (user_id,) ).fetchone() if target is None: raise HTTPException(404, "User not found") # §6.1: only an owner can grant `owner`, and only an owner can # revoke an owner. Admins can flip contributor ↔ admin freely. if body.role == "owner" and viewer.role != "owner": raise HTTPException(403, "Only an owner can grant the owner role") if target["role"] == "owner" and viewer.role != "owner": raise HTTPException(403, "Only an owner can change an owner's role") # An owner cannot demote themselves — that would orphan owner-zero # if they are the only owner, and the role-change UI should not # smuggle a "downgrade self" path through this endpoint. if target["id"] == viewer.user_id and body.role != viewer.role: raise HTTPException(403, "Use the explicit succession path to change your own role") before = target["role"] if before == body.role: return {"ok": True, "role": body.role, "changed": False} db.conn().execute( "UPDATE users SET role = ? WHERE id = ?", (body.role, user_id), ) db.conn().execute( """ INSERT INTO permission_events (actor_user_id, subject_user_id, event_kind, details) VALUES (?, ?, 'role_changed', ?) """, ( viewer.user_id, user_id, json.dumps({"before": before, "after": body.role}), ), ) return {"ok": True, "role": body.role, "changed": True} # ----- Write-mute (§6.2) ----- @router.post("/api/admin/users/{user_id}/mute") async def set_mute(user_id: int, body: MuteBody, request: Request) -> dict[str, Any]: viewer = auth.require_admin(request) target = db.conn().execute( "SELECT id, role, muted FROM users WHERE id = ?", (user_id,) ).fetchone() if target is None: raise HTTPException(404, "User not found") if target["id"] == viewer.user_id: raise HTTPException(422, "You cannot write-mute yourself") # §6.2 + §6.1: admins/owners are not write-mutable. The write-mute # is a contributor-only refusal — an admin's authority is the # role-change channel, not a mute. if target["role"] in ("owner", "admin"): raise HTTPException( 403, "Owners and admins cannot be write-muted — change the role instead", ) before = bool(target["muted"]) after = bool(body.muted) if before == after: return {"ok": True, "muted": after, "changed": False} db.conn().execute( "UPDATE users SET muted = ? WHERE id = ?", (1 if after else 0, user_id), ) db.conn().execute( """ INSERT INTO permission_events (actor_user_id, subject_user_id, event_kind, details) VALUES (?, ?, ?, ?) """, ( viewer.user_id, user_id, "muted" if after else "restored", json.dumps({"before": before, "after": after}), ), ) return {"ok": True, "muted": after, "changed": True} # ----- Audit log (`actions` + `permission_events`) ----- @router.get("/api/admin/audit") async def list_audit( request: Request, action_kind: str | None = None, actor_user_id: int | None = None, rfc_slug: str | None = None, limit: int = Query(default=100, ge=1, le=500), before_id: int | None = None, ) -> dict[str, Any]: auth.require_admin(request) clauses: list[str] = [] args: list[Any] = [] if action_kind: clauses.append("a.action_kind = ?") args.append(action_kind) if actor_user_id is not None: clauses.append("a.actor_user_id = ?") args.append(actor_user_id) if rfc_slug: clauses.append("a.rfc_slug = ?") args.append(rfc_slug) if before_id is not None: clauses.append("a.id < ?") args.append(before_id) where = ("WHERE " + " AND ".join(clauses)) if clauses else "" rows = db.conn().execute( f""" SELECT a.id, a.actor_user_id, a.on_behalf_of, a.action_kind, a.rfc_slug, a.branch_name, a.pr_number, a.bot_commit_sha, a.details, a.created_at, u.gitea_login AS actor_login, u.display_name AS actor_display FROM actions a LEFT JOIN users u ON u.id = a.actor_user_id {where} ORDER BY a.id DESC LIMIT ? """, (*args, limit), ).fetchall() # The distinct-action-kinds list powers the filter chip in # `Admin.jsx`; cheap to compute alongside the page since the # action_kind set is bounded. kinds = [ r["action_kind"] for r in db.conn().execute( "SELECT DISTINCT action_kind FROM actions ORDER BY action_kind" ) ] return { "items": [ { "id": r["id"], "action_kind": r["action_kind"], "actor_user_id": r["actor_user_id"], "actor_login": r["actor_login"], "actor_display": r["actor_display"], "on_behalf_of": r["on_behalf_of"], "rfc_slug": r["rfc_slug"], "branch_name": r["branch_name"], "pr_number": r["pr_number"], "bot_commit_sha": r["bot_commit_sha"], "details": _safe_json(r["details"]), "created_at": r["created_at"], } for r in rows ], "action_kinds": kinds, "has_more": len(rows) == limit, } @router.get("/api/admin/permission-events") async def list_permission_events( request: Request, limit: int = Query(default=100, ge=1, le=500), before_id: int | None = None, ) -> dict[str, Any]: auth.require_admin(request) clauses: list[str] = [] args: list[Any] = [] if before_id is not None: clauses.append("p.id < ?") args.append(before_id) where = ("WHERE " + " AND ".join(clauses)) if clauses else "" rows = db.conn().execute( f""" SELECT p.id, p.event_kind, p.details, p.created_at, p.actor_user_id, p.subject_user_id, au.gitea_login AS actor_login, au.display_name AS actor_display, su.gitea_login AS subject_login, su.display_name AS subject_display FROM permission_events p LEFT JOIN users au ON au.id = p.actor_user_id LEFT JOIN users su ON su.id = p.subject_user_id {where} ORDER BY p.id DESC LIMIT ? """, (*args, limit), ).fetchall() return { "items": [ { "id": r["id"], "event_kind": r["event_kind"], "actor_login": r["actor_login"], "actor_display": r["actor_display"], "subject_login": r["subject_login"], "subject_display": r["subject_display"], "details": _safe_json(r["details"]), "created_at": r["created_at"], } for r in rows ], "has_more": len(rows) == limit, } # ----- Graduation-readiness queue (§13.2) ----- @router.get("/api/admin/graduation-queue") async def graduation_queue(request: Request) -> dict[str, Any]: auth.require_admin(request) # §13 / §13.2: a super-draft is ready when (a) it has at least one # owner claimed via §13.1 and (b) it has zero open meta_body_edit # PRs. We compute both with one pass, returning the ready set and # the not-yet-ready set so the admin sees what is gating each. rows = db.conn().execute( """ SELECT slug, title, owners_json, arbiters_json, tags_json, proposed_at, last_entry_commit_at FROM cached_rfcs WHERE state = 'super-draft' ORDER BY COALESCE(last_entry_commit_at, proposed_at) DESC """ ).fetchall() items_ready: list[dict[str, Any]] = [] items_blocked: list[dict[str, Any]] = [] for r in rows: owners = json.loads(r["owners_json"] or "[]") blocking = db.conn().execute( """ SELECT COUNT(*) AS n FROM cached_prs WHERE rfc_slug = ? AND state = 'open' AND pr_kind = 'meta_body_edit' """, (r["slug"],), ).fetchone()["n"] payload = { "slug": r["slug"], "title": r["title"], "owners": owners, "tags": json.loads(r["tags_json"] or "[]"), "proposed_at": r["proposed_at"], "last_entry_commit_at": r["last_entry_commit_at"], "blocking_prs": blocking, "owners_set": len(owners) > 0, } if payload["owners_set"] and blocking == 0: items_ready.append(payload) else: items_blocked.append(payload) return {"ready": items_ready, "blocked": items_blocked} # ----- User search (typeahead for §15.8 mute add) ----- @router.get("/api/users/search") async def search_users( request: Request, q: str = Query(default="", min_length=0, max_length=80), ) -> dict[str, Any]: viewer = auth.require_user(request) needle = q.strip().lower() # An empty query returns recent users; a non-empty query matches # against gitea_login and display_name with a prefix-first # ranking so the typeahead surfaces obvious matches early. if not needle: rows = db.conn().execute( """ SELECT id, gitea_login, display_name, role FROM users WHERE id != ? ORDER BY last_seen_at DESC LIMIT 10 """, (viewer.user_id,), ).fetchall() else: like = f"%{needle}%" prefix = f"{needle}%" rows = db.conn().execute( """ SELECT id, gitea_login, display_name, role FROM users WHERE id != ? AND (LOWER(gitea_login) LIKE ? OR LOWER(display_name) LIKE ?) ORDER BY CASE WHEN LOWER(gitea_login) LIKE ? THEN 0 ELSE 1 END, LOWER(gitea_login) LIMIT 10 """, (viewer.user_id, like, like, prefix), ).fetchall() return { "items": [ { "id": r["id"], "gitea_login": r["gitea_login"], "display_name": r["display_name"], "role": r["role"], } for r in rows ] } return router def _safe_json(blob: str | None) -> Any: if not blob: return None try: return json.loads(blob) except Exception: return blob