Files
rfc-app/backend/app/api_notifications.py
Ben Stull 1a0c4428af Slice 8 WIP: §12 hygiene + §10.7 + routing + rollback cleanup
- Add §12 30/90 hygiene scheduler in hygiene.py, mirroring the
  DigestScheduler shape; wires next to digest in main.py with the
  same start/stop/run_tick test seam.
- Extend bot.delete_branch to accept actor=None for system gestures,
  per §15.9 (actor_user_id=NULL, on_behalf_of=bot_login).
- Convert every branches/{branch} route in api_branches.py and
  api_prs.py to {branch:path}; move the bare GET to the bottom of
  the router so deeper GETs match before greedy-path swallow.
- Extend api_prs.py's _require_pr to accept pr_kind='meta_metadata'
  so the §9.5 metadata-pane PRs land an in-app merge.
- Graduation rollback now deletes the graduate-<slug>-<6hex> branch
  after closing the PR — §19.2 candidate that lands here.
- Email-bounce webhook gains a WEBHOOK_EMAIL_BOUNCE_SECRET seam.
- FakeGitea grows a DELETE /branches/{branch:path} handler and a
  slashed-branch read; integration tests for the hygiene vertical
  cover the 30d close, 90d delete, post-merge delete, pinned
  exemption, per-user cursor preservation, no-notification rule,
  and the graduation-rollback cleanup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 04:03:09 -07:00

468 lines
19 KiB
Python

