f67d0aa0db
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
276 lines
9.9 KiB
Python
276 lines
9.9 KiB
Python
"""§15.5: the digest scheduler.
|
|
|
|
A background asyncio task runs on a cadence (default hourly, configurable
|
|
via DIGEST_TICK_SECONDS for tests and dev). Each tick:
|
|
|
|
- releases held-during-quiet-hours emails via `email.flush_pending`;
|
|
- decays §15.6 watching rows whose last_participation_at is >90 days;
|
|
- assembles a digest for every user whose `digest_cadence` is `daily`
|
|
or `weekly` and whose next-cadence window has rolled over.
|
|
|
|
The digest's three exclusion rules per §15.5 are applied at assembly
|
|
time, and `notification_digests` records what was included so the next
|
|
run skips already-shipped rows.
|
|
|
|
Production runs the loop continuously; tests drive it via `run_tick()`
|
|
for deterministic post-conditions (same shape as Slice 5's `?_sync=1`
|
|
seam for the graduation orchestrator).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from . import db, email as email_mod, notify
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scheduler shell
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class DigestScheduler:
|
|
"""Periodic task wrapper. Mirrors `cache.Reconciler`'s shape so the
|
|
operator's mental model is "this app has two scheduled jobs, both
|
|
look the same.\""""
|
|
|
|
def __init__(self, *, tick_seconds: int | None = None):
|
|
self._tick = tick_seconds or int(os.environ.get("DIGEST_TICK_SECONDS", "3600"))
|
|
self._task: asyncio.Task | None = None
|
|
self._stop = asyncio.Event()
|
|
|
|
def start(self) -> None:
|
|
if self._task is None:
|
|
self._task = asyncio.create_task(self._loop())
|
|
|
|
async def stop(self) -> None:
|
|
self._stop.set()
|
|
if self._task is not None:
|
|
await self._task
|
|
|
|
async def _loop(self) -> None:
|
|
# One tick at startup so a fresh process serves digests immediately
|
|
# for any user whose cadence rolled over while the app was down.
|
|
await self._safe_tick()
|
|
while not self._stop.is_set():
|
|
try:
|
|
await asyncio.wait_for(self._stop.wait(), timeout=self._tick)
|
|
except asyncio.TimeoutError:
|
|
pass
|
|
if self._stop.is_set():
|
|
break
|
|
await self._safe_tick()
|
|
|
|
async def _safe_tick(self) -> None:
|
|
try:
|
|
run_tick()
|
|
except Exception:
|
|
log.exception("digest tick failed")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# The tick itself
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def run_tick() -> dict[str, int]:
    """Execute a single scheduler pass and report what it did.

    Steps, in order: release quiet-hours email holds, decay stale watches,
    then attempt a digest for every user on a daily or weekly cadence who
    has a non-empty email address.

    Returns a dict with "flushed", "decayed" and "digests_sent" counters for
    tests/observability. Idempotent on re-entry — assemble_for_user respects
    the §15.5 exclusion rules so a second tick during the same cadence window
    emits nothing.
    """
    counters = {
        "flushed": email_mod.flush_pending(),
        "decayed": notify.decay_watches(),
        "digests_sent": 0,
    }
    subscribers = db.conn().execute(
        """
        SELECT id, email, display_name, digest_cadence
        FROM users
        WHERE digest_cadence IN ('daily', 'weekly')
          AND email IS NOT NULL AND email != ''
        """
    ).fetchall()
    for user in subscribers:
        delivered = assemble_for_user(
            user_id=user["id"],
            cadence=user["digest_cadence"],
            email=user["email"],
            display_name=user["display_name"],
        )
        if delivered:
            counters["digests_sent"] += 1
    return counters
|
|
|
|
|
|
def assemble_for_user(
    *,
    user_id: int,
    cadence: str,
    email: str,
    display_name: str,
) -> bool:
    """§15.5 digest assembly for one user.

    Finds the end of the user's previous digest window. If a full cadence
    window has elapsed since then, it collects the user's notifications
    created since that point, applies the §15.5 exclusion rules, and emails
    one digest.

    Args:
        user_id: the recipient's users.id.
        cadence: "daily" or "weekly"; any other value never rolls over.
        email: destination address for the digest.
        display_name: accepted for interface compatibility; not used by
            this function.

    Returns:
        True if a digest was sent. False if skipped: email disabled, cadence
        window not yet rolled over, nothing to report, or delivery failed.
        On delivery failure no window is recorded, so the next tick retries.
    """
    cfg = email_mod.EmailConfig.from_env()
    if not cfg.enabled:
        return False
    now = datetime.now(timezone.utc)
    last_row = db.conn().execute(
        "SELECT sent_at, period_end FROM notification_digests WHERE recipient_user_id = ? ORDER BY id DESC LIMIT 1",
        (user_id,),
    ).fetchone()
    period_start = (
        _parse_iso(last_row["period_end"]) if last_row and last_row["period_end"] else None
    )
    if period_start is None:
        # First digest (or an unparseable period_end): cover everything we
        # have, but cap the lookback at 30 days so a fresh subscription
        # doesn't dump months of history.
        period_start = now - timedelta(days=30)
    if not _cadence_window_rolled_over(period_start, now, cadence):
        return False

    # The window has no upper bound. Rows created after `now`, or within the
    # same second as the recorded period_end, therefore also satisfy the
    # *next* window's `created_at >= period_start`. Skipping rows already
    # stamped with digest_included_at keeps a notification from shipping in
    # two digests.
    rows = db.conn().execute(
        """
        SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
               n.created_at, n.payload, n.read_at, n.email_sent_at,
               u.display_name AS actor_display,
               r.title AS rfc_title
        FROM notifications n
        LEFT JOIN users u ON u.id = n.actor_user_id
        LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
        WHERE n.recipient_user_id = ?
          AND n.created_at >= ?
          AND n.digest_included_at IS NULL
        ORDER BY n.id ASC
        """,
        (user_id, period_start.strftime("%Y-%m-%d %H:%M:%S")),
    ).fetchall()

    eligible: list[tuple] = []
    for r in rows:
        # §15.5 exclusion rule 1: already emailed.
        if r["email_sent_at"]:
            continue
        # §15.5 exclusion rule 2: already read.
        if r["read_at"]:
            continue
        try:
            extras = json.loads(r["payload"] or "{}")
        except json.JSONDecodeError:
            extras = {}
        # A payload that decodes to a non-object (e.g. "null" or "[]") would
        # make extras.get() raise and abort the tick for every user.
        if not isinstance(extras, dict):
            extras = {}
        # The §15.5 framing is "the catch-up surface for activity on
        # watched RFCs." Personal-direct events have their own email
        # path; exclude them from the digest body per the section's
        # closing paragraph.
        if extras.get("category") == "personal-direct":
            continue
        eligible.append((r, extras))

    if not eligible:
        # No digest is sent when there's nothing to report (§15.5),
        # but we still record the period roll so the next window
        # starts cleanly.
        _record_emitted(user_id, period_start, now, ids=[])
        return False

    subject = _subject(eligible, cadence)
    body = _body(eligible, cadence, cfg)
    sent = email_mod._deliver(cfg, email, subject, body)
    if not sent:
        return False
    ids = [r["id"] for r, _ in eligible]
    placeholders = ",".join("?" * len(ids))
    db.conn().execute(
        f"UPDATE notifications SET digest_included_at = datetime('now') WHERE id IN ({placeholders})",
        ids,
    )
    _record_emitted(user_id, period_start, now, ids=ids)
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _cadence_window_rolled_over(period_start: datetime, now: datetime, cadence: str) -> bool:
|
|
delta = now - period_start
|
|
if cadence == "daily":
|
|
return delta >= timedelta(days=1)
|
|
if cadence == "weekly":
|
|
return delta >= timedelta(days=7)
|
|
return False
|
|
|
|
|
|
def _parse_iso(text: str) -> datetime | None:
|
|
# SQLite stores datetimes as "YYYY-MM-DD HH:MM:SS" via datetime('now').
|
|
try:
|
|
dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def _subject(eligible: list[tuple], cadence: str) -> str:
|
|
rfcs = {r["rfc_slug"] for r, _ in eligible if r["rfc_slug"]}
|
|
label = "Daily" if cadence == "daily" else "Weekly"
|
|
return f"[Wiggleverse] {label} digest — {len(eligible)} events across {len(rfcs)} RFCs"
|
|
|
|
|
|
def _body(eligible: list[tuple], cadence: str, cfg) -> str:
|
|
label = "the past day" if cadence == "daily" else "the past week"
|
|
lines = [f"Activity on RFCs you watch, from {label}.\n"]
|
|
grouped: dict[str | None, list[tuple]] = {}
|
|
for r, extras in eligible:
|
|
grouped.setdefault(r["rfc_slug"], []).append((r, extras))
|
|
by_volume = sorted(grouped.items(), key=lambda kv: -len(kv[1]))
|
|
for slug, items in by_volume:
|
|
title = items[0][0]["rfc_title"] or slug or "(no RFC)"
|
|
lines.append(f"\n{title}")
|
|
if slug:
|
|
lines.append(f" {cfg.app_url}/rfc/{slug}")
|
|
# Group by event_kind within the RFC per §15.5's per-RFC
|
|
# section shape ("3 PRs opened on RFC-0042 …").
|
|
by_kind: dict[str, list[tuple]] = {}
|
|
for r, extras in items:
|
|
by_kind.setdefault(r["event_kind"], []).append((r, extras))
|
|
for event_kind, kind_items in by_kind.items():
|
|
if len(kind_items) == 1:
|
|
r, extras = kind_items[0]
|
|
summary = notify.render_summary(
|
|
event_kind, r["actor_display"], r["rfc_title"], extras
|
|
)
|
|
line = f" · {summary}"
|
|
else:
|
|
line = f" · {len(kind_items)} {event_kind.replace('_', ' ')} events"
|
|
# §15.5 exclusion rule 3: annotate still-unread items as
|
|
# "still unread in your inbox."
|
|
if any(rr["read_at"] is None for rr, _ in kind_items):
|
|
line += " (still unread in your inbox)"
|
|
lines.append(line)
|
|
lines.append(f"\nOpen your inbox: {cfg.app_url}/inbox")
|
|
lines.append(f"Manage digest preferences: {cfg.app_url}/settings/notifications")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def _record_emitted(user_id: int, period_start: datetime, now: datetime, *, ids: list[int]) -> None:
    """Append one notification_digests row describing a closed digest window.

    Both sent_at and period_end are stamped with `now`; period_start is the
    window's start. signal_ids_included stores the JSON-encoded notification
    ids (an empty list when the window closed with nothing to send). Times
    use SQLite's "YYYY-MM-DD HH:MM:SS" text form.
    """
    sqlite_format = "%Y-%m-%d %H:%M:%S"
    stamp = now.strftime(sqlite_format)
    params = (
        user_id,
        stamp,
        period_start.strftime(sqlite_format),
        stamp,
        json.dumps(ids),
    )
    db.conn().execute(
        """
        INSERT INTO notification_digests
            (recipient_user_id, sent_at, period_start, period_end, signal_ids_included)
        VALUES (?, ?, ?, ?, ?)
        """,
        params,
    )
|