Files
Ben Stull f67d0aa0db Slice 6: notifications per §15
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 23:09:04 -07:00

949 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""§15: the notifications fan-out and the SSE broadcaster.
This module is the single chokepoint for the producer side of §15.
Every write that names an `rfc_slug` flows through `fan_out_from_action`
(called from `bot._log`) or `fan_out_chat_message` (called from the chat
write paths in `api_branches` / `api_prs`, since chat messages don't go
through the bot wrapper). The chokepoint:
- upserts a `watches` row for the actor per §15.6's auto-watch rule
(a substantive gesture sets `watching`, never downgrades, and bumps
`last_participation_at` for the 90-day decay);
- applies the §15.1 routing rules to determine which other users
receive a notification for the event;
- filters out muted recipients per §15.8 (per-user mute list);
- filters out `watches.state='muted'` per §15.6 (per-RFC mute);
- inserts one `notifications` row per recipient with the underlying
actor's user id per §15.9 (the bot never appears as actor);
- dispatches the row to the §15.4 email path if the recipient's
category toggle is on (held during quiet hours per §15.8, deferred
to the digest otherwise);
- publishes each new row to the per-user SSE subscriber queues so the
inbox refresh and the header-badge count are driven by the same stream
per §15.3.
If you find yourself wanting to insert a row into `notifications`
directly from an endpoint, the spec is right and you are wrong: the
chokepoint is the invariant. Read paths can query the table from
anywhere; the producer side flows through here.
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Iterable
from . import db
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# §15.1 routing — action_kind → (event_kind, category, recipient_set)
# ---------------------------------------------------------------------------
#
# The principal recipient sets used by the routing rules below (the rules
# also name "admins" and "pr_opener_if_other"; see `_resolve_recipients`):
#
# "watchers" — every user with a watches row on this slug whose state
# is 'watching' (full stream) or 'following' (structural
# only), minus the actor and minus 'muted' rows. The
# structural/full split is applied per-event below.
# "owners" — every user named in the entry's owners_json frontmatter
# on cached_rfcs (resolved to user_ids via gitea_login).
# "proposer" — the user whose action_kind=propose_rfc opened the slug,
# resolved via the most recent matching audit row.
#
# The category of each event maps to one of the §15.4 buckets:
#
# "personal-direct" — recipient is named subject of the action.
# "structural" — structural beats on watched RFCs.
# "churn" — per-commit / per-message activity. Never emails
# per §15.4; aggregated by the §15.5 digest.
# §15.4 category buckets. `_emit_one` stores the chosen value under the
# "category" key of each notification's payload JSON, which is where the
# inbox filters read it back from.
CATEGORY_PERSONAL = "personal-direct"
CATEGORY_STRUCTURAL = "structural"
CATEGORY_CHURN = "churn"
# Action kinds whose actor's first interaction with a slug triggers
# auto-watch per §15.6. The substantive-gesture list in the spec is
# the source; we map onto our action_kind taxonomy. Reads, view advances,
# and own-action observations are intentionally excluded.
# NOTE: `fan_out_from_action` also uses this set as the gate for the whole
# chokepoint — an action_kind that is missing here neither auto-watches
# nor reaches the `_ROUTING` lookup.
_AUTO_WATCH_ACTIONS = {
    "propose_rfc",
    "merge_proposal",
    "decline_proposal",
    "create_branch",
    "accept_change",
    "decline_change",
    "manual_flush",
    "open_branch_pr",
    "merge_branch_pr",
    "withdraw_branch_pr",
    "supersede_branch_pr",
    "open_metadata_pr",
    "open_claim_pr",
    "merge_claim_pr",
    "create_resolution_branch",
    "replay_change",
    "graduate_start",
    "graduate_complete",
    "open_review_thread",
    "resolve_thread",
    "drop_flag",
    "resolve_flag",
}
# ---------------------------------------------------------------------------
# SSE broadcaster — per-user subscriber registry
# ---------------------------------------------------------------------------
@dataclass
class _Subscriber:
    """One registered SSE listener; `subscribe` creates one per call."""

    # Id of the user whose events this subscriber receives; also the key of
    # the `_subscribers` bucket the instance is stored in.
    user_id: int
    # Per-subscriber event queue. `_broadcast` pushes
    # {"event": ..., "payload": ...} dicts onto it with `put_nowait`.
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
# Registry of live SSE subscribers, keyed by user id. A user may have
# several entries (one per `subscribe` call).
_subscribers: dict[int, list[_Subscriber]] = {}
# Guard for `_subscribers`; built on first use by `_get_lock` rather than
# at import time.
_subscribers_lock: asyncio.Lock | None = None


def _get_lock() -> asyncio.Lock:
    """Return the module-wide registry lock, creating it on first call."""
    global _subscribers_lock
    lock = _subscribers_lock
    if lock is None:
        lock = _subscribers_lock = asyncio.Lock()
    return lock
