Files
rfc-app/backend/app/digest.py
T
Ben Stull f67d0aa0db Slice 6: notifications per §15
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 23:09:04 -07:00

276 lines
9.9 KiB
Python

"""§15.5: the digest scheduler.
A background asyncio task runs on a cadence (default hourly, configurable
via DIGEST_TICK_SECONDS for tests and dev). Each tick:
- releases held-during-quiet-hours emails via `email.flush_pending`;
- decays §15.6 watching rows whose last_participation_at is more than 90 days old;
- assembles a digest for every user whose `digest_cadence` is `daily`
or `weekly` and whose next-cadence window has rolled over.
The digest's three exclusion rules per §15.5 are applied at assembly
time, and `notification_digests` records what was included so the next
run skips already-shipped rows.
Production runs the loop continuously; tests drive it via `run_tick()`
for deterministic post-conditions (same shape as Slice 5's `?_sync=1`
seam for the graduation orchestrator).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from . import db, email as email_mod, notify
# Module logger; the scheduler reports failed ticks through it.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Scheduler shell
# ---------------------------------------------------------------------------
class DigestScheduler:
    """Background wrapper that calls `run_tick()` on a fixed interval.

    Mirrors `cache.Reconciler`'s shape so the operator's mental model is
    "this app has two scheduled jobs, both look the same": `start()` spawns
    the loop task, `await stop()` signals it and waits for it to finish.
    """

    def __init__(self, *, tick_seconds: int | None = None):
        # A falsy tick_seconds (None or 0) defers to DIGEST_TICK_SECONDS,
        # which itself defaults to 3600 seconds.
        if not tick_seconds:
            tick_seconds = int(os.environ.get("DIGEST_TICK_SECONDS", "3600"))
        self._tick = tick_seconds
        self._task: asyncio.Task | None = None
        self._stop = asyncio.Event()

    def start(self) -> None:
        """Spawn the loop task on the running event loop; no-op if already spawned."""
        if self._task is not None:
            return
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        """Signal the loop to exit and, if it was started, wait for it to finish."""
        self._stop.set()
        task = self._task
        if task is not None:
            await task

    async def _loop(self) -> None:
        # Tick once right away so digests that came due while the process
        # was down go out at startup instead of one interval later.
        await self._safe_tick()
        while not self._stop.is_set():
            # Sleep for one interval, waking early if stop() is called.
            try:
                await asyncio.wait_for(self._stop.wait(), timeout=self._tick)
            except asyncio.TimeoutError:
                pass
            if not self._stop.is_set():
                await self._safe_tick()

    async def _safe_tick(self) -> None:
        # run_tick() is synchronous and runs directly on the event-loop
        # thread; any failure is logged so one bad tick doesn't end the loop.
        try:
            run_tick()
        except Exception:
            log.exception("digest tick failed")
# ---------------------------------------------------------------------------
# The tick itself
# ---------------------------------------------------------------------------
def run_tick() -> dict[str, int]:
    """One pass: flush held emails, decay watches, assemble due digests.

    Returns counters for testing/observability: ``flushed`` (from
    ``email.flush_pending``), ``decayed`` (from ``notify.decay_watches``)
    and ``digests_sent`` (users for whom a digest actually went out).

    Idempotent on re-entry: `assemble_for_user` only emits once per
    cadence window, so a second tick during the same window sends nothing.

    A failure while assembling one user's digest is logged and does not
    stop the remaining users from being processed. Without this, a single
    persistently failing user early in the scan would block every user
    after it on every tick.
    """
    flushed = email_mod.flush_pending()
    decayed = notify.decay_watches()
    digests_sent = 0
    rows = db.conn().execute(
        """
        SELECT id, email, display_name, digest_cadence
        FROM users
        WHERE digest_cadence IN ('daily', 'weekly')
          AND email IS NOT NULL AND email != ''
        """
    ).fetchall()
    for row in rows:
        try:
            sent = assemble_for_user(
                user_id=row["id"],
                cadence=row["digest_cadence"],
                email=row["email"],
                display_name=row["display_name"],
            )
        except Exception:
            # Per-user isolation: log with the user id and move on.
            log.exception("digest assembly failed for user %s", row["id"])
            continue
        if sent:
            digests_sent += 1
    return {"flushed": flushed, "decayed": decayed, "digests_sent": digests_sent}
def assemble_for_user(
    *,
    user_id: int,
    cadence: str,
    email: str,
    display_name: str,
) -> bool:
    """§15.5 digest assembly for one user.

    The digest period starts where the user's last `notification_digests`
    row ended, or 30 days ago for a first digest. Nothing happens until the
    cadence window has rolled over. After that, the user's notifications
    created since the period start are filtered by the §15.5 exclusion rules
    and mailed as a single digest.

    Args:
        user_id: recipient's `users.id`.
        cadence: `"daily"` or `"weekly"`; any other value never rolls over.
        email: address the digest is delivered to.
        display_name: recipient's display name. It is accepted for interface
            stability but is not used in the rendered digest.

    Returns:
        True if a digest was sent. False if skipped: email disabled, cadence
        window not yet rolled over, nothing to report, or delivery failed.
    """
    cfg = email_mod.EmailConfig.from_env()
    if not cfg.enabled:
        return False

    now = datetime.now(timezone.utc)
    last_row = db.conn().execute(
        "SELECT sent_at, period_end FROM notification_digests WHERE recipient_user_id = ? ORDER BY id DESC LIMIT 1",
        (user_id,),
    ).fetchone()
    period_start = (
        _parse_iso(last_row["period_end"]) if last_row and last_row["period_end"] else None
    )
    if period_start is None:
        # First digest (or an unparseable period_end): cover everything we
        # have, capped at 30 days so a fresh subscription doesn't dump
        # months of history.
        period_start = now - timedelta(days=30)
    if not _cadence_window_rolled_over(period_start, now, cadence):
        return False

    # Period boundaries are stored as second-precision "YYYY-MM-DD HH:MM:SS"
    # strings, and the window is inclusive at its start. Assuming created_at
    # uses the same datetime('now') format, a notification stamped in the same
    # second as the previous period's end would match both windows. Skipping
    # rows that already carry digest_included_at keeps them from being
    # shipped twice.
    rows = db.conn().execute(
        """
        SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
               n.created_at, n.payload, n.read_at, n.email_sent_at,
               u.display_name AS actor_display,
               r.title AS rfc_title
        FROM notifications n
        LEFT JOIN users u ON u.id = n.actor_user_id
        LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
        WHERE n.recipient_user_id = ?
          AND n.created_at >= ?
          AND n.digest_included_at IS NULL
        ORDER BY n.id ASC
        """,
        (user_id, period_start.strftime("%Y-%m-%d %H:%M:%S")),
    ).fetchall()

    eligible: list[tuple] = []
    for r in rows:
        # §15.5 exclusion rule 1: already emailed.
        if r["email_sent_at"]:
            continue
        # §15.5 exclusion rule 2: already read.
        if r["read_at"]:
            continue
        try:
            extras = json.loads(r["payload"] or "{}")
        except json.JSONDecodeError:
            extras = {}
        # A payload that is valid JSON but not an object (e.g. a list or a
        # bare string) would otherwise crash on `.get` below and fail this
        # user's digest.
        if not isinstance(extras, dict):
            extras = {}
        # The §15.5 framing is "the catch-up surface for activity on
        # watched RFCs." Personal-direct events have their own email
        # path; exclude them from the digest body per the section's
        # closing paragraph.
        if extras.get("category") == "personal-direct":
            continue
        eligible.append((r, extras))

    if not eligible:
        # No digest is sent when there's nothing to report (§15.5),
        # but we still record the period roll so the next window
        # starts cleanly.
        _record_emitted(user_id, period_start, now, ids=[])
        return False

    subject = _subject(eligible, cadence)
    body = _body(eligible, cadence, cfg)
    sent = email_mod._deliver(cfg, email, subject, body)
    if not sent:
        # The period is left open, so the next tick retries the same window.
        return False

    ids = [r["id"] for r, _ in eligible]
    placeholders = ",".join("?" * len(ids))
    db.conn().execute(
        f"UPDATE notifications SET digest_included_at = datetime('now') WHERE id IN ({placeholders})",
        ids,
    )
    _record_emitted(user_id, period_start, now, ids=ids)
    return True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _cadence_window_rolled_over(period_start: datetime, now: datetime, cadence: str) -> bool:
delta = now - period_start
if cadence == "daily":
return delta >= timedelta(days=1)
if cadence == "weekly":
return delta >= timedelta(days=7)
return False
def _parse_iso(text: str) -> datetime | None:
# SQLite stores datetimes as "YYYY-MM-DD HH:MM:SS" via datetime('now').
try:
dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
return dt.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
return None
def _subject(eligible: list[tuple], cadence: str) -> str:
rfcs = {r["rfc_slug"] for r, _ in eligible if r["rfc_slug"]}
label = "Daily" if cadence == "daily" else "Weekly"
return f"[Wiggleverse] {label} digest — {len(eligible)} events across {len(rfcs)} RFCs"
def _body(eligible: list[tuple], cadence: str, cfg) -> str:
label = "the past day" if cadence == "daily" else "the past week"
lines = [f"Activity on RFCs you watch, from {label}.\n"]
grouped: dict[str | None, list[tuple]] = {}
for r, extras in eligible:
grouped.setdefault(r["rfc_slug"], []).append((r, extras))
by_volume = sorted(grouped.items(), key=lambda kv: -len(kv[1]))
for slug, items in by_volume:
title = items[0][0]["rfc_title"] or slug or "(no RFC)"
lines.append(f"\n{title}")
if slug:
lines.append(f" {cfg.app_url}/rfc/{slug}")
# Group by event_kind within the RFC per §15.5's per-RFC
# section shape ("3 PRs opened on RFC-0042 …").
by_kind: dict[str, list[tuple]] = {}
for r, extras in items:
by_kind.setdefault(r["event_kind"], []).append((r, extras))
for event_kind, kind_items in by_kind.items():
if len(kind_items) == 1:
r, extras = kind_items[0]
summary = notify.render_summary(
event_kind, r["actor_display"], r["rfc_title"], extras
)
line = f" · {summary}"
else:
line = f" · {len(kind_items)} {event_kind.replace('_', ' ')} events"
# §15.5 exclusion rule 3: annotate still-unread items as
# "still unread in your inbox."
if any(rr["read_at"] is None for rr, _ in kind_items):
line += " (still unread in your inbox)"
lines.append(line)
lines.append(f"\nOpen your inbox: {cfg.app_url}/inbox")
lines.append(f"Manage digest preferences: {cfg.app_url}/settings/notifications")
return "\n".join(lines) + "\n"
def _record_emitted(user_id: int, period_start: datetime, now: datetime, *, ids: list[int]) -> None:
    """Append a `notification_digests` row closing the period that ends at *now*.

    `sent_at` and `period_end` are both stamped with *now*;
    `signal_ids_included` is the JSON-encoded list of notification ids that
    were shipped (an empty list when the period rolled with nothing to send).
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    closed_at = now.strftime(stamp_format)
    params = (
        user_id,
        closed_at,
        period_start.strftime(stamp_format),
        closed_at,
        json.dumps(ids),
    )
    db.conn().execute(
        "\n"
        "INSERT INTO notification_digests\n"
        "(recipient_user_id, sent_at, period_start, period_end, signal_ids_included)\n"
        "VALUES (?, ?, ?, ?, ?)\n",
        params,
    )