Files
Ben Stull 060fa408a2 Slice 7: §14 chrome + settings and admin neighborhoods
§14.1 richer landing, §14.2 /philosophy route (disk-backed), §14.3
persistent About link. /settings/notifications surfaces Slice 6's
preferences/quiet-hours/mute/watches endpoints. /admin home base
consolidates role management, the §6.2 write-mute, the audit-log
viewer, the permission-events log, and the §13.2 graduation queue.

Backend: backend/app/philosophy.py, backend/app/api_admin.py (seven
admin endpoints + user-search), GET /api/users/me/notification-mutes.
Frontend: Landing.jsx (deck), Philosophy.jsx, NotificationSettings.jsx,
Admin.jsx, App.jsx routing for the chrome surfaces.

Tests: backend/tests/test_chrome_vertical.py — 13 cases. Full suite
75/75 green.

Spec corrections: §14.2 (PHILOSOPHY.md source is a deployment-time
decision), §17 (admin block extended to name the seven new endpoints
+ user-search and notification-mutes read). §19.1 rewritten for
Slice 8 hardening; §19.2 grew four candidates (owner succession,
mute-from-actor, the "Following since <date>" disclosure, audit-log
row prose).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 23:40:49 -07:00

398 lines
15 KiB
Python

"""§17 admin endpoints + Slice 7 user search.
The admin's repertoire grew across the prior slices without earning a
centralized home: role grants (§6.1), the §6.2 app-wide write-mute, the
§6.5 / §5 audit logs, and the §13 graduation-readiness queue. Slice 7
consolidates them behind `/api/admin/*` so the chrome can hold them in
one tabbed surface (`Admin.jsx`).
The endpoints in this module:
- `GET /api/admin/users` — list users with role + mute
- `POST /api/admin/users/<id>/role` — set role per §6.1
- `POST /api/admin/users/<id>/mute` — set the §6.2 write-mute
- `GET /api/admin/audit` — paged `actions` log
- `GET /api/admin/permission-events` — paged `permission_events` log
- `GET /api/admin/graduation-queue` — super-drafts ready to graduate
- `GET /api/users/search?q=…` — typeahead for §15.8 mute add
Permission gates: every `/api/admin/*` endpoint requires
`require_admin` (owner or admin); the role-change endpoint additionally
refuses any non-owner caller granting `owner`, since owner-zero is the
only owner bootstrap path per §6.1 and a downgrade from owner needs the
sitting owner's hand. The user-search endpoint is open to any
authenticated viewer — it powers the §15.8 mute typeahead and contains
no privileged information beyond what every authenticated viewer
already sees in the catalog and chat-author surfaces.
"""
from __future__ import annotations
import json
from typing import Any
from fastapi import APIRouter, HTTPException, Query, Request
from pydantic import BaseModel, Field
from . import auth, db
from .config import Config
# ---------------------------------------------------------------------------
# Pydantic bodies
# ---------------------------------------------------------------------------
class RoleBody(BaseModel):
    # Request body for `POST /api/admin/users/{user_id}/role`.
    # `role` is the role to assign; the pattern admits only the three
    # values owner, admin and contributor, so anything else is rejected
    # by validation before the handler runs.
    role: str = Field(pattern="^(owner|admin|contributor)$")
class MuteBody(BaseModel):
    # Request body for `POST /api/admin/users/{user_id}/mute`.
    # `muted` is the desired write-mute state: True mutes the target,
    # False lifts an existing mute.
    muted: bool
# ---------------------------------------------------------------------------
# Router
# ---------------------------------------------------------------------------
def make_router(config: Config) -> APIRouter:
del config # unused for now; reserved for future per-deployment knobs
router = APIRouter()
# ----- User listing -----
@router.get("/api/admin/users")
async def list_users(request: Request) -> dict[str, Any]:
    # Admin-only roster: every user with role and write-mute state.
    # Owners sort first, then admins, then everyone else; within a group
    # rows are ordered by display name, case-insensitively.
    auth.require_admin(request)

    def as_item(record: Any) -> dict[str, Any]:
        # Normalise the two columns that need it: a NULL email becomes ""
        # and the stored mute flag becomes a real boolean.
        return {
            "id": record["id"],
            "gitea_login": record["gitea_login"],
            "display_name": record["display_name"],
            "email": record["email"] or "",
            "role": record["role"],
            "muted": bool(record["muted"]),
            "created_at": record["created_at"],
            "last_seen_at": record["last_seen_at"],
        }

    records = db.conn().execute(
        """
        SELECT id, gitea_login, display_name, email, role, muted,
               created_at, last_seen_at
        FROM users
        ORDER BY role = 'owner' DESC, role = 'admin' DESC, display_name COLLATE NOCASE
        """
    ).fetchall()
    return {"items": list(map(as_item, records))}