async def subscribe(user_id: int) -> _Subscriber:
    """Create and register an SSE subscriber for `user_id`.

    Every call yields a fresh subscriber (one per browser tab); the
    per-user fan-out delivers each event to all of a user's subscribers,
    so multiple tabs see the same updates per §15.
    """
    handle = _Subscriber(user_id=user_id)
    async with _get_lock():
        bucket = _subscribers.get(user_id)
        if bucket is None:
            bucket = _subscribers[user_id] = []
        bucket.append(handle)
    return handle
async def unsubscribe(sub: _Subscriber) -> None:
    """Remove `sub` from the registry; a no-op if it is not registered.

    The user's bucket is dropped entirely once it is empty so the registry
    does not accumulate empty lists.
    """
    async with _get_lock():
        bucket = _subscribers.get(sub.user_id)
        if bucket is None:
            return
        if sub in bucket:
            bucket.remove(sub)
        if len(bucket) == 0:
            del _subscribers[sub.user_id]
async def _broadcast(user_id: int, event: str, payload: Any) -> None:
    """Deliver one event to every subscriber registered for `user_id`.

    Delivery is best-effort: the subscriber list is snapshotted under the
    registry lock and the pushes happen outside it with `put_nowait`, so a
    writer never waits on a consumer. A failed push is logged and skipped —
    the durable inbox row is the source of truth.
    """
    async with _get_lock():
        targets = tuple(_subscribers.get(user_id, ()))
    for target in targets:
        try:
            # A fresh dict per subscriber, so consumers never share state.
            target.queue.put_nowait({"event": event, "payload": payload})
        except Exception:
            log.exception("notify broadcast: drop event for user %s", user_id)
# ---------------------------------------------------------------------------
# Producer-side entry points
# ---------------------------------------------------------------------------
def fan_out_from_action(
    *,
    actor_user_id: int | None,
    action_kind: str,
    rfc_slug: str | None,
    branch_name: str | None,
    pr_number: int | None,
    details: dict | None,
) -> None:
    """Called from `bot._log` after every successful Git write.

    Synchronous on purpose — runs inside the same request as the action,
    so a failure here surfaces with the action rather than as a silent
    drop. The SSE push is asyncio-scheduled if a loop is running; if
    no loop is available (test harness, sync test client) the dispatch
    falls back to writing the row and relying on the next subscriber
    poll to surface it.

    Steps: bump the actor's auto-watch row, look up the routing rules for
    `action_kind`, and emit one notification per resolved recipient.

    A recipient matched by several rules of the same action (e.g. the
    proposer who is also auto-watching the RFC) receives a row only for
    the FIRST matching rule. The routing table lists the personal-direct
    rule first, so this mirrors `fan_out_chat_message`, where a user who
    qualifies twice gets only the personal-direct row.
    """
    if rfc_slug is None or action_kind not in _AUTO_WATCH_ACTIONS:
        # Actions without an rfc_slug (e.g. permission events) or that
        # aren't a §15.6 substantive gesture don't drive the chokepoint.
        return
    if actor_user_id is not None:
        _bump_auto_watch(actor_user_id, rfc_slug)
    rules = _ROUTING.get(action_kind)
    if rules is None:
        return
    # Users already notified for this action by an earlier rule.
    already_notified: set[int] = set()
    for rule in rules:
        recipients = _resolve_recipients(
            rule=rule,
            actor_user_id=actor_user_id,
            rfc_slug=rfc_slug,
            details=details,
        ) - already_notified
        for recipient_id in recipients:
            _emit_one(
                recipient_user_id=recipient_id,
                event_kind=rule["event_kind"],
                category=rule["category"],
                actor_user_id=actor_user_id,
                rfc_slug=rfc_slug,
                branch_name=branch_name,
                pr_number=pr_number,
                details=details or {},
            )
        already_notified |= recipients