"""§17 endpoints for the §15 notifications surface (Slice 6).
The endpoints in this module are:
- `GET /api/notifications` — §15.2 inbox listing
- `POST /api/notifications/<id>/read` — §15.2/§15.7
- `POST /api/notifications/read` — §15.2 mark by filter
- `GET /api/notifications/stream` — §15.3 SSE
- `GET /api/watches` — §15.6
- `POST /api/rfcs/<slug>/watch` — §15.6 explicit set
- `GET /api/users/me/notification-preferences` — §15.4 / §15.5
- `POST /api/users/me/notification-preferences` — set
- `GET /api/users/me/quiet-hours` — §15.8
- `POST /api/users/me/quiet-hours` — set / clear
- `POST /api/users/<id>/notification-mute` — §15.8
- `DELETE /api/users/<id>/notification-mute` — §15.8
- `GET /api/email/unsubscribe` — §15.4 one-click
- `POST /api/webhooks/email-bounce` — §15.4 receiver
The `branches/<branch>/chat-seen` advance lives in `api_branches` next
to the other branch-scoped endpoints; it calls into `notify.reconcile_seen_advance`
per §15.7. The `prs/<n>/seen` endpoint in `api_prs` does the same.
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from itsdangerous import BadSignature
from pydantic import BaseModel, Field
from . import auth, db, email as email_mod, notify
from .config import Config
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Pydantic bodies
# ---------------------------------------------------------------------------
class WatchBody(BaseModel):
state: str = Field(pattern="^(watching|following|muted)$")
class PreferencesBody(BaseModel):
email_personal_direct: bool | None = None
email_watched_structural: bool | None = None
email_admin_actionable: bool | None = None
digest_cadence: str | None = Field(default=None, pattern="^(off|weekly|daily)$")
class QuietHoursBody(BaseModel):
start: str | None = Field(default=None, pattern=r"^\d{2}:\d{2}$")
end: str | None = Field(default=None, pattern=r"^\d{2}:\d{2}$")
timezone: str | None = None
class MarkReadBody(BaseModel):
ids: list[int] | None = None
rfc_slug: str | None = None
category: str | None = None
actor_user_id: int | None = None
class BounceBody(BaseModel):
email: str = Field(min_length=3, max_length=320)
kind: str = Field(default="hard") # 'hard' or 'complaint'
# ---------------------------------------------------------------------------
# Router
# ---------------------------------------------------------------------------
def make_router(config: Config) -> APIRouter:
router = APIRouter()
# ----- Inbox listing + mark-read -----
@router.get("/api/notifications")
async def list_notifications(
request: Request,
unread: bool = False,
rfc_slug: str | None = None,
category: str | None = None,
actor_user_id: int | None = None,
bundled: bool = False,
) -> dict[str, Any]:
viewer = auth.require_user(request)
return notify.list_inbox(
user_id=viewer.user_id,
unread=unread,
rfc_slug=rfc_slug,
category=category,
actor_user_id=actor_user_id,
bundled=bundled,
)
@router.post("/api/notifications/{notif_id}/read")
async def mark_one_read(notif_id: int, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
row = db.conn().execute(
"SELECT id FROM notifications WHERE id = ? AND recipient_user_id = ?",
(notif_id, viewer.user_id),
).fetchone()
if row is None:
raise HTTPException(404, "Notification not found")
db.conn().execute(
"UPDATE notifications SET read_at = datetime('now') WHERE id = ? AND read_at IS NULL",
(notif_id,),
)
# Push the read event so other tabs update their badge counts.
asyncio.create_task(notify._broadcast(viewer.user_id, "read", {"id": notif_id}))
return {"ok": True}
@router.post("/api/notifications/read")
async def mark_filtered_read(body: MarkReadBody, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
marked = notify.mark_read_by_filter(
user_id=viewer.user_id,
rfc_slug=body.rfc_slug,
category=body.category,
actor_user_id=body.actor_user_id,
ids=body.ids,
)
return {"marked": marked}
@router.get("/api/notifications/stream")
async def stream_notifications(request: Request):
viewer = auth.require_user(request)
sub = await notify.subscribe(viewer.user_id)
async def event_stream():
# On open, send the current unread count as a snapshot so
# the badge initializes correctly without a second request.
count = db.conn().execute(
"SELECT COUNT(*) AS c FROM notifications WHERE recipient_user_id = ? AND read_at IS NULL",
(viewer.user_id,),
).fetchone()["c"]
yield _sse("snapshot", {"unread_count": count})
try:
while True:
if await request.is_disconnected():
break
try:
evt = await asyncio.wait_for(sub.queue.get(), timeout=15.0)
except asyncio.TimeoutError:
# Keep-alive comment line; clients ignore comments.
yield ": keep-alive\n\n"
continue
yield _sse(evt.get("event", "update"), evt.get("payload"))
finally:
await notify.unsubscribe(sub)
headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
return StreamingResponse(event_stream(), media_type="text/event-stream", headers=headers)
# ----- Watches -----
@router.get("/api/watches")
async def list_watches(request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
rows = db.conn().execute(
"""
SELECT w.id, w.rfc_slug, w.state, w.set_by, w.set_at, w.last_participation_at,
r.title AS rfc_title
FROM watches w
LEFT JOIN cached_rfcs r ON r.slug = w.rfc_slug
WHERE w.user_id = ?
ORDER BY w.set_at DESC
""",
(viewer.user_id,),
).fetchall()
return {
"items": [
{
"id": r["id"],
"rfc_slug": r["rfc_slug"],
"rfc_title": r["rfc_title"],
"state": r["state"],
"set_by": r["set_by"],
"set_at": r["set_at"],
"last_participation_at": r["last_participation_at"],
}
for r in rows
]
}
@router.post("/api/rfcs/{slug}/watch")
async def set_watch(slug: str, body: WatchBody, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
rfc = db.conn().execute("SELECT slug FROM cached_rfcs WHERE slug = ?", (slug,)).fetchone()
if rfc is None:
raise HTTPException(404, "RFC not found")
db.conn().execute(
"""
INSERT INTO watches (user_id, rfc_slug, state, set_by, set_at, last_participation_at)
VALUES (?, ?, ?, 'explicit', datetime('now'), datetime('now'))
ON CONFLICT(user_id, rfc_slug) DO UPDATE SET
state = excluded.state,
set_by = 'explicit',
set_at = excluded.set_at
""",
(viewer.user_id, slug, body.state),
)
return {"ok": True, "state": body.state, "set_by": "explicit"}
# ----- Per-user notification preferences (§15.4 / §15.5) -----
@router.get("/api/users/me/notification-preferences")
async def get_prefs(request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
row = db.conn().execute(
"""
SELECT email_personal_direct, email_watched_structural,
email_admin_actionable, digest_cadence, email_opt_out_all
FROM users WHERE id = ?
""",
(viewer.user_id,),
).fetchone()
return {
"email_personal_direct": bool(row["email_personal_direct"]),
"email_watched_structural": bool(row["email_watched_structural"]),
"email_admin_actionable": bool(row["email_admin_actionable"]),
"email_watched_churn": False, # §15.4 permanently off
"email_opt_out_all": bool(row["email_opt_out_all"]),
"digest_cadence": row["digest_cadence"],
}
@router.post("/api/users/me/notification-preferences")
async def set_prefs(body: PreferencesBody, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
sets = []
args: list[Any] = []
if body.email_personal_direct is not None:
sets.append("email_personal_direct = ?")
args.append(1 if body.email_personal_direct else 0)
if body.email_watched_structural is not None:
sets.append("email_watched_structural = ?")
args.append(1 if body.email_watched_structural else 0)
if body.email_admin_actionable is not None:
sets.append("email_admin_actionable = ?")
args.append(1 if body.email_admin_actionable else 0)
if body.digest_cadence is not None:
sets.append("digest_cadence = ?")
args.append(body.digest_cadence)
if not sets:
return {"ok": True}
args.append(viewer.user_id)
db.conn().execute(f"UPDATE users SET {', '.join(sets)} WHERE id = ?", args)
return {"ok": True}
# ----- Quiet hours (§15.8) -----
@router.get("/api/users/me/quiet-hours")
async def get_quiet_hours(request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
row = db.conn().execute(
"""
SELECT notification_quiet_hours_start AS start,
notification_quiet_hours_end AS end_,
notification_quiet_hours_timezone AS tz
FROM users WHERE id = ?
""",
(viewer.user_id,),
).fetchone()
return {"start": row["start"], "end": row["end_"], "timezone": row["tz"]}
@router.post("/api/users/me/quiet-hours")
async def set_quiet_hours(body: QuietHoursBody, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
# Per §15.8: all three to set, all null to clear. Reject partials.
filled = [body.start, body.end, body.timezone]
if any(filled) and not all(filled):
raise HTTPException(422, "Set start, end, and timezone together, or clear all three")
db.conn().execute(
"""
UPDATE users
SET notification_quiet_hours_start = ?,
notification_quiet_hours_end = ?,
notification_quiet_hours_timezone = ?
WHERE id = ?
""",
(body.start, body.end, body.timezone, viewer.user_id),
)
return {"ok": True}
# ----- Per-user notification mute (§15.8) -----
@router.get("/api/users/me/notification-mutes")
async def list_user_mutes(request: Request) -> dict[str, Any]:
"""Slice 7: the §15.8 mute list the settings surface renders.
Joined against users so the settings UI can show @gitea_login and
display_name without a second round-trip.
"""
viewer = auth.require_user(request)
rows = db.conn().execute(
"""
SELECT m.muted_user_id, m.muted_at,
u.gitea_login, u.display_name
FROM notification_user_mutes m
JOIN users u ON u.id = m.muted_user_id
WHERE m.muter_user_id = ?
ORDER BY m.muted_at DESC
""",
(viewer.user_id,),
).fetchall()
return {
"items": [
{
"muted_user_id": r["muted_user_id"],
"gitea_login": r["gitea_login"],
"display_name": r["display_name"],
"muted_at": r["muted_at"],
}
for r in rows
]
}
@router.post("/api/users/{user_id}/notification-mute")
async def add_user_mute(user_id: int, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
if user_id == viewer.user_id:
raise HTTPException(422, "You cannot mute yourself")
# Refusal per §15.8: admins/owners cannot mute notifications
# from anyone (they are exercising app-wide authority); arbiters
# cannot mute participants on RFCs where they hold authority.
if viewer.role in ("owner", "admin"):
raise HTTPException(
403,
"Admins and owners cannot mute notifications — the role requires receiving signals from everyone",
)
if _is_arbiter_with_overlap(viewer.user_id, user_id):
raise HTTPException(
403,
"You hold arbiter authority on an RFC where this user is active — muting is refused per §15.8",
)
target = db.conn().execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
if target is None:
raise HTTPException(404, "User not found")
db.conn().execute(
"""
INSERT OR IGNORE INTO notification_user_mutes (muter_user_id, muted_user_id)
VALUES (?, ?)
""",
(viewer.user_id, user_id),
)
return {"ok": True}
@router.delete("/api/users/{user_id}/notification-mute")
async def remove_user_mute(user_id: int, request: Request) -> dict[str, Any]:
viewer = auth.require_user(request)
db.conn().execute(
"DELETE FROM notification_user_mutes WHERE muter_user_id = ? AND muted_user_id = ?",
(viewer.user_id, user_id),
)
return {"ok": True}
# ----- Email: one-click unsubscribe + bounce webhook -----
@router.get("/api/email/unsubscribe")
async def email_unsubscribe(t: str = Query(..., description="Signed token from the email footer")) -> HTMLResponse:
try:
user_id, category = email_mod.verify_unsubscribe_token(t)
except BadSignature:
return HTMLResponse(
"<h1>Link expired or invalid</h1>"
"<p>Open the app to manage your notification preferences directly.</p>",
status_code=400,
)
column = {
"personal-direct": "email_personal_direct",
"structural": "email_watched_structural",
"admin-actionable": "email_admin_actionable",
}.get(category)
if column is None:
return HTMLResponse(
f"<h1>Unknown category</h1><p>{category}</p>", status_code=400
)
db.conn().execute(f"UPDATE users SET {column} = 0 WHERE id = ?", (user_id,))
return HTMLResponse(
f"<h1>Unsubscribed</h1><p>You will no longer receive {category} emails. "
f"You can re-enable them in your notification preferences.</p>"
)
@router.post("/api/webhooks/email-bounce")
async def email_bounce(body: BounceBody, request: Request) -> dict[str, Any]:
# §15.4: hard bounces and complaints flip the global opt-out.
# Per the §19.2 "email bounce webhook authentication" candidate
# Slice 8 settles: when `WEBHOOK_EMAIL_BOUNCE_SECRET` is set in
# env, the webhook requires the same value in the
# `X-Webhook-Secret` header. The shared-secret shape is the
# narrowest seam that covers the major providers — Sendgrid's
# `X-Twilio-Email-Event-Webhook-Signature`, SES via SNS topic
# signatures, Postmark's HTTP basic auth — without forcing a
# per-provider verifier today. When the operator wires a real
# SMTP provider they pick the equivalent shared-secret or
# rotate the value behind whichever signature scheme the
# provider supports. When the env var is unset the webhook
# stays unauthenticated for dev (the v1 contract).
import os as _os
expected = _os.environ.get("WEBHOOK_EMAIL_BOUNCE_SECRET", "").strip()
if expected:
received = request.headers.get("X-Webhook-Secret", "")
import hmac as _hmac
if not received or not _hmac.compare_digest(expected, received):
raise HTTPException(401, "Invalid webhook signature")
row = db.conn().execute(
"SELECT id FROM users WHERE LOWER(email) = LOWER(?)", (body.email,),
).fetchone()
if row is None:
return {"ok": True, "matched": False}
db.conn().execute(
"UPDATE users SET email_opt_out_all = 1 WHERE id = ?", (row["id"],),
)
log.info("email-bounce: opted out user %s (%s)", row["id"], body.kind)
return {"ok": True, "matched": True}
return router
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _sse(event: str, payload: Any) -> str:
return f"event: {event}\ndata: {json.dumps(payload)}\n\n"
def _is_arbiter_with_overlap(muter_user_id: int, muted_user_id: int) -> bool:
"""§15.8: an arbiter cannot mute notifications from a user who is
active on an RFC the arbiter has authority on. Active is defined
here as "has a watches row" — a low bar, but it's the cheapest
proxy for participation and the spec intends a generous refusal.
"""
muter_login_row = db.conn().execute(
"SELECT gitea_login FROM users WHERE id = ?", (muter_user_id,)
).fetchone()
muted_login_row = db.conn().execute(
"SELECT gitea_login FROM users WHERE id = ?", (muted_user_id,)
).fetchone()
if not muter_login_row or not muted_login_row:
return False
muter_login = muter_login_row["gitea_login"]
rfcs = db.conn().execute(
"SELECT slug, arbiters_json FROM cached_rfcs WHERE state = 'active'"
).fetchall()
for r in rfcs:
try:
arbiters = json.loads(r["arbiters_json"] or "[]")
except json.JSONDecodeError:
continue
if muter_login in arbiters:
other_active = db.conn().execute(
"SELECT 1 FROM watches WHERE user_id = ? AND rfc_slug = ?",
(muted_user_id, r["slug"]),
).fetchone()
if other_active:
return True
return False