# ----- Role change (§6.1) -----
@router.post("/api/admin/users/{user_id}/role")
async def set_role(user_id: int, body: RoleBody, request: Request) -> dict[str, Any]:
    # Assign `body.role` to the user `user_id` (§6.1) and record the change
    # in `permission_events`.
    #
    # Refusals, in the order they are checked:
    #   404 - no user with that id
    #   403 - a non-owner tries to grant `owner`
    #   403 - a non-owner tries to change an existing owner
    #   403 - the caller tries to change their own role
    # A request for the role the user already holds is a no-op that
    # writes nothing and returns `changed: False`.
    viewer = auth.require_admin(request)
    subject = db.conn().execute(
        "SELECT id, role, gitea_login FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if subject is None:
        raise HTTPException(404, "User not found")

    requested = body.role
    current = subject["role"]

    # §6.1: only an owner can grant `owner`, and only an owner can revoke
    # an owner. Admins can flip contributor ↔ admin freely.
    if viewer.role != "owner":
        if requested == "owner":
            raise HTTPException(403, "Only an owner can grant the owner role")
        if current == "owner":
            raise HTTPException(403, "Only an owner can change an owner's role")

    # Nobody changes their own role here: an owner demoting themselves
    # could orphan owner-zero, and the role-change UI should not smuggle
    # a "downgrade self" path through this endpoint.
    acting_on_self = subject["id"] == viewer.user_id
    if acting_on_self and requested != viewer.role:
        raise HTTPException(403, "Use the explicit succession path to change your own role")

    if requested == current:
        return {"ok": True, "role": requested, "changed": False}

    db.conn().execute(
        "UPDATE users SET role = ? WHERE id = ?", (requested, user_id),
    )
    transition = json.dumps({"before": current, "after": requested})
    db.conn().execute(
        """
        INSERT INTO permission_events
            (actor_user_id, subject_user_id, event_kind, details)
        VALUES (?, ?, 'role_changed', ?)
        """,
        (viewer.user_id, user_id, transition),
    )
    return {"ok": True, "role": requested, "changed": True}
# ----- Write-mute (§6.2) -----
@router.post("/api/admin/users/{user_id}/mute")
async def set_mute(user_id: int, body: MuteBody, request: Request) -> dict[str, Any]:
    # Set or lift the §6.2 write-mute on the user `user_id` and record the
    # change in `permission_events` as `muted` or `restored`.
    #
    # Refusals: 404 for an unknown user, 422 when the caller targets
    # themselves, 403 when the target is an owner or admin. Asking for the
    # state the user is already in writes nothing and returns
    # `changed: False`.
    viewer = auth.require_admin(request)
    subject = db.conn().execute(
        "SELECT id, role, muted FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if subject is None:
        raise HTTPException(404, "User not found")
    if subject["id"] == viewer.user_id:
        raise HTTPException(422, "You cannot write-mute yourself")

    # §6.2 + §6.1: the write-mute is a contributor-only refusal. An
    # admin's or owner's authority is curtailed through the role-change
    # channel, never through a mute.
    if subject["role"] in ("owner", "admin"):
        raise HTTPException(
            403,
            "Owners and admins cannot be write-muted — change the role instead",
        )

    was_muted = bool(subject["muted"])
    want_muted = bool(body.muted)
    if was_muted == want_muted:
        return {"ok": True, "muted": want_muted, "changed": False}

    db.conn().execute(
        "UPDATE users SET muted = ? WHERE id = ?", (int(want_muted), user_id),
    )
    if want_muted:
        event_kind = "muted"
    else:
        event_kind = "restored"
    transition = json.dumps({"before": was_muted, "after": want_muted})
    db.conn().execute(
        """
        INSERT INTO permission_events
            (actor_user_id, subject_user_id, event_kind, details)
        VALUES (?, ?, ?, ?)
        """,
        (viewer.user_id, user_id, event_kind, transition),
    )
    return {"ok": True, "muted": want_muted, "changed": True}
# ----- Audit log (`actions` + `permission_events`) -----
@router.get("/api/admin/audit")
async def list_audit(
    request: Request,
    action_kind: str | None = None,
    actor_user_id: int | None = None,
    rfc_slug: str | None = None,
    limit: int = Query(default=100, ge=1, le=500),
    before_id: int | None = None,
) -> dict[str, Any]:
    # Admin-only, newest-first page of the `actions` audit log.
    #
    # `action_kind`, `actor_user_id` and `rfc_slug` are optional filters,
    # ANDed together (empty strings count as "no filter"). `before_id` is
    # the paging cursor: only rows with a smaller id are returned, so a
    # client passes the last id of the previous page to get the next one.
    # The response holds `items`, the distinct `action_kinds` present in
    # the table, and `has_more`.
    auth.require_admin(request)
    clauses: list[str] = []
    args: list[Any] = []
    if action_kind:
        clauses.append("a.action_kind = ?")
        args.append(action_kind)
    if actor_user_id is not None:
        clauses.append("a.actor_user_id = ?")
        args.append(actor_user_id)
    if rfc_slug:
        clauses.append("a.rfc_slug = ?")
        args.append(rfc_slug)
    if before_id is not None:
        clauses.append("a.id < ?")
        args.append(before_id)
    # `where` is assembled only from the fixed fragments above; every
    # caller-supplied value travels as a bound parameter.
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    # Fetch one row beyond the requested page. The extra row is a sentinel
    # that proves another page exists; testing `len(rows) == limit` instead
    # would report `has_more` on a final page that happens to be exactly
    # full and send the client on a pointless empty fetch.
    rows = db.conn().execute(
        f"""
        SELECT a.id, a.actor_user_id, a.on_behalf_of, a.action_kind,
               a.rfc_slug, a.branch_name, a.pr_number, a.bot_commit_sha,
               a.details, a.created_at,
               u.gitea_login AS actor_login, u.display_name AS actor_display
        FROM actions a
        LEFT JOIN users u ON u.id = a.actor_user_id
        {where}
        ORDER BY a.id DESC
        LIMIT ?
        """,
        (*args, limit + 1),
    ).fetchall()
    has_more = len(rows) > limit
    page = rows[:limit]
    # The distinct-action-kinds list powers the filter chip in
    # `Admin.jsx`; cheap to compute alongside the page since the
    # action_kind set is bounded.
    kinds = [
        r["action_kind"]
        for r in db.conn().execute(
            "SELECT DISTINCT action_kind FROM actions ORDER BY action_kind"
        )
    ]
    return {
        "items": [
            {
                "id": r["id"],
                "action_kind": r["action_kind"],
                "actor_user_id": r["actor_user_id"],
                "actor_login": r["actor_login"],
                "actor_display": r["actor_display"],
                "on_behalf_of": r["on_behalf_of"],
                "rfc_slug": r["rfc_slug"],
                "branch_name": r["branch_name"],
                "pr_number": r["pr_number"],
                "bot_commit_sha": r["bot_commit_sha"],
                # Stored as text; decoded when it is valid JSON, otherwise
                # passed through as the raw string.
                "details": _safe_json(r["details"]),
                "created_at": r["created_at"],
            }
            for r in page
        ],
        "action_kinds": kinds,
        "has_more": has_more,
    }
@router.get("/api/admin/permission-events")
async def list_permission_events(
    request: Request,
    limit: int = Query(default=100, ge=1, le=500),
    before_id: int | None = None,
) -> dict[str, Any]:
    # Admin-only, newest-first page of the `permission_events` log.
    #
    # `before_id` is the paging cursor: only rows with a smaller id are
    # returned. Each item names both the actor and the subject of the
    # event (id, login, display name); either side is null when the
    # LEFT JOIN finds no matching user row.
    auth.require_admin(request)
    clauses: list[str] = []
    args: list[Any] = []
    if before_id is not None:
        clauses.append("p.id < ?")
        args.append(before_id)
    # Only fixed fragments reach the SQL text; values are bound parameters.
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    # Fetch one row beyond the requested page. The extra row is a sentinel
    # that proves another page exists; testing `len(rows) == limit` instead
    # would report `has_more` on a final page that happens to be exactly
    # full.
    rows = db.conn().execute(
        f"""
        SELECT p.id, p.event_kind, p.details, p.created_at,
               p.actor_user_id, p.subject_user_id,
               au.gitea_login AS actor_login, au.display_name AS actor_display,
               su.gitea_login AS subject_login, su.display_name AS subject_display
        FROM permission_events p
        LEFT JOIN users au ON au.id = p.actor_user_id
        LEFT JOIN users su ON su.id = p.subject_user_id
        {where}
        ORDER BY p.id DESC
        LIMIT ?
        """,
        (*args, limit + 1),
    ).fetchall()
    has_more = len(rows) > limit
    page = rows[:limit]
    return {
        "items": [
            {
                "id": r["id"],
                "event_kind": r["event_kind"],
                # The ids were already selected but never returned; they
                # are exposed here, as the audit endpoint does for its
                # actor, so a client can key on them rather than on names.
                "actor_user_id": r["actor_user_id"],
                "actor_login": r["actor_login"],
                "actor_display": r["actor_display"],
                "subject_user_id": r["subject_user_id"],
                "subject_login": r["subject_login"],
                "subject_display": r["subject_display"],
                # Stored as text; decoded when it is valid JSON, otherwise
                # passed through as the raw string.
                "details": _safe_json(r["details"]),
                "created_at": r["created_at"],
            }
            for r in page
        ],
        "has_more": has_more,
    }
# ----- Graduation-readiness queue (§13.2) -----
@router.get("/api/admin/graduation-queue")
async def graduation_queue(request: Request) -> dict[str, Any]:
    # Admin-only view of every super-draft, split into `ready` and
    # `blocked`.
    #
    # §13 / §13.2: a super-draft is ready when (a) it has at least one
    # owner claimed via §13.1 and (b) it has zero open meta_body_edit PRs.
    # Both lists carry the same payload so the admin can see what is
    # gating each blocked entry (`owners_set`, `blocking_prs`).
    auth.require_admin(request)
    # The open-PR count is a correlated subquery, so the whole queue is
    # one statement instead of one extra COUNT query per super-draft.
    rows = db.conn().execute(
        """
        SELECT slug, title, owners_json, tags_json,
               proposed_at, last_entry_commit_at,
               (
                   SELECT COUNT(*) FROM cached_prs p
                   WHERE p.rfc_slug = cached_rfcs.slug
                     AND p.state = 'open'
                     AND p.pr_kind = 'meta_body_edit'
               ) AS blocking_prs
        FROM cached_rfcs
        WHERE state = 'super-draft'
        ORDER BY COALESCE(last_entry_commit_at, proposed_at) DESC
        """
    ).fetchall()
    items_ready: list[dict[str, Any]] = []
    items_blocked: list[dict[str, Any]] = []
    for r in rows:
        # NULL/empty columns and a stored JSON `null` both mean "none".
        owners = json.loads(r["owners_json"] or "[]") or []
        tags = json.loads(r["tags_json"] or "[]") or []
        blocking = r["blocking_prs"]
        payload = {
            "slug": r["slug"],
            "title": r["title"],
            "owners": owners,
            "tags": tags,
            "proposed_at": r["proposed_at"],
            "last_entry_commit_at": r["last_entry_commit_at"],
            "blocking_prs": blocking,
            "owners_set": len(owners) > 0,
        }
        if payload["owners_set"] and blocking == 0:
            items_ready.append(payload)
        else:
            items_blocked.append(payload)
    return {"ready": items_ready, "blocked": items_blocked}
# ----- User search (typeahead for §15.8 mute add) -----
@router.get("/api/users/search")
async def search_users(
    request: Request, q: str = Query(default="", min_length=0, max_length=80),
) -> dict[str, Any]:
    # Typeahead for the §15.8 mute add; open to any authenticated viewer.
    #
    # An empty query returns the ten most recently seen users. A non-empty
    # query is a case-insensitive substring match on gitea_login and
    # display_name, with logins that start with the query ranked first.
    # The viewer is always excluded, and at most ten rows are returned.
    viewer = auth.require_user(request)
    needle = q.strip().lower()
    if not needle:
        rows = db.conn().execute(
            """
            SELECT id, gitea_login, display_name, role
            FROM users WHERE id != ?
            ORDER BY last_seen_at DESC LIMIT 10
            """,
            (viewer.user_id,),
        ).fetchall()
    else:
        # `%` and `_` are LIKE wildcards. Left unescaped, a query such as
        # "ben_" would also match "benny", and "%" would match everyone.
        # Escape them (and the escape character itself) so the needle is
        # matched literally.
        escaped = (
            needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        like = f"%{escaped}%"
        prefix = f"{escaped}%"
        # `'\\'` in this non-raw literal reaches SQL as the one-character
        # string '\', which is what ESCAPE requires.
        rows = db.conn().execute(
            """
            SELECT id, gitea_login, display_name, role
            FROM users
            WHERE id != ?
              AND (LOWER(gitea_login) LIKE ? ESCAPE '\\'
                   OR LOWER(display_name) LIKE ? ESCAPE '\\')
            ORDER BY
                CASE WHEN LOWER(gitea_login) LIKE ? ESCAPE '\\' THEN 0 ELSE 1 END,
                LOWER(gitea_login)
            LIMIT 10
            """,
            (viewer.user_id, like, like, prefix),
        ).fetchall()
    return {
        "items": [
            {
                "id": r["id"],
                "gitea_login": r["gitea_login"],
                "display_name": r["display_name"],
                "role": r["role"],
            }
            for r in rows
        ]
    }
return router
def _safe_json(blob: str | None) -> Any:
if not blob:
return None
try:
return json.loads(blob)
except Exception:
return blob