def fan_out_chat_message(
    *,
    actor_user_id: int,
    rfc_slug: str,
    branch_name: str,
    thread_id: int,
    message_id: int,
    is_review_thread: bool = False,
    pr_number: int | None = None,
) -> None:
    """Fan out notifications for a chat message that was just persisted.

    Chat writes never pass through bot.py (there is no Git write), so this
    is the §15 chokepoint for chat-message events. Routing:

    - earlier authors in the same thread, other than the sender, get a
      personal-direct row (`chat_reply_to_my_message`, or
      `pr_review_thread_reply` on a review thread);
    - users watching the RFC with state='watching' (full stream) get a
      churn row (`chat_message_in_participated_thread`, or
      `pr_review_thread_new` on a review thread).

    A user who qualifies for both receives only the personal-direct row.
    Per-RFC mutes (§15.6) and per-user mutes of the sender (§15.8) are
    removed from both groups.
    """
    _bump_auto_watch(actor_user_id, rfc_slug)

    earlier_rows = db.conn().execute(
        """
        SELECT DISTINCT author_user_id
        FROM thread_messages
        WHERE thread_id = ?
        AND author_user_id IS NOT NULL
        AND author_user_id != ?
        AND id < ?
        """,
        (thread_id, actor_user_id, message_id),
    )
    thread_authors = {record["author_user_id"] for record in earlier_rows}

    # §15.6 / §15.8: a muted recipient never gets a row, whichever producer
    # path generated the event.
    rfc_muted = _muted_user_ids_for_rfc(rfc_slug)
    actor_muters = _muters_of(actor_user_id)
    direct_ids = (thread_authors - rfc_muted) - actor_muters

    # The event kind depends only on the thread type, so pick it up front.
    if is_review_thread:
        direct_kind, ambient_kind = "pr_review_thread_reply", "pr_review_thread_new"
    else:
        direct_kind = "chat_reply_to_my_message"
        ambient_kind = "chat_message_in_participated_thread"

    def _notify(recipient_id: int, event_kind: str, category: str) -> None:
        _emit_one(
            recipient_user_id=recipient_id,
            event_kind=event_kind,
            category=category,
            actor_user_id=actor_user_id,
            rfc_slug=rfc_slug,
            branch_name=branch_name,
            pr_number=pr_number,
            thread_id=thread_id,
            details={"message_id": message_id},
        )

    # Personal-direct: earlier authors of the thread.
    for recipient_id in direct_ids:
        _notify(recipient_id, direct_kind, CATEGORY_PERSONAL)

    # Churn: full-stream watchers not already covered above.
    ambient_ids = _watcher_user_ids(rfc_slug, scope="watching") - direct_ids
    ambient_ids.discard(actor_user_id)
    ambient_ids = (ambient_ids - rfc_muted) - actor_muters
    for recipient_id in ambient_ids:
        _notify(recipient_id, ambient_kind, CATEGORY_CHURN)
