"""§15.5: the digest scheduler. A background asyncio task runs on a cadence (default hourly, configurable via DIGEST_TICK_SECONDS for tests and dev). Each tick: - releases held-during-quiet-hours emails via `email.flush_pending`; - decays §15.6 watching rows whose last_participation_at is >90 days; - assembles a digest for every user whose `digest_cadence` is `daily` or `weekly` and whose next-cadence window has rolled over. The digest's three exclusion rules per §15.5 are applied at assembly time, and `notification_digests` records what was included so the next run skips already-shipped rows. Production runs the loop continuously; tests drive it via `run_tick()` for deterministic post-conditions (same shape as Slice 5's `?_sync=1` seam for the graduation orchestrator). """ from __future__ import annotations import asyncio import json import logging import os from datetime import datetime, timedelta, timezone from . import db, email as email_mod, notify log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Scheduler shell # --------------------------------------------------------------------------- class DigestScheduler: """Periodic task wrapper. Mirrors `cache.Reconciler`'s shape so the operator's mental model is "this app has two scheduled jobs, both look the same.\"""" def __init__(self, *, tick_seconds: int | None = None): self._tick = tick_seconds or int(os.environ.get("DIGEST_TICK_SECONDS", "3600")) self._task: asyncio.Task | None = None self._stop = asyncio.Event() def start(self) -> None: if self._task is None: self._task = asyncio.create_task(self._loop()) async def stop(self) -> None: self._stop.set() if self._task is not None: await self._task async def _loop(self) -> None: # One tick at startup so a fresh process serves digests immediately # for any user whose cadence rolled over while the app was down. await self._safe_tick() while not self._stop.is_set(): try: await asyncio.wait_for(self._stop.wait(), timeout=self._tick) except asyncio.TimeoutError: pass if self._stop.is_set(): break await self._safe_tick() async def _safe_tick(self) -> None: try: run_tick() except Exception: log.exception("digest tick failed") # --------------------------------------------------------------------------- # The tick itself # --------------------------------------------------------------------------- def run_tick() -> dict[str, int]: """One pass: flush held emails, decay watches, assemble due digests. Returns counters for testing/observability. Idempotent on re-entry — assemble_for_user respects the §15.5 exclusion rules so a second tick during the same cadence window emits nothing.""" flushed = email_mod.flush_pending() decayed = notify.decay_watches() digests_sent = 0 rows = db.conn().execute( """ SELECT id, email, display_name, digest_cadence FROM users WHERE digest_cadence IN ('daily', 'weekly') AND email IS NOT NULL AND email != '' """ ).fetchall() for row in rows: if assemble_for_user( user_id=row["id"], cadence=row["digest_cadence"], email=row["email"], display_name=row["display_name"], ): digests_sent += 1 return {"flushed": flushed, "decayed": decayed, "digests_sent": digests_sent} def assemble_for_user( *, user_id: int, cadence: str, email: str, display_name: str, ) -> bool: """§15.5 digest assembly for one user. Returns True if a digest was sent, False if skipped (nothing to report, or cadence window not yet rolled over).""" cfg = email_mod.EmailConfig.from_env() if not cfg.enabled: return False now = datetime.now(timezone.utc) last_row = db.conn().execute( "SELECT sent_at, period_end FROM notification_digests WHERE recipient_user_id = ? ORDER BY id DESC LIMIT 1", (user_id,), ).fetchone() period_start = ( _parse_iso(last_row["period_end"]) if last_row and last_row["period_end"] else None ) if period_start is None: # First digest: cover everything we have. Cap the lookback at # 30 days so a fresh subscription doesn't dump months of history. period_start = now - timedelta(days=30) if not _cadence_window_rolled_over(period_start, now, cadence): return False rows = db.conn().execute( """ SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number, n.created_at, n.payload, n.read_at, n.email_sent_at, u.display_name AS actor_display, r.title AS rfc_title FROM notifications n LEFT JOIN users u ON u.id = n.actor_user_id LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug WHERE n.recipient_user_id = ? AND n.created_at >= ? ORDER BY n.id ASC """, (user_id, period_start.strftime("%Y-%m-%d %H:%M:%S")), ).fetchall() eligible: list[tuple] = [] for r in rows: # §15.5 exclusion rule 1: already emailed. if r["email_sent_at"]: continue # §15.5 exclusion rule 2: already read. if r["read_at"]: continue try: extras = json.loads(r["payload"] or "{}") except json.JSONDecodeError: extras = {} # The §15.5 framing is "the catch-up surface for activity on # watched RFCs." Personal-direct events have their own email # path; exclude them from the digest body per the section's # closing paragraph. if extras.get("category") == "personal-direct": continue eligible.append((r, extras)) if not eligible: # No digest is sent when there's nothing to report (§15.5), # but we still record the period roll so the next window # starts cleanly. _record_emitted(user_id, period_start, now, ids=[]) return False subject = _subject(eligible, cadence) body = _body(eligible, cadence, cfg) sent = email_mod._deliver(cfg, email, subject, body) if not sent: return False ids = [r["id"] for r, _ in eligible] placeholders = ",".join("?" * len(ids)) db.conn().execute( f"UPDATE notifications SET digest_included_at = datetime('now') WHERE id IN ({placeholders})", ids, ) _record_emitted(user_id, period_start, now, ids=ids) return True # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _cadence_window_rolled_over(period_start: datetime, now: datetime, cadence: str) -> bool: delta = now - period_start if cadence == "daily": return delta >= timedelta(days=1) if cadence == "weekly": return delta >= timedelta(days=7) return False def _parse_iso(text: str) -> datetime | None: # SQLite stores datetimes as "YYYY-MM-DD HH:MM:SS" via datetime('now'). try: dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S") return dt.replace(tzinfo=timezone.utc) except (ValueError, TypeError): return None def _subject(eligible: list[tuple], cadence: str) -> str: rfcs = {r["rfc_slug"] for r, _ in eligible if r["rfc_slug"]} label = "Daily" if cadence == "daily" else "Weekly" return f"[Wiggleverse] {label} digest — {len(eligible)} events across {len(rfcs)} RFCs" def _body(eligible: list[tuple], cadence: str, cfg) -> str: label = "the past day" if cadence == "daily" else "the past week" lines = [f"Activity on RFCs you watch, from {label}.\n"] grouped: dict[str | None, list[tuple]] = {} for r, extras in eligible: grouped.setdefault(r["rfc_slug"], []).append((r, extras)) by_volume = sorted(grouped.items(), key=lambda kv: -len(kv[1])) for slug, items in by_volume: title = items[0][0]["rfc_title"] or slug or "(no RFC)" lines.append(f"\n{title}") if slug: lines.append(f" {cfg.app_url}/rfc/{slug}") # Group by event_kind within the RFC per §15.5's per-RFC # section shape ("3 PRs opened on RFC-0042 …"). by_kind: dict[str, list[tuple]] = {} for r, extras in items: by_kind.setdefault(r["event_kind"], []).append((r, extras)) for event_kind, kind_items in by_kind.items(): if len(kind_items) == 1: r, extras = kind_items[0] summary = notify.render_summary( event_kind, r["actor_display"], r["rfc_title"], extras ) line = f" · {summary}" else: line = f" · {len(kind_items)} {event_kind.replace('_', ' ')} events" # §15.5 exclusion rule 3: annotate still-unread items as # "still unread in your inbox." if any(rr["read_at"] is None for rr, _ in kind_items): line += " (still unread in your inbox)" lines.append(line) lines.append(f"\nOpen your inbox: {cfg.app_url}/inbox") lines.append(f"Manage digest preferences: {cfg.app_url}/settings/notifications") return "\n".join(lines) + "\n" def _record_emitted(user_id: int, period_start: datetime, now: datetime, *, ids: list[int]) -> None: db.conn().execute( """ INSERT INTO notification_digests (recipient_user_id, sent_at, period_start, period_end, signal_ids_included) VALUES (?, ?, ?, ?, ?) """, ( user_id, now.strftime("%Y-%m-%d %H:%M:%S"), period_start.strftime("%Y-%m-%d %H:%M:%S"), now.strftime("%Y-%m-%d %H:%M:%S"), json.dumps(ids), ), )