f67d0aa0db
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
949 lines
34 KiB
Python
949 lines
34 KiB
Python
"""§15: the notifications fan-out and the SSE broadcaster.
|
||
|
||
This module is the single chokepoint for the producer side of §15.
|
||
Every write that names an `rfc_slug` flows through `fan_out_from_action`
|
||
(called from `bot._log`) or `fan_out_chat_message` (called from the chat
|
||
write paths in `api_branches` / `api_prs`, since chat messages don't go
|
||
through the bot wrapper). The chokepoint:
|
||
|
||
- upserts a `watches` row for the actor per §15.6's auto-watch rule
|
||
(a substantive gesture sets `watching`, never downgrades, and bumps
|
||
`last_participation_at` for the 90-day decay);
|
||
- applies the §15.1 routing rules to determine which other users
|
||
receive a notification for the event;
|
||
- filters out muted recipients per §15.8 (per-user mute list);
|
||
- filters out `watches.state='muted'` per §15.6 (per-RFC mute);
|
||
- inserts one `notifications` row per recipient with the underlying
|
||
actor's user id per §15.9 (the bot never appears as actor);
|
||
- dispatches the row to the §15.4 email path if the recipient's
|
||
category toggle is on (held during quiet hours per §15.8, deferred
|
||
to the digest otherwise);
|
||
- publishes each new row to the per-user SSE subscriber queue so the
|
||
inbox refresh and header-badge count back the same stream per §15.3.
|
||
|
||
If you find yourself wanting to insert a row into `notifications`
|
||
directly from an endpoint, the spec is right and you are wrong: the
|
||
chokepoint is the invariant. Read paths can query the table from
|
||
anywhere; the producer side flows through here.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Iterable
|
||
|
||
from . import db
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# §15.1 routing — action_kind → (event_kind, category, recipient_set)
|
||
# ---------------------------------------------------------------------------
|
||
#
|
||
# Three recipient sets are used by the routing rules below:
|
||
#
|
||
# "watchers" — every user with a watches row on this slug whose state
|
||
# is 'watching' (full stream) or 'following' (structural
|
||
# only), minus the actor and minus 'muted' rows. The
|
||
# structural/full split is applied per-event below.
|
||
# "owners" — every user named in the entry's owners_json frontmatter
|
||
# on cached_rfcs (resolved to user_ids via gitea_login).
|
||
# "proposer" — the user whose action_kind=propose_rfc opened the slug,
|
||
# resolved via the most recent matching audit row.
|
||
#
|
||
# The category of each event maps to one of the §15.4 buckets:
|
||
#
|
||
# "personal-direct" — recipient is named subject of the action.
|
||
# "structural" — structural beats on watched RFCs.
|
||
# "churn" — per-commit / per-message activity. Never emails
|
||
# per §15.4; aggregated by the §15.5 digest.
|
||
|
||
CATEGORY_PERSONAL = "personal-direct"
|
||
CATEGORY_STRUCTURAL = "structural"
|
||
CATEGORY_CHURN = "churn"
|
||
|
||
# Action kinds whose actor's first interaction with a slug triggers
|
||
# auto-watch per §15.6. The substantive-gesture list in the spec is
|
||
# the source; we map onto our action_kind taxonomy. Reads, view advances,
|
||
# and own-action observations are intentionally excluded.
|
||
_AUTO_WATCH_ACTIONS = {
|
||
"propose_rfc",
|
||
"merge_proposal",
|
||
"decline_proposal",
|
||
"create_branch",
|
||
"accept_change",
|
||
"decline_change",
|
||
"manual_flush",
|
||
"open_branch_pr",
|
||
"merge_branch_pr",
|
||
"withdraw_branch_pr",
|
||
"supersede_branch_pr",
|
||
"open_metadata_pr",
|
||
"open_claim_pr",
|
||
"merge_claim_pr",
|
||
"create_resolution_branch",
|
||
"replay_change",
|
||
"graduate_start",
|
||
"graduate_complete",
|
||
"open_review_thread",
|
||
"resolve_thread",
|
||
"drop_flag",
|
||
"resolve_flag",
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# SSE broadcaster — per-user subscriber registry
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@dataclass
|
||
class _Subscriber:
|
||
user_id: int
|
||
queue: asyncio.Queue = field(default_factory=asyncio.Queue)
|
||
|
||
|
||
_subscribers: dict[int, list[_Subscriber]] = {}
|
||
_subscribers_lock: asyncio.Lock | None = None
|
||
|
||
|
||
def _get_lock() -> asyncio.Lock:
|
||
global _subscribers_lock
|
||
if _subscribers_lock is None:
|
||
_subscribers_lock = asyncio.Lock()
|
||
return _subscribers_lock
|
||
|
||
|
||
async def subscribe(user_id: int) -> _Subscriber:
|
||
"""Register a new SSE subscriber for a user. Each browser tab gets its
|
||
own subscriber; the per-user fan-out pushes the same event to every
|
||
subscriber for that user per §15 (multiple tabs see the same updates)."""
|
||
sub = _Subscriber(user_id=user_id)
|
||
async with _get_lock():
|
||
_subscribers.setdefault(user_id, []).append(sub)
|
||
return sub
|
||
|
||
|
||
async def unsubscribe(sub: _Subscriber) -> None:
|
||
async with _get_lock():
|
||
bucket = _subscribers.get(sub.user_id, [])
|
||
if sub in bucket:
|
||
bucket.remove(sub)
|
||
if not bucket:
|
||
_subscribers.pop(sub.user_id, None)
|
||
|
||
|
||
async def _broadcast(user_id: int, event: str, payload: Any) -> None:
|
||
"""Push an event to every subscriber for one user.
|
||
|
||
Best-effort; if the queue is unbounded a slow consumer just sees the
|
||
backlog when it next drains. We deliberately do not block writers on
|
||
consumer presence — the durable inbox row is the source of truth."""
|
||
async with _get_lock():
|
||
subs = list(_subscribers.get(user_id, []))
|
||
for sub in subs:
|
||
try:
|
||
sub.queue.put_nowait({"event": event, "payload": payload})
|
||
except Exception:
|
||
log.exception("notify broadcast: drop event for user %s", user_id)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Producer-side entry points
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def fan_out_from_action(
|
||
*,
|
||
actor_user_id: int | None,
|
||
action_kind: str,
|
||
rfc_slug: str | None,
|
||
branch_name: str | None,
|
||
pr_number: int | None,
|
||
details: dict | None,
|
||
) -> None:
|
||
"""Called from `bot._log` after every successful Git write.
|
||
|
||
Synchronous on purpose — runs inside the same request as the action,
|
||
so a failure here surfaces with the action rather than as a silent
|
||
drop. The SSE push is asyncio-scheduled if a loop is running; if
|
||
no loop is available (test harness, sync test client) the dispatch
|
||
falls back to writing the row and relying on the next subscriber
|
||
poll to surface it.
|
||
"""
|
||
if rfc_slug is None or action_kind not in _AUTO_WATCH_ACTIONS:
|
||
# Actions without an rfc_slug (e.g. permission events) or that
|
||
# aren't a §15.6 substantive gesture don't drive the chokepoint.
|
||
return
|
||
|
||
if actor_user_id is not None:
|
||
_bump_auto_watch(actor_user_id, rfc_slug)
|
||
|
||
rules = _ROUTING.get(action_kind)
|
||
if rules is None:
|
||
return
|
||
|
||
for rule in rules:
|
||
recipients = _resolve_recipients(
|
||
rule=rule,
|
||
actor_user_id=actor_user_id,
|
||
rfc_slug=rfc_slug,
|
||
details=details,
|
||
)
|
||
if not recipients:
|
||
continue
|
||
for recipient_id in recipients:
|
||
_emit_one(
|
||
recipient_user_id=recipient_id,
|
||
event_kind=rule["event_kind"],
|
||
category=rule["category"],
|
||
actor_user_id=actor_user_id,
|
||
rfc_slug=rfc_slug,
|
||
branch_name=branch_name,
|
||
pr_number=pr_number,
|
||
details=details or {},
|
||
)
|
||
|
||
|
||
def fan_out_chat_message(
|
||
*,
|
||
actor_user_id: int,
|
||
rfc_slug: str,
|
||
branch_name: str,
|
||
thread_id: int,
|
||
message_id: int,
|
||
is_review_thread: bool = False,
|
||
pr_number: int | None = None,
|
||
) -> None:
|
||
"""Called from chat write paths after a user message is persisted.
|
||
|
||
Chat messages don't flow through bot.py (no Git write); the chokepoint
|
||
here is the equivalent for §15's chat-message events. The routing
|
||
rule is: prior authors in the thread (besides the message author) get
|
||
a personal-direct `chat_reply_to_my_message`; users watching the RFC
|
||
(state='watching', i.e. full stream) get a churn-class
|
||
`chat_message_in_participated_thread`. The two are union'd so a user
|
||
who is both gets only the personal-direct row.
|
||
"""
|
||
_bump_auto_watch(actor_user_id, rfc_slug)
|
||
|
||
prior_authors = {
|
||
row["author_user_id"]
|
||
for row in db.conn().execute(
|
||
"""
|
||
SELECT DISTINCT author_user_id
|
||
FROM thread_messages
|
||
WHERE thread_id = ?
|
||
AND author_user_id IS NOT NULL
|
||
AND author_user_id != ?
|
||
AND id < ?
|
||
""",
|
||
(thread_id, actor_user_id, message_id),
|
||
)
|
||
}
|
||
|
||
# §15.6 / §15.8 filters apply here too — a muted recipient never sees
|
||
# a row regardless of which producer path generated it.
|
||
muted_rfc = _muted_user_ids_for_rfc(rfc_slug)
|
||
muters = _muters_of(actor_user_id)
|
||
prior_authors = (prior_authors - muted_rfc) - muters
|
||
|
||
# Personal-direct: prior thread authors.
|
||
for recipient_id in prior_authors:
|
||
event_kind = (
|
||
"pr_review_thread_reply" if is_review_thread else "chat_reply_to_my_message"
|
||
)
|
||
_emit_one(
|
||
recipient_user_id=recipient_id,
|
||
event_kind=event_kind,
|
||
category=CATEGORY_PERSONAL,
|
||
actor_user_id=actor_user_id,
|
||
rfc_slug=rfc_slug,
|
||
branch_name=branch_name,
|
||
pr_number=pr_number,
|
||
thread_id=thread_id,
|
||
details={"message_id": message_id},
|
||
)
|
||
|
||
# Churn: watchers of the RFC who weren't already covered.
|
||
watcher_ids = _watcher_user_ids(rfc_slug, scope="watching") - prior_authors
|
||
watcher_ids.discard(actor_user_id)
|
||
watcher_ids = (watcher_ids - muted_rfc) - muters
|
||
for recipient_id in watcher_ids:
|
||
event_kind = (
|
||
"pr_review_thread_new" if is_review_thread
|
||
else "chat_message_in_participated_thread"
|
||
)
|
||
_emit_one(
|
||
recipient_user_id=recipient_id,
|
||
event_kind=event_kind,
|
||
category=CATEGORY_CHURN,
|
||
actor_user_id=actor_user_id,
|
||
rfc_slug=rfc_slug,
|
||
branch_name=branch_name,
|
||
pr_number=pr_number,
|
||
thread_id=thread_id,
|
||
details={"message_id": message_id},
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Routing rules — per action_kind, a list of recipient×event×category
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
_ROUTING: dict[str, list[dict]] = {
|
||
# §9.1: propose surfaces the new slug. The proposer is the actor;
|
||
# the personal-direct beats fire on merge/decline, not on propose
|
||
# itself. Admins and owners get a structural ping so the
|
||
# pending-ideas queue isn't invisible.
|
||
"propose_rfc": [
|
||
{"event_kind": "proposal_opened_on_watched_topic",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "admins"},
|
||
],
|
||
# §9.3: merge fires personal-direct to the proposer per §15.4's
|
||
# named-subject rule. Watchers also see the structural beat.
|
||
"merge_proposal": [
|
||
{"event_kind": "proposal_merged",
|
||
"category": CATEGORY_PERSONAL, "recipients": "proposer"},
|
||
{"event_kind": "proposal_merged",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"decline_proposal": [
|
||
{"event_kind": "proposal_declined",
|
||
"category": CATEGORY_PERSONAL, "recipients": "proposer"},
|
||
],
|
||
# §10.1: PR opened. Watchers see it; the opener auto-watches but
|
||
# doesn't get an inbox row for their own gesture.
|
||
"open_branch_pr": [
|
||
{"event_kind": "pr_opened",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"open_metadata_pr": [
|
||
{"event_kind": "pr_opened",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"open_claim_pr": [
|
||
{"event_kind": "claim_opened",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
# §10.5: merge fires personal-direct to the PR opener if someone
|
||
# else merged; watchers see the structural beat regardless.
|
||
"merge_branch_pr": [
|
||
{"event_kind": "pr_merged",
|
||
"category": CATEGORY_PERSONAL, "recipients": "pr_opener_if_other"},
|
||
{"event_kind": "pr_merged",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"withdraw_branch_pr": [
|
||
{"event_kind": "pr_withdrawn",
|
||
"category": CATEGORY_PERSONAL, "recipients": "pr_opener_if_other"},
|
||
{"event_kind": "pr_withdrawn",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"supersede_branch_pr": [
|
||
{"event_kind": "pr_withdrawn",
|
||
"category": CATEGORY_PERSONAL, "recipients": "pr_opener_if_other"},
|
||
],
|
||
# §8.6: accept_change is per-commit churn. Watchers (full stream)
|
||
# get the row; following-only watchers don't.
|
||
"accept_change": [
|
||
{"event_kind": "pr_commit_added",
|
||
"category": CATEGORY_CHURN, "recipients": "watchers_full"},
|
||
],
|
||
"manual_flush": [
|
||
{"event_kind": "pr_commit_added",
|
||
"category": CATEGORY_CHURN, "recipients": "watchers_full"},
|
||
],
|
||
"replay_change": [
|
||
{"event_kind": "pr_commit_added",
|
||
"category": CATEGORY_CHURN, "recipients": "watchers_full"},
|
||
],
|
||
# §13.3: graduation_complete is personal-direct to owners (the
|
||
# super-draft's owners list) and structural to watchers.
|
||
"graduate_complete": [
|
||
{"event_kind": "graduation_complete",
|
||
"category": CATEGORY_PERSONAL, "recipients": "owners"},
|
||
{"event_kind": "graduation_complete",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"graduate_start": [
|
||
{"event_kind": "super_draft_graduation_ready",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"create_resolution_branch": [
|
||
{"event_kind": "pr_conflict_with_main",
|
||
"category": CATEGORY_STRUCTURAL, "recipients": "watchers_structural"},
|
||
],
|
||
"create_branch": [
|
||
# No notification — branch creation is a private gesture until
|
||
# work lands on it. Auto-watch is the visible effect.
|
||
],
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Recipient resolvers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _resolve_recipients(
|
||
*,
|
||
rule: dict,
|
||
actor_user_id: int | None,
|
||
rfc_slug: str,
|
||
details: dict | None,
|
||
) -> set[int]:
|
||
kind = rule["recipients"]
|
||
if kind == "watchers_structural":
|
||
# following (structural only) + watching (full stream) — both
|
||
# see structural events.
|
||
ids = _watcher_user_ids(rfc_slug, scope="any")
|
||
elif kind == "watchers_full":
|
||
ids = _watcher_user_ids(rfc_slug, scope="watching")
|
||
elif kind == "owners":
|
||
ids = _entry_owner_user_ids(rfc_slug)
|
||
elif kind == "admins":
|
||
ids = _admin_user_ids()
|
||
elif kind == "proposer":
|
||
ids = _proposer_user_id(rfc_slug)
|
||
elif kind == "pr_opener_if_other":
|
||
opener = _pr_opener_user_id(rfc_slug, details)
|
||
ids = {opener} if opener is not None and opener != actor_user_id else set()
|
||
else:
|
||
log.warning("notify: unknown recipient kind %s", kind)
|
||
return set()
|
||
|
||
if actor_user_id is not None:
|
||
ids.discard(actor_user_id)
|
||
|
||
# §15.6: per-RFC mute suppresses every signal for the (user, slug).
|
||
muted = _muted_user_ids_for_rfc(rfc_slug)
|
||
ids -= muted
|
||
|
||
# §15.8: per-user mute suppresses notifications produced by the actor
|
||
# for each muter. (Inbox-volume only; content visibility unchanged.)
|
||
if actor_user_id is not None:
|
||
muters = _muters_of(actor_user_id)
|
||
ids -= muters
|
||
|
||
return ids
|
||
|
||
|
||
def _watcher_user_ids(rfc_slug: str, *, scope: str) -> set[int]:
|
||
"""scope: 'watching' = full stream only; 'any' = watching+following."""
|
||
if scope == "watching":
|
||
rows = db.conn().execute(
|
||
"SELECT user_id FROM watches WHERE rfc_slug = ? AND state = 'watching'",
|
||
(rfc_slug,),
|
||
)
|
||
else:
|
||
rows = db.conn().execute(
|
||
"SELECT user_id FROM watches WHERE rfc_slug = ? AND state IN ('watching', 'following')",
|
||
(rfc_slug,),
|
||
)
|
||
return {r["user_id"] for r in rows}
|
||
|
||
|
||
def _entry_owner_user_ids(rfc_slug: str) -> set[int]:
|
||
row = db.conn().execute(
|
||
"SELECT owners_json FROM cached_rfcs WHERE slug = ?", (rfc_slug,),
|
||
).fetchone()
|
||
if row is None:
|
||
return set()
|
||
try:
|
||
owners = json.loads(row["owners_json"] or "[]")
|
||
except json.JSONDecodeError:
|
||
return set()
|
||
if not owners:
|
||
return set()
|
||
placeholders = ",".join("?" * len(owners))
|
||
user_rows = db.conn().execute(
|
||
f"SELECT id FROM users WHERE gitea_login IN ({placeholders})",
|
||
tuple(owners),
|
||
)
|
||
return {r["id"] for r in user_rows}
|
||
|
||
|
||
def _admin_user_ids() -> set[int]:
|
||
return {
|
||
r["id"]
|
||
for r in db.conn().execute("SELECT id FROM users WHERE role IN ('owner', 'admin')")
|
||
}
|
||
|
||
|
||
def _proposer_user_id(rfc_slug: str) -> set[int]:
|
||
row = db.conn().execute(
|
||
"""
|
||
SELECT actor_user_id FROM actions
|
||
WHERE action_kind = 'propose_rfc' AND rfc_slug = ?
|
||
ORDER BY id LIMIT 1
|
||
""",
|
||
(rfc_slug,),
|
||
).fetchone()
|
||
if row is None or row["actor_user_id"] is None:
|
||
return set()
|
||
return {row["actor_user_id"]}
|
||
|
||
|
||
def _pr_opener_user_id(rfc_slug: str, details: dict | None) -> int | None:
|
||
"""For pr_opener_if_other recipients, find the user who opened the PR.
|
||
|
||
Uses cached_prs.opened_by (gitea_login) → users.id, falling back to
|
||
the audit log if the cache row isn't there yet.
|
||
"""
|
||
pr_number = (details or {}).get("pr_number")
|
||
# When called from merge/withdraw routing, details is the original
|
||
# action's details — pr_number is on the audit row, not here. Pull
|
||
# it from the most-recent relevant action.
|
||
row = db.conn().execute(
|
||
"""
|
||
SELECT actor_user_id FROM actions
|
||
WHERE action_kind = 'open_branch_pr' AND rfc_slug = ?
|
||
ORDER BY id DESC LIMIT 1
|
||
""",
|
||
(rfc_slug,),
|
||
).fetchone()
|
||
if row and row["actor_user_id"] is not None:
|
||
return row["actor_user_id"]
|
||
return pr_number # last-ditch: keep typing consistent (rarely matters)
|
||
|
||
|
||
def _muted_user_ids_for_rfc(rfc_slug: str) -> set[int]:
|
||
return {
|
||
r["user_id"]
|
||
for r in db.conn().execute(
|
||
"SELECT user_id FROM watches WHERE rfc_slug = ? AND state = 'muted'",
|
||
(rfc_slug,),
|
||
)
|
||
}
|
||
|
||
|
||
def _muters_of(actor_user_id: int) -> set[int]:
|
||
return {
|
||
r["muter_user_id"]
|
||
for r in db.conn().execute(
|
||
"SELECT muter_user_id FROM notification_user_mutes WHERE muted_user_id = ?",
|
||
(actor_user_id,),
|
||
)
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Auto-watch upsert
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _bump_auto_watch(user_id: int, rfc_slug: str) -> None:
|
||
"""Per §15.6 substantive-gesture rule: upsert a `watching` row for
|
||
the actor, never downgrade. Bumps `last_participation_at` for the
|
||
90-day decay timer regardless of current state.
|
||
|
||
The role-implicit watching for owners/arbiters is computed at fan-out
|
||
time (recipient resolution), not stored on the row, so this upsert
|
||
doesn't need to special-case them.
|
||
"""
|
||
existing = db.conn().execute(
|
||
"SELECT state, set_by FROM watches WHERE user_id = ? AND rfc_slug = ?",
|
||
(user_id, rfc_slug),
|
||
).fetchone()
|
||
now = "datetime('now')"
|
||
if existing is None:
|
||
db.conn().execute(
|
||
f"""
|
||
INSERT INTO watches
|
||
(user_id, rfc_slug, state, set_by, set_at, last_participation_at)
|
||
VALUES (?, ?, 'watching', 'auto', {now}, {now})
|
||
""",
|
||
(user_id, rfc_slug),
|
||
)
|
||
return
|
||
# Don't downgrade: a 'muted' explicit setting stays muted; a 'watching'
|
||
# row stays watching. The only auto-upgrade is following → watching.
|
||
if existing["state"] == "following" and existing["set_by"] != "explicit":
|
||
db.conn().execute(
|
||
f"UPDATE watches SET state = 'watching', last_participation_at = {now} WHERE user_id = ? AND rfc_slug = ?",
|
||
(user_id, rfc_slug),
|
||
)
|
||
else:
|
||
db.conn().execute(
|
||
f"UPDATE watches SET last_participation_at = {now} WHERE user_id = ? AND rfc_slug = ?",
|
||
(user_id, rfc_slug),
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Inserting a notification row + dispatch
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _emit_one(
|
||
*,
|
||
recipient_user_id: int,
|
||
event_kind: str,
|
||
category: str,
|
||
actor_user_id: int | None,
|
||
rfc_slug: str | None,
|
||
branch_name: str | None,
|
||
pr_number: int | None,
|
||
details: dict,
|
||
thread_id: int | None = None,
|
||
) -> int:
|
||
payload = {"category": category, **details}
|
||
cur = db.conn().execute(
|
||
"""
|
||
INSERT INTO notifications
|
||
(recipient_user_id, event_kind, rfc_slug, branch_name, pr_number,
|
||
thread_id, actor_user_id, payload)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
recipient_user_id,
|
||
event_kind,
|
||
rfc_slug,
|
||
branch_name,
|
||
pr_number,
|
||
thread_id,
|
||
actor_user_id,
|
||
json.dumps(payload),
|
||
),
|
||
)
|
||
notif_id = cur.lastrowid
|
||
|
||
row_payload = _row_payload(notif_id)
|
||
_schedule_broadcast(recipient_user_id, "notification", row_payload)
|
||
|
||
# Email dispatch: §15.4. Held during quiet hours per §15.8; held
|
||
# during a global opt-out (bounce) per §15.4. Defers to the digest
|
||
# otherwise per §15.5's exclusion rules.
|
||
_schedule_email(notif_id, recipient_user_id, event_kind, category, row_payload)
|
||
|
||
return notif_id
|
||
|
||
|
||
def _row_payload(notif_id: int) -> dict:
|
||
row = db.conn().execute(
|
||
"""
|
||
SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
|
||
n.thread_id, n.actor_user_id, n.payload, n.created_at, n.read_at,
|
||
u.display_name AS actor_display, u.gitea_login AS actor_login,
|
||
r.title AS rfc_title
|
||
FROM notifications n
|
||
LEFT JOIN users u ON u.id = n.actor_user_id
|
||
LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
|
||
WHERE n.id = ?
|
||
""",
|
||
(notif_id,),
|
||
).fetchone()
|
||
if row is None:
|
||
return {}
|
||
try:
|
||
extras = json.loads(row["payload"] or "{}")
|
||
except json.JSONDecodeError:
|
||
extras = {}
|
||
return {
|
||
"id": row["id"],
|
||
"event_kind": row["event_kind"],
|
||
"rfc_slug": row["rfc_slug"],
|
||
"rfc_title": row["rfc_title"],
|
||
"branch_name": row["branch_name"],
|
||
"pr_number": row["pr_number"],
|
||
"thread_id": row["thread_id"],
|
||
"actor_user_id": row["actor_user_id"],
|
||
"actor_login": row["actor_login"],
|
||
"actor_display": row["actor_display"],
|
||
"created_at": row["created_at"],
|
||
"read_at": row["read_at"],
|
||
"category": extras.get("category"),
|
||
"summary": render_summary(row["event_kind"], row["actor_display"], row["rfc_title"], extras),
|
||
"extras": extras,
|
||
}
|
||
|
||
|
||
def render_summary(event_kind: str, actor_display: str | None, rfc_title: str | None, extras: dict) -> str:
|
||
"""One short sentence shared by the inbox row and the email body
|
||
per §15.4. Actor is the underlying user per §15.9; null actor renders
|
||
as "the app" so a system-generated event still reads as a sentence."""
|
||
actor = actor_display or "the app"
|
||
title = rfc_title or extras.get("slug") or "this RFC"
|
||
if event_kind == "proposal_merged":
|
||
return f"{actor} merged your proposal — {title} is now a super-draft."
|
||
if event_kind == "proposal_declined":
|
||
return f"{actor} declined your proposal for {title}."
|
||
if event_kind == "proposal_opened_on_watched_topic":
|
||
return f"{actor} proposed a new RFC: {title}."
|
||
if event_kind == "pr_opened":
|
||
return f"{actor} opened a PR on {title}."
|
||
if event_kind == "pr_merged":
|
||
return f"{actor} merged a PR on {title}."
|
||
if event_kind == "pr_withdrawn":
|
||
return f"{actor} withdrew a PR on {title}."
|
||
if event_kind == "pr_commit_added":
|
||
return f"{actor} added a commit on {title}."
|
||
if event_kind == "pr_review_thread_new":
|
||
return f"{actor} opened a review thread on {title}."
|
||
if event_kind == "pr_review_thread_reply":
|
||
return f"{actor} replied to your review thread on {title}."
|
||
if event_kind == "chat_message_in_participated_thread":
|
||
return f"{actor} posted a chat message on {title}."
|
||
if event_kind == "chat_reply_to_my_message":
|
||
return f"{actor} replied to your message on {title}."
|
||
if event_kind == "claim_opened":
|
||
return f"{actor} opened a claim PR on {title}."
|
||
if event_kind == "graduation_complete":
|
||
return f"{title} graduated to an active RFC."
|
||
if event_kind == "super_draft_graduation_ready":
|
||
return f"{actor} began graduating {title}."
|
||
if event_kind == "pr_conflict_with_main":
|
||
return f"{actor} started a resolution branch on {title}."
|
||
return f"{event_kind} on {title}"
|
||
|
||
|
||
def _schedule_broadcast(user_id: int, event: str, payload: Any) -> None:
|
||
"""Best-effort SSE push; survives the absence of a running loop so
|
||
sync test clients still write the row."""
|
||
try:
|
||
loop = asyncio.get_running_loop()
|
||
except RuntimeError:
|
||
return
|
||
loop.create_task(_broadcast(user_id, event, payload))
|
||
|
||
|
||
def _schedule_email(
|
||
notif_id: int,
|
||
recipient_user_id: int,
|
||
event_kind: str,
|
||
category: str,
|
||
payload: dict,
|
||
) -> None:
|
||
"""§15.4 dispatch — synchronous so the email_sent_at timestamp lands
|
||
in the same tick as the notification row, which is what the §15.5
|
||
digest's exclusion rule 1 keys on. The actual SMTP send happens in
|
||
`email.py`; this just consults the recipient's preferences and quiet
|
||
hours, then calls into the adapter."""
|
||
from . import email as email_mod # avoid import cycle
|
||
|
||
email_mod.maybe_send(
|
||
notif_id=notif_id,
|
||
recipient_user_id=recipient_user_id,
|
||
event_kind=event_kind,
|
||
category=category,
|
||
payload=payload,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-user mute and watch helpers used by api_notifications
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def list_inbox(
|
||
*,
|
||
user_id: int,
|
||
unread: bool | None = None,
|
||
rfc_slug: str | None = None,
|
||
category: str | None = None,
|
||
actor_user_id: int | None = None,
|
||
bundled: bool = False,
|
||
limit: int = 200,
|
||
) -> dict:
|
||
"""§15.2: inbox query. Filter chips are AND-combined.
|
||
|
||
When `bundled=True` is set, rows are grouped server-side by
|
||
(rfc_slug, event_kind) per the §15.2 bundle toggle, with the most
|
||
recent timestamp on each bundle.
|
||
"""
|
||
where = ["recipient_user_id = ?"]
|
||
args: list[Any] = [user_id]
|
||
if unread:
|
||
where.append("read_at IS NULL")
|
||
if rfc_slug:
|
||
where.append("rfc_slug = ?")
|
||
args.append(rfc_slug)
|
||
if actor_user_id is not None:
|
||
where.append("actor_user_id = ?")
|
||
args.append(actor_user_id)
|
||
rows = db.conn().execute(
|
||
f"""
|
||
SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
|
||
n.thread_id, n.actor_user_id, n.payload, n.created_at, n.read_at,
|
||
u.display_name AS actor_display, u.gitea_login AS actor_login,
|
||
r.title AS rfc_title
|
||
FROM notifications n
|
||
LEFT JOIN users u ON u.id = n.actor_user_id
|
||
LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
|
||
WHERE {' AND '.join(where)}
|
||
ORDER BY n.id DESC
|
||
LIMIT ?
|
||
""",
|
||
(*args, limit),
|
||
).fetchall()
|
||
items = []
|
||
for row in rows:
|
||
try:
|
||
extras = json.loads(row["payload"] or "{}")
|
||
except json.JSONDecodeError:
|
||
extras = {}
|
||
if category and extras.get("category") != category:
|
||
continue
|
||
items.append({
|
||
"id": row["id"],
|
||
"event_kind": row["event_kind"],
|
||
"rfc_slug": row["rfc_slug"],
|
||
"rfc_title": row["rfc_title"],
|
||
"branch_name": row["branch_name"],
|
||
"pr_number": row["pr_number"],
|
||
"thread_id": row["thread_id"],
|
||
"actor_user_id": row["actor_user_id"],
|
||
"actor_login": row["actor_login"],
|
||
"actor_display": row["actor_display"],
|
||
"created_at": row["created_at"],
|
||
"read_at": row["read_at"],
|
||
"category": extras.get("category"),
|
||
"summary": render_summary(row["event_kind"], row["actor_display"], row["rfc_title"], extras),
|
||
})
|
||
|
||
if bundled:
|
||
items = _bundle(items)
|
||
|
||
unread_count = db.conn().execute(
|
||
"SELECT COUNT(*) AS c FROM notifications WHERE recipient_user_id = ? AND read_at IS NULL",
|
||
(user_id,),
|
||
).fetchone()["c"]
|
||
|
||
return {"items": items, "unread_count": unread_count}
|
||
|
||
|
||
def _bundle(items: list[dict]) -> list[dict]:
|
||
"""§15.2: collapse per-row notifications into per-(RFC, event_kind)
|
||
bundles; the bundle's representative row is the most-recent
|
||
constituent, with a `bundled_ids` array carrying the rest."""
|
||
buckets: dict[tuple, list[dict]] = {}
|
||
for it in items:
|
||
key = (it["rfc_slug"], it["event_kind"])
|
||
buckets.setdefault(key, []).append(it)
|
||
bundled = []
|
||
for key, rows in buckets.items():
|
||
rep = dict(rows[0]) # most recent (sorted DESC above)
|
||
rep["bundled_ids"] = [r["id"] for r in rows]
|
||
rep["bundled_count"] = len(rows)
|
||
bundled.append(rep)
|
||
bundled.sort(key=lambda r: r["created_at"], reverse=True)
|
||
return bundled
|
||
|
||
|
||
def mark_read_by_filter(
|
||
*,
|
||
user_id: int,
|
||
unread: bool | None = None,
|
||
rfc_slug: str | None = None,
|
||
category: str | None = None,
|
||
actor_user_id: int | None = None,
|
||
ids: Iterable[int] | None = None,
|
||
) -> int:
|
||
"""`Mark all read` per §15.2 — respects the active filter so the
|
||
user can mark all churn read without touching personal-direct."""
|
||
where = ["recipient_user_id = ?", "read_at IS NULL"]
|
||
args: list[Any] = [user_id]
|
||
if rfc_slug:
|
||
where.append("rfc_slug = ?")
|
||
args.append(rfc_slug)
|
||
if actor_user_id is not None:
|
||
where.append("actor_user_id = ?")
|
||
args.append(actor_user_id)
|
||
if ids:
|
||
ids = list(ids)
|
||
if not ids:
|
||
return 0
|
||
placeholders = ",".join("?" * len(ids))
|
||
where.append(f"id IN ({placeholders})")
|
||
args.extend(ids)
|
||
# Category lives inside payload JSON; we filter post-fetch to keep
|
||
# the SQL portable.
|
||
rows = db.conn().execute(
|
||
f"SELECT id, payload FROM notifications WHERE {' AND '.join(where)}",
|
||
args,
|
||
).fetchall()
|
||
selected_ids: list[int] = []
|
||
for r in rows:
|
||
if category:
|
||
try:
|
||
extras = json.loads(r["payload"] or "{}")
|
||
except json.JSONDecodeError:
|
||
extras = {}
|
||
if extras.get("category") != category:
|
||
continue
|
||
selected_ids.append(r["id"])
|
||
if not selected_ids:
|
||
return 0
|
||
placeholders = ",".join("?" * len(selected_ids))
|
||
db.conn().execute(
|
||
f"UPDATE notifications SET read_at = datetime('now') WHERE id IN ({placeholders})",
|
||
selected_ids,
|
||
)
|
||
for nid in selected_ids:
|
||
_schedule_broadcast(user_id, "read", {"id": nid})
|
||
return len(selected_ids)
|
||
|
||
|
||
def reconcile_seen_advance(
|
||
*,
|
||
user_id: int,
|
||
rfc_slug: str,
|
||
branch_name: str | None = None,
|
||
pr_number: int | None = None,
|
||
) -> int:
|
||
"""§15.7 reconciler — when a scope cursor advances on visit, mark
|
||
matching unread notifications read. Called from the chat-seen and
|
||
pr-seen advance endpoints.
|
||
"""
|
||
where = ["recipient_user_id = ?", "read_at IS NULL", "rfc_slug = ?"]
|
||
args: list[Any] = [user_id, rfc_slug]
|
||
if pr_number is not None:
|
||
where.append("pr_number = ?")
|
||
args.append(pr_number)
|
||
elif branch_name is not None:
|
||
where.append("branch_name = ?")
|
||
args.append(branch_name)
|
||
rows = db.conn().execute(
|
||
f"SELECT id FROM notifications WHERE {' AND '.join(where)}",
|
||
args,
|
||
).fetchall()
|
||
ids = [r["id"] for r in rows]
|
||
if not ids:
|
||
return 0
|
||
placeholders = ",".join("?" * len(ids))
|
||
db.conn().execute(
|
||
f"UPDATE notifications SET read_at = datetime('now') WHERE id IN ({placeholders})",
|
||
ids,
|
||
)
|
||
for nid in ids:
|
||
_schedule_broadcast(user_id, "read", {"id": nid})
|
||
return len(ids)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 90-day decay sweep (§15.6) — called by the digest job's nightly loop
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def decay_watches() -> int:
|
||
"""Downgrade `watching` rows whose last_participation_at is older
|
||
than 90 days to `following`. Explicit and role-implicit rows are
|
||
exempt; role-implicit isn't stored on the row, so it's a no-op for
|
||
those. Returns the number of rows downgraded."""
|
||
cur = db.conn().execute(
|
||
"""
|
||
UPDATE watches
|
||
SET state = 'following', last_participation_at = datetime('now')
|
||
WHERE state = 'watching'
|
||
AND set_by = 'auto'
|
||
AND (last_participation_at IS NULL
|
||
OR datetime(last_participation_at, '+90 days') < datetime('now'))
|
||
"""
|
||
)
|
||
return cur.rowcount or 0
|