# ---------------------------------------------------------------------------
# Routing rules — per action_kind, a list of recipient×event×category
# ---------------------------------------------------------------------------
# Maps action_kind -> ordered list of rules. Each rule is a dict with:
#   "event_kind" — stored on the notification row and used by
#                  `render_summary` to pick the sentence;
#   "category"   — one of the CATEGORY_* constants;
#   "recipients" — a recipient-kind string understood by
#                  `_resolve_recipients`.
# An action_kind with no entry here produces no notifications.
_ROUTING: dict[str, list[dict]] = {
    # §9.1: propose surfaces the new slug. The proposer is the actor;
    # the personal-direct beats fire on merge/decline, not on propose
    # itself. Admins and owners get a structural ping so the
    # pending-ideas queue isn't invisible.
    "propose_rfc": [
        {"event_kind": "proposal_opened_on_watched_topic",
         "category": CATEGORY_STRUCTURAL, "recipients": "admins"},
    ],
    # §9.3: merge fires personal-direct to the proposer per §15.4's
    # named-subject rule. Watchers also see the structural beat.
    "merge_proposal": [
        {"event_kind": "proposal_merged",
         "category": CATEGORY_PERSONAL, "recipients": "proposer"},
        {"event_kind": "proposal_merged",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    # Decline only has the personal-direct rule for the proposer; no
    # watcher rule is listed.
    "decline_proposal": [
        {"event_kind": "proposal_declined",
         "category": CATEGORY_PERSONAL, "recipients": "proposer"},
    ],
    # §10.1: PR opened. Watchers see it; the opener auto-watches but
    # doesn't get an inbox row for their own gesture.
    "open_branch_pr": [
        {"event_kind": "pr_opened",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    # Metadata PRs reuse the same `pr_opened` event kind as branch PRs.
    "open_metadata_pr": [
        {"event_kind": "pr_opened",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    "open_claim_pr": [
        {"event_kind": "claim_opened",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    # §10.5: merge fires personal-direct to the PR opener if someone
    # else merged; watchers see the structural beat regardless.
    "merge_branch_pr": [
        {"event_kind": "pr_merged",
         "category": CATEGORY_PERSONAL, "recipients": "pr_opener_if_other"},
        {"event_kind": "pr_merged",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    # Withdraw follows the same two-rule shape as merge.
    "withdraw_branch_pr": [
        {"event_kind": "pr_withdrawn",
         "category": CATEGORY_PERSONAL, "recipients": "pr_opener_if_other"},
        {"event_kind": "pr_withdrawn",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    # Supersede reuses the `pr_withdrawn` event kind and only has the
    # opener rule; no watcher rule is listed.
    "supersede_branch_pr": [
        {"event_kind": "pr_withdrawn",
         "category": CATEGORY_PERSONAL, "recipients": "pr_opener_if_other"},
    ],
    # §8.6: accept_change is per-commit churn. Watchers (full stream)
    # get the row; following-only watchers don't.
    "accept_change": [
        {"event_kind": "pr_commit_added",
         "category": CATEGORY_CHURN, "recipients": "watchers_full"},
    ],
    # manual_flush and replay_change share accept_change's churn rule.
    "manual_flush": [
        {"event_kind": "pr_commit_added",
         "category": CATEGORY_CHURN, "recipients": "watchers_full"},
    ],
    "replay_change": [
        {"event_kind": "pr_commit_added",
         "category": CATEGORY_CHURN, "recipients": "watchers_full"},
    ],
    # §13.3: graduation_complete is personal-direct to owners (the
    # super-draft's owners list) and structural to watchers.
    "graduate_complete": [
        {"event_kind": "graduation_complete",
         "category": CATEGORY_PERSONAL, "recipients": "owners"},
        {"event_kind": "graduation_complete",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    "graduate_start": [
        {"event_kind": "super_draft_graduation_ready",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    # Creating a resolution branch is surfaced to watchers as
    # `pr_conflict_with_main`.
    "create_resolution_branch": [
        {"event_kind": "pr_conflict_with_main",
         "category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
    ],
    "create_branch": [
        # No notification — branch creation is a private gesture until
        # work lands on it. Auto-watch is the visible effect.
    ],
}
# ---------------------------------------------------------------------------
# Recipient resolvers
# ---------------------------------------------------------------------------
def _resolve_recipients(
    *,
    rule: dict,
    actor_user_id: int | None,
    rfc_slug: str,
    details: dict | None,
) -> set[int]:
    """Turn a routing rule's `recipients` kind into a set of user ids.

    The raw set for the kind is filtered so that it never contains the
    actor, anyone whose watch row for this RFC is 'muted' (§15.6), or
    anyone who has muted the actor (§15.8). An unknown kind logs a warning
    and resolves to the empty set.
    """

    def _opener_unless_actor() -> set[int]:
        opener = _pr_opener_user_id(rfc_slug, details)
        if opener is None or opener == actor_user_id:
            return set()
        return {opener}

    resolvers = {
        # following (structural only) + watching (full stream) — both
        # see structural events.
        "watchers_structural": lambda: _watcher_user_ids(rfc_slug, scope="any"),
        "watchers_full": lambda: _watcher_user_ids(rfc_slug, scope="watching"),
        "owners": lambda: _entry_owner_user_ids(rfc_slug),
        "admins": _admin_user_ids,
        "proposer": lambda: _proposer_user_id(rfc_slug),
        "pr_opener_if_other": _opener_unless_actor,
    }
    kind = rule["recipients"]
    resolver = resolvers.get(kind)
    if resolver is None:
        log.warning("notify: unknown recipient kind %s", kind)
        return set()

    candidates = resolver()
    has_actor = actor_user_id is not None
    if has_actor:
        candidates.discard(actor_user_id)
    # §15.6: a per-RFC mute suppresses every signal for the (user, slug).
    candidates -= _muted_user_ids_for_rfc(rfc_slug)
    # §15.8: a per-user mute suppresses notifications produced by the actor
    # for each muter. (Inbox volume only; content visibility is unchanged.)
    if has_actor:
        candidates -= _muters_of(actor_user_id)
    return candidates
def _watcher_user_ids(rfc_slug: str, *, scope: str) -> set[int]:
    """Return the ids of users watching `rfc_slug`.

    `scope="watching"` selects full-stream watchers only; any other value
    selects both 'watching' and 'following' rows.
    """
    if scope == "watching":
        query = "SELECT user_id FROM watches WHERE rfc_slug = ? AND state = 'watching'"
    else:
        query = "SELECT user_id FROM watches WHERE rfc_slug = ? AND state IN ('watching', 'following')"
    cursor = db.conn().execute(query, (rfc_slug,))
    return {record["user_id"] for record in cursor}
def _entry_owner_user_ids(rfc_slug: str) -> set[int]:
    """Resolve the owners listed on an RFC's cache row to user ids.

    Reads `cached_rfcs.owners_json` for `rfc_slug`, expects a JSON list of
    gitea logins, and looks those logins up in `users`.

    Returns an empty set when the RFC has no cache row, the JSON is
    malformed, the decoded value is not a list, or it holds no usable
    logins. Non-string and empty entries are ignored: without that check a
    bare string such as "alice" would be expanded character by character
    into the lookup, and nested values would fail SQL parameter binding.
    """
    row = db.conn().execute(
        "SELECT owners_json FROM cached_rfcs WHERE slug = ?", (rfc_slug,),
    ).fetchone()
    if row is None:
        return set()
    try:
        owners = json.loads(row["owners_json"] or "[]")
    except json.JSONDecodeError:
        return set()
    if not isinstance(owners, list):
        return set()
    logins = [owner for owner in owners if isinstance(owner, str) and owner]
    if not logins:
        return set()
    placeholders = ",".join("?" * len(logins))
    user_rows = db.conn().execute(
        f"SELECT id FROM users WHERE gitea_login IN ({placeholders})",
        tuple(logins),
    )
    return {r["id"] for r in user_rows}
def _admin_user_ids() -> set[int]:
    """Return the ids of all users whose role is 'owner' or 'admin'."""
    cursor = db.conn().execute("SELECT id FROM users WHERE role IN ('owner', 'admin')")
    return {record["id"] for record in cursor}
def _proposer_user_id(rfc_slug: str) -> set[int]:
    """Return the proposer of `rfc_slug` as a zero- or one-element set.

    The proposer is the actor of the MOST RECENT `propose_rfc` audit row
    for the slug, so that if a slug is proposed more than once the
    merge/decline notification goes to whoever opened the current
    proposal. Returns an empty set when there is no such row or the row
    has no actor.
    """
    row = db.conn().execute(
        """
        SELECT actor_user_id FROM actions
        WHERE action_kind = 'propose_rfc' AND rfc_slug = ?
        ORDER BY id DESC LIMIT 1
        """,
        (rfc_slug,),
    ).fetchone()
    if row is None or row["actor_user_id"] is None:
        return set()
    return {row["actor_user_id"]}
def _pr_opener_user_id(rfc_slug: str, details: dict | None) -> int | None:
    """For pr_opener_if_other recipients, find the user who opened the PR.

    Looks up the actor of the most recent `open_branch_pr` audit row for
    `rfc_slug`. Returns that user id, or None when no such row exists or
    the row has no actor.

    `details` is accepted for interface compatibility but not consulted:
    when called from merge/withdraw routing it is the merging action's
    details, which do not identify the opener. The previous fallback
    returned `details["pr_number"]`, i.e. a PR number masquerading as a
    user id, which could notify an unrelated user; an unknown opener now
    resolves to None so the caller emits nothing.
    """
    row = db.conn().execute(
        """
        SELECT actor_user_id FROM actions
        WHERE action_kind = 'open_branch_pr' AND rfc_slug = ?
        ORDER BY id DESC LIMIT 1
        """,
        (rfc_slug,),
    ).fetchone()
    if row is None:
        return None
    # May itself be None when the audit row has no actor.
    return row["actor_user_id"]
def _muted_user_ids_for_rfc(rfc_slug: str) -> set[int]:
    """Return the ids of users whose watch row for `rfc_slug` is 'muted'."""
    cursor = db.conn().execute(
        "SELECT user_id FROM watches WHERE rfc_slug = ? AND state = 'muted'",
        (rfc_slug,),
    )
    muted: set[int] = set()
    for record in cursor:
        muted.add(record["user_id"])
    return muted
def _muters_of(actor_user_id: int) -> set[int]:
    """Return the ids of users who have muted `actor_user_id`."""
    cursor = db.conn().execute(
        "SELECT muter_user_id FROM notification_user_mutes WHERE muted_user_id = ?",
        (actor_user_id,),
    )
    muters: set[int] = set()
    for record in cursor:
        muters.add(record["muter_user_id"])
    return muters
# ---------------------------------------------------------------------------
# Auto-watch upsert
# ---------------------------------------------------------------------------
def _bump_auto_watch(user_id: int, rfc_slug: str) -> None:
    """Apply the §15.6 substantive-gesture rule for (user, RFC).

    - No watch row yet: insert one as 'watching', set_by 'auto'.
    - Row is 'following' and was not set explicitly: upgrade to 'watching'.
    - Anything else ('watching', 'muted', explicit 'following'): leave the
      state alone — auto-watch never downgrades.

    `last_participation_at` is refreshed in every case; it feeds the
    90-day decay timer. Role-implicit watching for owners/arbiters is
    computed at fan-out time rather than stored, so it needs no special
    handling here.
    """
    key = (user_id, rfc_slug)
    current = db.conn().execute(
        "SELECT state, set_by FROM watches WHERE user_id = ? AND rfc_slug = ?",
        key,
    ).fetchone()

    if current is None:
        db.conn().execute(
            """
            INSERT INTO watches
            (user_id, rfc_slug, state, set_by, set_at, last_participation_at)
            VALUES (?, ?, 'watching', 'auto', datetime('now'), datetime('now'))
            """,
            key,
        )
        return

    # The only automatic state change is following -> watching, and only
    # when the user did not pick 'following' explicitly.
    promote = current["state"] == "following" and current["set_by"] != "explicit"
    if promote:
        statement = "UPDATE watches SET state = 'watching', last_participation_at = datetime('now') WHERE user_id = ? AND rfc_slug = ?"
    else:
        statement = "UPDATE watches SET last_participation_at = datetime('now') WHERE user_id = ? AND rfc_slug = ?"
    db.conn().execute(statement, key)
# ---------------------------------------------------------------------------
# Inserting a notification row + dispatch
# ---------------------------------------------------------------------------
def _emit_one(
    *,
    recipient_user_id: int,
    event_kind: str,
    category: str,
    actor_user_id: int | None,
    rfc_slug: str | None,
    branch_name: str | None,
    pr_number: int | None,
    details: dict,
    thread_id: int | None = None,
) -> int:
    """Insert one notification row and dispatch it.

    The stored payload is `details` plus a "category" key. The routing
    category is written LAST so that a "category" key inside the
    caller-supplied `details` (free-form action metadata) can never
    override it; the inbox filters read the category back from this key.

    After the insert, the rendered row is pushed to the recipient's SSE
    subscribers and handed to the email path.

    Returns the id of the new notifications row.
    """
    payload = {**details, "category": category}
    cur = db.conn().execute(
        """
        INSERT INTO notifications
        (recipient_user_id, event_kind, rfc_slug, branch_name, pr_number,
        thread_id, actor_user_id, payload)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            recipient_user_id,
            event_kind,
            rfc_slug,
            branch_name,
            pr_number,
            thread_id,
            actor_user_id,
            json.dumps(payload),
        ),
    )
    notif_id = cur.lastrowid
    # Re-read the row so the SSE event and the email share the joined
    # actor/RFC display fields and the rendered summary.
    row_payload = _row_payload(notif_id)
    _schedule_broadcast(recipient_user_id, "notification", row_payload)
    # Email dispatch: §15.4. Held during quiet hours per §15.8; held
    # during a global opt-out (bounce) per §15.4. Defers to the digest
    # otherwise per §15.5's exclusion rules.
    _schedule_email(notif_id, recipient_user_id, event_kind, category, row_payload)
    return notif_id
def _row_payload(notif_id: int) -> dict:
    """Load one notification and shape it for the SSE stream and email.

    Joins the actor's display name/login and the RFC title, decodes the
    stored payload JSON (treated as empty if malformed), and adds the
    rendered one-line summary. Returns {} when the row does not exist.
    """
    record = db.conn().execute(
        """
        SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
        n.thread_id, n.actor_user_id, n.payload, n.created_at, n.read_at,
        u.display_name AS actor_display, u.gitea_login AS actor_login,
        r.title AS rfc_title
        FROM notifications n
        LEFT JOIN users u ON u.id = n.actor_user_id
        LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
        WHERE n.id = ?
        """,
        (notif_id,),
    ).fetchone()
    if record is None:
        return {}

    try:
        extras = json.loads(record["payload"] or "{}")
    except json.JSONDecodeError:
        extras = {}

    # Columns copied through unchanged, in output order.
    passthrough = (
        "id", "event_kind", "rfc_slug", "rfc_title", "branch_name",
        "pr_number", "thread_id", "actor_user_id", "actor_login",
        "actor_display", "created_at", "read_at",
    )
    shaped = {column: record[column] for column in passthrough}
    shaped["category"] = extras.get("category")
    shaped["summary"] = render_summary(
        record["event_kind"], record["actor_display"], record["rfc_title"], extras
    )
    shaped["extras"] = extras
    return shaped
# Sentence template per event_kind; `{actor}` and `{title}` are filled in
# by `render_summary`.
_SUMMARY_TEMPLATES = {
    "proposal_merged": "{actor} merged your proposal — {title} is now a super-draft.",
    "proposal_declined": "{actor} declined your proposal for {title}.",
    "proposal_opened_on_watched_topic": "{actor} proposed a new RFC: {title}.",
    "pr_opened": "{actor} opened a PR on {title}.",
    "pr_merged": "{actor} merged a PR on {title}.",
    "pr_withdrawn": "{actor} withdrew a PR on {title}.",
    "pr_commit_added": "{actor} added a commit on {title}.",
    "pr_review_thread_new": "{actor} opened a review thread on {title}.",
    "pr_review_thread_reply": "{actor} replied to your review thread on {title}.",
    "chat_message_in_participated_thread": "{actor} posted a chat message on {title}.",
    "chat_reply_to_my_message": "{actor} replied to your message on {title}.",
    "claim_opened": "{actor} opened a claim PR on {title}.",
    "graduation_complete": "{title} graduated to an active RFC.",
    "super_draft_graduation_ready": "{actor} began graduating {title}.",
    "pr_conflict_with_main": "{actor} started a resolution branch on {title}.",
}


def render_summary(event_kind: str, actor_display: str | None, rfc_title: str | None, extras: dict) -> str:
    """Render the one-sentence summary shared by the inbox row and the
    email body per §15.4.

    The actor is the underlying user per §15.9; a missing actor reads as
    "the app" so system-generated events still form a sentence. The title
    falls back to `extras["slug"]`, then to "this RFC". An event kind with
    no template renders as "<event_kind> on <title>".
    """
    who = actor_display or "the app"
    what = rfc_title or extras.get("slug") or "this RFC"
    template = _SUMMARY_TEMPLATES.get(event_kind)
    if template is None:
        return f"{event_kind} on {what}"
    return template.format(actor=who, title=what)
# Strong references to in-flight broadcast tasks. The event loop keeps only
# weak references to tasks, so a fire-and-forget task that nobody else
# references can be garbage-collected before it runs to completion.
_background_tasks: set[asyncio.Task] = set()


def _schedule_broadcast(user_id: int, event: str, payload: Any) -> None:
    """Best-effort SSE push of `event`/`payload` to `user_id`'s subscribers.

    Schedules `_broadcast` on the running event loop. When no loop is
    running (e.g. a sync test client) this is a silent no-op, so the
    caller's database write still stands on its own.

    The task is held in `_background_tasks` until it finishes, so it cannot
    disappear mid-execution.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return
    task = loop.create_task(_broadcast(user_id, event, payload))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
def _schedule_email(
    notif_id: int,
    recipient_user_id: int,
    event_kind: str,
    category: str,
    payload: dict,
) -> None:
    """Hand a freshly inserted notification to the §15.4 email path.

    Runs synchronously so that the email_sent_at timestamp lands in the
    same tick as the notification row, which §15.5's digest exclusion
    rule 1 keys on. The preference/quiet-hours decision and the actual
    SMTP send live in `email.py`; this function only forwards the
    arguments to its `maybe_send`.
    """
    # Imported here rather than at module top to avoid an import cycle.
    from . import email as email_mod

    dispatch_args = {
        "notif_id": notif_id,
        "recipient_user_id": recipient_user_id,
        "event_kind": event_kind,
        "category": category,
        "payload": payload,
    }
    email_mod.maybe_send(**dispatch_args)
# ---------------------------------------------------------------------------
# Inbox query and mark-read helpers used by api_notifications
# ---------------------------------------------------------------------------
def list_inbox(
    *,
    user_id: int,
    unread: bool | None = None,
    rfc_slug: str | None = None,
    category: str | None = None,
    actor_user_id: int | None = None,
    bundled: bool = False,
    limit: int = 200,
) -> dict:
    """§15.2: inbox query. Filter chips are AND-combined.

    When `bundled=True` is set, rows are grouped server-side by
    (rfc_slug, event_kind) per the §15.2 bundle toggle, with the most
    recent timestamp on each bundle.

    `limit` caps the number of rows returned, newest first. The category
    lives inside the payload JSON and is matched in Python, so when a
    `category` filter is active the SQL query runs without LIMIT and the
    cap is applied AFTER filtering. Applying LIMIT first would drop
    matching rows that sit behind `limit` newer rows of other categories.
    As with SQLite's LIMIT, a negative `limit` means "no cap".

    Returns {"items": [...], "unread_count": n}; `unread_count` is the
    user's total unread count, independent of the filters.
    """
    where = ["n.recipient_user_id = ?"]
    args: list[Any] = [user_id]
    if unread:
        where.append("n.read_at IS NULL")
    if rfc_slug:
        where.append("n.rfc_slug = ?")
        args.append(rfc_slug)
    if actor_user_id is not None:
        where.append("n.actor_user_id = ?")
        args.append(actor_user_id)
    sql = f"""
        SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
        n.thread_id, n.actor_user_id, n.payload, n.created_at, n.read_at,
        u.display_name AS actor_display, u.gitea_login AS actor_login,
        r.title AS rfc_title
        FROM notifications n
        LEFT JOIN users u ON u.id = n.actor_user_id
        LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
        WHERE {' AND '.join(where)}
        ORDER BY n.id DESC
        """
    if category:
        # Unbounded query; the loop below stops once `limit` rows match.
        cursor = db.conn().execute(sql, tuple(args))
    else:
        cursor = db.conn().execute(sql + " LIMIT ?", (*args, limit))
    items = []
    for row in cursor:
        if 0 <= limit <= len(items):
            break
        try:
            extras = json.loads(row["payload"] or "{}")
        except json.JSONDecodeError:
            extras = {}
        if category and extras.get("category") != category:
            continue
        items.append({
            "id": row["id"],
            "event_kind": row["event_kind"],
            "rfc_slug": row["rfc_slug"],
            "rfc_title": row["rfc_title"],
            "branch_name": row["branch_name"],
            "pr_number": row["pr_number"],
            "thread_id": row["thread_id"],
            "actor_user_id": row["actor_user_id"],
            "actor_login": row["actor_login"],
            "actor_display": row["actor_display"],
            "created_at": row["created_at"],
            "read_at": row["read_at"],
            "category": extras.get("category"),
            "summary": render_summary(row["event_kind"], row["actor_display"], row["rfc_title"], extras),
        })
    if bundled:
        items = _bundle(items)
    unread_count = db.conn().execute(
        "SELECT COUNT(*) AS c FROM notifications WHERE recipient_user_id = ? AND read_at IS NULL",
        (user_id,),
    ).fetchone()["c"]
    return {"items": items, "unread_count": unread_count}
def _bundle(items: list[dict]) -> list[dict]:
    """§15.2: collapse inbox rows into one bundle per (rfc_slug, event_kind).

    Each bundle is a copy of the group's first row — the most recent one,
    since the input is ordered newest-first — extended with `bundled_ids`
    (the ids of every row in the group, representative included) and
    `bundled_count`. Bundles come back ordered by `created_at`, newest
    first. The input dicts are not modified.
    """
    groups: dict[tuple, list[dict]] = {}
    for entry in items:
        group_key = (entry["rfc_slug"], entry["event_kind"])
        if group_key not in groups:
            groups[group_key] = []
        groups[group_key].append(entry)

    collapsed = [
        {
            **members[0],
            "bundled_ids": [member["id"] for member in members],
            "bundled_count": len(members),
        }
        for members in groups.values()
    ]
    return sorted(collapsed, key=lambda bundle: bundle["created_at"], reverse=True)
def mark_read_by_filter(
    *,
    user_id: int,
    unread: bool | None = None,
    rfc_slug: str | None = None,
    category: str | None = None,
    actor_user_id: int | None = None,
    ids: Iterable[int] | None = None,
) -> int:
    """`Mark all read` per §15.2 — respects the active filter so the
    user can mark all churn read without touching personal-direct.

    Only the user's currently unread rows are candidates; `unread` is
    accepted for signature parity with `list_inbox` and is not consulted.
    `rfc_slug`, `actor_user_id`, `category` and `ids` narrow the selection
    and are AND-combined.

    `ids=None` means "no id restriction". Any other value — including an
    empty list or an exhausted iterator — is an explicit selection, so an
    empty selection marks nothing and returns 0. (A truthiness test here
    would treat `ids=[]` as "no restriction" and mark every unread row.)

    A "read" SSE event is scheduled for each row that was marked.
    Returns the number of rows marked read.
    """
    where = ["recipient_user_id = ?", "read_at IS NULL"]
    args: list[Any] = [user_id]
    if rfc_slug:
        where.append("rfc_slug = ?")
        args.append(rfc_slug)
    if actor_user_id is not None:
        where.append("actor_user_id = ?")
        args.append(actor_user_id)
    if ids is not None:
        # Materialize once: `ids` may be a one-shot iterator.
        id_list = list(ids)
        if not id_list:
            return 0
        placeholders = ",".join("?" * len(id_list))
        where.append(f"id IN ({placeholders})")
        args.extend(id_list)
    # Category lives inside payload JSON; we filter post-fetch to keep
    # the SQL portable.
    rows = db.conn().execute(
        f"SELECT id, payload FROM notifications WHERE {' AND '.join(where)}",
        args,
    ).fetchall()
    selected_ids: list[int] = []
    for r in rows:
        if category:
            try:
                extras = json.loads(r["payload"] or "{}")
            except json.JSONDecodeError:
                extras = {}
            if extras.get("category") != category:
                continue
        selected_ids.append(r["id"])
    if not selected_ids:
        return 0
    placeholders = ",".join("?" * len(selected_ids))
    db.conn().execute(
        f"UPDATE notifications SET read_at = datetime('now') WHERE id IN ({placeholders})",
        selected_ids,
    )
    for nid in selected_ids:
        _schedule_broadcast(user_id, "read", {"id": nid})
    return len(selected_ids)
def reconcile_seen_advance(
    *,
    user_id: int,
    rfc_slug: str,
    branch_name: str | None = None,
    pr_number: int | None = None,
) -> int:
    """§15.7 reconciler: mark unread notifications read when a scope
    cursor advances on visit. Called from the chat-seen and pr-seen
    advance endpoints.

    The match is always scoped to (user, RFC). If `pr_number` is given it
    narrows the match to that PR; otherwise `branch_name`, if given,
    narrows it to that branch. A "read" SSE event is scheduled for each
    row that was marked. Returns the number of rows marked read.
    """
    clauses = ["recipient_user_id = ?", "read_at IS NULL", "rfc_slug = ?"]
    params: list[Any] = [user_id, rfc_slug]

    # pr_number takes precedence over branch_name when both are supplied.
    if pr_number is not None:
        narrowing = ("pr_number = ?", pr_number)
    elif branch_name is not None:
        narrowing = ("branch_name = ?", branch_name)
    else:
        narrowing = None
    if narrowing is not None:
        clauses.append(narrowing[0])
        params.append(narrowing[1])

    matched = db.conn().execute(
        f"SELECT id FROM notifications WHERE {' AND '.join(clauses)}",
        params,
    ).fetchall()
    read_ids = [record["id"] for record in matched]
    if len(read_ids) == 0:
        return 0

    marks = ",".join(["?"] * len(read_ids))
    db.conn().execute(
        f"UPDATE notifications SET read_at = datetime('now') WHERE id IN ({marks})",
        read_ids,
    )
    for read_id in read_ids:
        _schedule_broadcast(user_id, "read", {"id": read_id})
    return len(read_ids)
# ---------------------------------------------------------------------------
# 90-day decay sweep (§15.6) — called by the digest job's nightly loop
# ---------------------------------------------------------------------------
def decay_watches() -> int:
    """Run the §15.6 decay sweep over the `watches` table.

    Auto-set 'watching' rows whose `last_participation_at` is missing or
    more than 90 days old are downgraded to 'following', and their
    `last_participation_at` is reset to now. Rows not set by 'auto' are
    untouched; role-implicit watching is not stored on the row, so the
    sweep never sees it. Returns the number of rows downgraded.
    """
    sweep_sql = """
        UPDATE watches
        SET state = 'following', last_participation_at = datetime('now')
        WHERE state = 'watching'
        AND set_by = 'auto'
        AND (last_participation_at IS NULL
        OR datetime(last_participation_at, '+90 days') < datetime('now'))
        """
    downgraded = db.conn().execute(sweep_sql).rowcount
    return downgraded if downgraded else 0