Files
Ben Stull f67d0aa0db Slice 6: notifications per §15
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 23:09:04 -07:00

448 lines
16 KiB
Python

"""§15.4: the email loop.
The transactional-email adapter is a thin wrapper over SMTP. When SMTP
credentials aren't configured the adapter falls back to logging the
envelope to stdout — sufficient for dev and for the integration tests,
which assert on the log buffer rather than spinning up a mail server.
Per §15.4: opt-in per category (with category-specific defaults), one-
click unsubscribe per category via signed URL, single non-spoofing From
identity, body mirrors the inbox row verbatim. Bounces and complaints
route to a global opt-out per the same section.
Quiet hours per §15.8 hold the send (the notification row still lands;
the email defers) until the window ends. The release-from-hold pass is
the same `flush_pending` the digest job calls; it bundles the held
notifications into a single "Activity while you were away" email once
at least `EMAIL_BUNDLE_THRESHOLD` of them have accumulated, and
otherwise sends them individually.
"""
from __future__ import annotations
import json
import logging
import os
import smtplib
from dataclasses import dataclass
from datetime import datetime, time, timezone
from email.message import EmailMessage
from email.utils import formataddr
from itertools import groupby
from typing import Any
from urllib.parse import urlencode
from itsdangerous import BadSignature, URLSafeSerializer
from . import db
# Module-level logger; delivery failures and the no-SMTP fallback are
# reported through it.
log = logging.getLogger(__name__)
# Buffer of outbound envelopes for test inspection. `_deliver` appends one
# dict per message (keys: "to", "from", "subject", "body") before it
# attempts any SMTP work, so the integration tests can read from this
# rather than spinning up a real SMTP server. Production leaves it as an
# unbounded list (it's never read), which is fine at v1 volumes; if it
# ever becomes a memory issue, we cap it then.
_SENT: list[dict] = []
def sent_envelopes() -> list[dict]:
    """Return a shallow copy of the recorded outbound envelopes.

    The copy keeps callers from mutating the module-level buffer; the
    envelope dicts themselves are shared with it.
    """
    snapshot = [*_SENT]
    return snapshot
def reset_sent_envelopes() -> None:
    """Empty the recorded-envelope buffer in place (the list object is kept)."""
    del _SENT[:]
# ---------------------------------------------------------------------------
# Configuration — read from env at call time so tests can monkeypatch
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class EmailConfig:
    """Immutable snapshot of the email settings read from the environment.

    Build one with `from_env()` at call time rather than import time, so
    tests can monkeypatch the environment between calls.
    """

    smtp_host: str  # SMTP_HOST, stripped; empty when unset
    smtp_port: int  # SMTP_PORT, default 587
    smtp_user: str  # SMTP_USER, stripped; empty when unset
    smtp_password: str  # SMTP_PASSWORD, taken verbatim (not stripped)
    smtp_starttls: bool  # SMTP_STARTTLS, on by default
    from_address: str  # EMAIL_FROM
    from_name: str  # EMAIL_FROM_NAME
    app_url: str  # APP_URL with trailing "/" removed
    bundle_threshold: int  # EMAIL_BUNDLE_THRESHOLD, default 5
    enabled: bool  # EMAIL_ENABLED, on by default

    @staticmethod
    def _env_flag(name: str, *, default: bool, blank: bool) -> bool:
        """Read a boolean environment variable.

        Returns `default` when the variable is unset and `blank` when it
        is set but empty. Otherwise the value is compared
        case-insensitively: "0", "false", "no" and "off" are False and
        anything else is True.
        """
        raw = os.environ.get(name)
        if raw is None:
            return default
        value = raw.strip().lower()
        if not value:
            return blank
        return value not in ("0", "false", "no", "off")

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer environment variable.

        An unset or blank variable yields `default`; a non-numeric value
        still raises ValueError so a misconfiguration stays loud.
        """
        raw = os.environ.get(name, "").strip()
        return int(raw) if raw else default

    @classmethod
    def from_env(cls) -> "EmailConfig":
        """Build a config from the current process environment.

        Raises:
            ValueError: if SMTP_PORT or EMAIL_BUNDLE_THRESHOLD is set to
                something that is not an integer.
        """
        return cls(
            smtp_host=os.environ.get("SMTP_HOST", "").strip(),
            smtp_port=cls._env_int("SMTP_PORT", 587),
            smtp_user=os.environ.get("SMTP_USER", "").strip(),
            smtp_password=os.environ.get("SMTP_PASSWORD", ""),
            # An explicitly empty SMTP_STARTTLS has always meant "off".
            smtp_starttls=cls._env_flag("SMTP_STARTTLS", default=True, blank=False),
            from_address=os.environ.get("EMAIL_FROM", "notifications@wiggleverse.local").strip(),
            from_name=os.environ.get("EMAIL_FROM_NAME", "Wiggleverse").strip(),
            app_url=os.environ.get("APP_URL", "http://localhost:8000").rstrip("/"),
            bundle_threshold=cls._env_int("EMAIL_BUNDLE_THRESHOLD", 5),
            # An explicitly empty EMAIL_ENABLED has always meant "on".
            enabled=cls._env_flag("EMAIL_ENABLED", default=True, blank=True),
        )
# Signed-URL token for §15.4 one-click unsubscribe. The signer is
# scoped to (user_id, category) so the URL is idempotent and revocable
# by rotating SECRET_KEY.
def _signer() -> URLSafeSerializer:
    """Build the serializer used for unsubscribe tokens.

    The key is read from the SECRET_KEY environment variable on every
    call and combined with the fixed "email-unsubscribe" salt.

    Raises:
        RuntimeError: if SECRET_KEY is unset or empty.
    """
    key = os.environ.get("SECRET_KEY", "")
    if key:
        return URLSafeSerializer(key, salt="email-unsubscribe")
    raise RuntimeError("SECRET_KEY required for email signing")
def make_unsubscribe_url(user_id: int, category: str) -> str:
    """Return the one-click unsubscribe URL for a user and email category.

    The URL points at `/api/email/unsubscribe` under the configured
    APP_URL and carries a signed token (query parameter `t`) that encodes
    `{"u": user_id, "c": category}`.
    """
    base_url = EmailConfig.from_env().app_url
    signed = _signer().dumps({"u": user_id, "c": category})
    query = urlencode({"t": signed})
    return f"{base_url}/api/email/unsubscribe?{query}"
def verify_unsubscribe_token(token: str) -> tuple[int, str]:
    """Validate an unsubscribe token and return `(user_id, category)`.

    Raises:
        BadSignature: if the signature does not verify, or if the signed
            payload does not have the expected `{"u": ..., "c": ...}`
            shape. Callers therefore only need to handle this one
            exception type.
        RuntimeError: if SECRET_KEY is not configured (from `_signer`).
    """
    data = _signer().loads(token)
    try:
        return int(data["u"]), str(data["c"])
    except (KeyError, TypeError, ValueError) as err:
        # A correctly signed token with the wrong shape is as unusable as
        # a forged one; report it the same way instead of leaking a
        # KeyError/TypeError/ValueError to the caller.
        raise BadSignature("malformed unsubscribe token payload") from err
# ---------------------------------------------------------------------------
# Category routing
# ---------------------------------------------------------------------------
# Map §15.1 event_kinds to one of the four §15.4 categories
# ('personal-direct', 'structural', 'admin-actionable', 'churn'). The
# 'churn' category never emails per §15.4 — naming the refusal in settings
# is more honest than silently omitting the toggle. Event kinds missing
# from this table take whatever fallback the caller hands to
# `category_for`.
_EVENT_TO_CATEGORY: dict[str, str] = {
    "proposal_merged": "personal-direct",
    "proposal_declined": "personal-direct",
    "proposal_opened_on_watched_topic": "structural",
    "pr_opened": "structural",
    "pr_merged": "structural",
    "pr_withdrawn": "structural",
    "pr_commit_added": "churn",
    "pr_review_thread_new": "structural",
    "pr_review_thread_reply": "personal-direct",
    "pr_conflict_with_main": "structural",
    "chat_message_in_participated_thread": "churn",
    "chat_reply_to_my_message": "personal-direct",
    "change_proposed_on_edited_passage": "personal-direct",
    "flag_dropped_on_watched_rfc": "structural",
    "flag_resolved_on_my_flag": "personal-direct",
    "contribute_grant_added": "personal-direct",
    "contribute_grant_revoked": "personal-direct",
    "graduation_complete": "personal-direct",
    "super_draft_graduation_ready": "admin-actionable",
    "claim_opened": "structural",
}
def category_for(event_kind: str, fallback_category: str) -> str:
    """Return the email category for `event_kind`.

    Looks the kind up in `_EVENT_TO_CATEGORY`; kinds that are not listed
    there get `fallback_category`.
    """
    try:
        return _EVENT_TO_CATEGORY[event_kind]
    except KeyError:
        return fallback_category
def _user_wants_email(user_row: Any, category: str) -> bool:
    """Decide whether this user has opted in to email for `category`.

    Applies the §15.4 per-category toggle:
      * the global opt-out (`email_opt_out_all`) suppresses everything;
      * 'churn' never emails;
      * 'admin-actionable' additionally requires role 'owner' or 'admin';
      * any category without a toggle column is treated as off.
    """
    if user_row["email_opt_out_all"]:
        return False
    if category == "admin-actionable" and user_row["role"] not in ("owner", "admin"):
        return False
    # 'churn' is deliberately absent: it has no toggle and never emails.
    toggle_columns = {
        "personal-direct": "email_personal_direct",
        "structural": "email_watched_structural",
        "admin-actionable": "email_admin_actionable",
    }
    column = toggle_columns.get(category)
    if column is None:
        return False
    return bool(user_row[column])
# ---------------------------------------------------------------------------
# Quiet hours (§15.8)
# ---------------------------------------------------------------------------
def _in_quiet_hours(user_row: Any) -> bool:
    """Return True when "now" falls inside the user's quiet-hours window.

    The window is read from the `notification_quiet_hours_*` columns and
    evaluated in the user's configured time zone; the start is inclusive
    and the end exclusive. Returns False (no hold) when any of the three
    settings is missing, the zone cannot be resolved, or either time
    fails to parse.
    """
    window_start = user_row["notification_quiet_hours_start"]
    window_end = user_row["notification_quiet_hours_end"]
    zone_key = user_row["notification_quiet_hours_timezone"]
    if not window_start or not window_end or not zone_key:
        return False
    try:
        from zoneinfo import ZoneInfo
        zone = ZoneInfo(zone_key)
    except Exception:
        # Unknown or malformed zone name (or zoneinfo unavailable).
        return False
    local_now = datetime.now(zone).time()
    opens = _parse_hhmm(window_start)
    closes = _parse_hhmm(window_end)
    if opens is None or closes is None:
        return False
    if opens > closes:
        # Window wraps midnight (e.g. 22:00 → 07:00).
        return local_now >= opens or local_now < closes
    return opens <= local_now < closes
def _parse_hhmm(text: str) -> time | None:
    """Parse an "HH:MM" string into a `time`, or return None if it is invalid.

    None is returned for non-strings, strings without a colon, non-numeric
    parts, and out-of-range hours or minutes.
    """
    try:
        # Unpacking fails (ValueError) when there is no ":" to split on.
        hour, minute = map(int, text.split(":", 1))
        parsed = time(hour, minute)
    except (ValueError, AttributeError):
        return None
    return parsed
# ---------------------------------------------------------------------------
# The dispatch entry point — called from notify._schedule_email
# ---------------------------------------------------------------------------
def maybe_send(
    *,
    notif_id: int,
    recipient_user_id: int,
    event_kind: str,
    category: str,
    payload: dict,
) -> None:
    """Email one notification right away, if the recipient allows it.

    Applies §15.4's per-category opt-in and §15.8's quiet-hours hold
    before sending. The notification row has already landed in
    `notifications` regardless; this only governs the out-of-band reach.
    Nothing is sent when the user is unknown, has no email address, has
    the category switched off, or is inside quiet hours.
    """
    recipient = db.conn().execute(
        """
        SELECT id, email, display_name, role,
               email_personal_direct, email_watched_structural, email_admin_actionable,
               email_opt_out_all,
               notification_quiet_hours_start, notification_quiet_hours_end,
               notification_quiet_hours_timezone
        FROM users WHERE id = ?
        """,
        (recipient_user_id,),
    ).fetchone()
    if recipient is None or not recipient["email"]:
        return
    resolved_category = category_for(event_kind, category)
    # §15.8: during quiet hours the row's email_sent_at stays null; the
    # digest's exclusion rule 1 won't kick in yet, and the `flush_pending`
    # pass at window end picks the notification up.
    send_now = _user_wants_email(recipient, resolved_category) and not _in_quiet_hours(recipient)
    if send_now:
        _send_one(recipient, notif_id, payload, resolved_category)
def _send_one(user: Any, notif_id: int, payload: dict, category: str) -> None:
    """Render and deliver a single notification email.

    Does nothing when email is disabled in the configuration. When
    `_deliver` reports success, the notification row is stamped with
    `email_sent_at`; on failure the row is left untouched.
    """
    settings = EmailConfig.from_env()
    if settings.enabled:
        subject_line = _subject(payload)
        text = _body(payload, user["id"], category, settings)
        if _deliver(settings, user["email"], subject_line, text):
            db.conn().execute(
                "UPDATE notifications SET email_sent_at = datetime('now') WHERE id = ?",
                (notif_id,),
            )
def _subject(payload: dict) -> str:
    """Build the email subject line for one notification.

    The subject is "[Wiggleverse] <summary> — <RFC title>". The summary
    falls back to the raw `event_kind`, and the title to the RFC slug.
    Whichever parts are missing are dropped, together with the separator.
    """
    rfc_title = payload.get("rfc_title") or payload.get("rfc_slug") or ""
    summary = payload.get("summary") or payload.get("event_kind") or ""
    # Join only the non-empty parts so the summary and title never run
    # together and no dangling separator is left behind.
    parts = [str(part) for part in (summary, rfc_title) if part]
    return f"[Wiggleverse] {' — '.join(parts)}".strip()
def _body(payload: dict, user_id: int, category: str, cfg: EmailConfig) -> str:
    """Render the plain-text body for one notification email.

    Layout: the summary, an attribution line (actor, RFC, timestamp), the
    deep link, then a footer with the per-category unsubscribe URL and
    the preferences page. Missing payload fields fall back to neutral
    placeholders.
    """
    headline = payload.get("summary") or ""
    who = payload.get("actor_display") or "the app"
    where = payload.get("rfc_title") or payload.get("rfc_slug") or "this RFC"
    timestamp = payload.get("created_at") or ""
    target_url = _deep_link(payload, cfg)
    unsubscribe_url = make_unsubscribe_url(user_id, category)
    settings_url = f"{cfg.app_url}/settings/notifications"
    lines = [
        f"{headline}",
        "",
        f"by {who} on {where} · {timestamp}",
        "",
        f"{target_url}",
        "",
        "---",
        f"Unsubscribe from this category: {unsubscribe_url}",
        f"Manage all preferences: {settings_url}",
        "",  # trailing newline
    ]
    return "\n".join(lines)
def _deep_link(payload: dict, cfg: EmailConfig) -> str:
    """Return the most specific in-app URL for a notification.

    Preference order: the PR page, then the RFC on a given branch, then
    the RFC itself, and finally the app root when no RFC slug is present.
    """
    slug = payload.get("rfc_slug")
    pr = payload.get("pr_number")
    branch = payload.get("branch_name")
    if slug and pr:
        return f"{cfg.app_url}/rfc/{slug}/pr/{pr}"
    if slug and branch:
        # Branch names can contain "/", "&", "#" or spaces, which would
        # corrupt a hand-built query string; let urlencode escape them.
        return f"{cfg.app_url}/rfc/{slug}?{urlencode({'branch': branch})}"
    if slug:
        return f"{cfg.app_url}/rfc/{slug}"
    return cfg.app_url
def _deliver(cfg: EmailConfig, to_address: str, subject: str, body: str) -> bool:
    """Hand one message to SMTP, or log it when no SMTP host is configured.

    The envelope is always appended to `_SENT` first, so tests can inspect
    it whether or not a server is involved.

    Returns:
        True when the message was accepted by the SMTP server, or when no
        SMTP host is configured and the envelope was only logged. False
        when building or sending the message raised; the error is logged
        and never propagated.
    """
    envelope = {
        "to": to_address,
        "from": formataddr((cfg.from_name, cfg.from_address)),
        "subject": subject,
        "body": body,
    }
    _SENT.append(envelope)
    if not cfg.smtp_host:
        log.info("email (stdout fallback): to=%s subject=%s", to_address, subject)
        return True
    try:
        msg = EmailMessage()
        msg["From"] = envelope["from"]
        msg["To"] = to_address
        msg["Subject"] = subject
        msg.set_content(body)
        smtp = smtplib.SMTP(cfg.smtp_host, cfg.smtp_port, timeout=30)
        try:
            if cfg.smtp_starttls:
                # NOTE(review): no explicit SSL context is passed here;
                # confirm whether certificate verification is wanted.
                smtp.starttls()
            if cfg.smtp_user:
                smtp.login(cfg.smtp_user, cfg.smtp_password)
            smtp.send_message(msg)
        finally:
            # A failing QUIT must not turn an accepted message into a
            # reported failure (which would trigger a duplicate send
            # later), nor hide the real error raised above.
            try:
                smtp.quit()
            except (smtplib.SMTPException, OSError):
                smtp.close()
    except Exception:
        log.exception("email send failed: to=%s subject=%s", to_address, subject)
        return False
    return True
# ---------------------------------------------------------------------------
# Quiet-hours release pass — called from the digest job
# ---------------------------------------------------------------------------
def flush_pending() -> int:
    """§15.8 release-from-hold pass over every user.

    For each user with an email address who is not currently inside quiet
    hours, collect their unread notifications whose `email_sent_at IS
    NULL` and whose category they have enabled, then send them. When at
    least `bundle_threshold` notifications are waiting they go out as one
    "Activity while you were away" email; otherwise each is sent
    individually.

    Returns:
        The number of notifications handled. Bundled notifications are
        counted only when the bundle was delivered. Individually sent
        ones are counted per attempt, because `_send_one` does not report
        its outcome. Returns 0 when email is disabled.
    """
    cfg = EmailConfig.from_env()
    if not cfg.enabled:
        return 0
    users = db.conn().execute(
        """
        SELECT id, email, display_name, role,
               email_personal_direct, email_watched_structural, email_admin_actionable,
               email_opt_out_all,
               notification_quiet_hours_start, notification_quiet_hours_end,
               notification_quiet_hours_timezone
        FROM users
        """
    ).fetchall()
    sent_count = 0
    for user in users:
        if not user["email"]:
            continue
        if _in_quiet_hours(user):
            # Still inside the window; a later pass will pick these up.
            continue
        rows = db.conn().execute(
            """
            SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
                   n.created_at, n.payload,
                   u.display_name AS actor_display,
                   r.title AS rfc_title
            FROM notifications n
            LEFT JOIN users u ON u.id = n.actor_user_id
            LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
            WHERE n.recipient_user_id = ?
              AND n.email_sent_at IS NULL
              AND n.read_at IS NULL
            ORDER BY n.id ASC
            """,
            (user["id"],),
        ).fetchall()
        emailable = []
        for r in rows:
            extras = _payload_extras(r["payload"])
            cat = category_for(r["event_kind"], extras.get("category", "structural"))
            if not _user_wants_email(user, cat):
                continue
            emailable.append((r, cat, extras))
        if not emailable:
            continue
        if len(emailable) >= cfg.bundle_threshold:
            sent_count += _send_bundle(cfg, user, emailable)
        else:
            for r, cat, extras in emailable:
                payload = _row_to_payload(r, extras)
                _send_one(user, r["id"], payload, cat)
                sent_count += 1
    return sent_count


def _payload_extras(raw: Any) -> dict:
    """Decode a notification's JSON `payload` column into a dict.

    Anything unusable yields `{}`: an empty or NULL column, invalid JSON,
    or valid JSON that is not an object (such as `null` or a list). This
    keeps one odd row from aborting the pass for every user.
    """
    try:
        decoded = json.loads(raw or "{}")
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers a
        # non-text column value.
        return {}
    return decoded if isinstance(decoded, dict) else {}
def _row_to_payload(row: Any, extras: dict) -> dict:
    """Rebuild an email payload dict from a stored notification row.

    Copies the row's descriptive columns, adds a rendered `summary`, and
    then overlays `extras`; a key present in `extras` therefore replaces
    the value taken from the row.
    """
    copied_columns = (
        "event_kind",
        "rfc_slug",
        "rfc_title",
        "branch_name",
        "pr_number",
        "actor_display",
        "created_at",
    )
    base = {column: row[column] for column in copied_columns}
    base["summary"] = _summary_for(
        row["event_kind"], row["actor_display"], row["rfc_title"], extras
    )
    return {**base, **extras}
def _summary_for(event_kind: str, actor_display: str | None, rfc_title: str | None, extras: dict) -> str:
    """Render the one-line summary for a notification via `notify.render_summary`."""
    # Imported at call time to avoid an import cycle with notify.py.
    from .notify import render_summary as _render

    summary = _render(event_kind, actor_display, rfc_title, extras)
    return summary
def _send_bundle(cfg: EmailConfig, user: Any, emailable: list) -> int:
    """Send one "Activity while you were away" email per §15.4.

    The subject names the event count and the body lists the summaries
    grouped by RFC. `emailable` holds `(row, category, extras)` tuples.

    Returns:
        The number of notifications covered by the bundle, all of which
        are stamped with `email_sent_at`; 0 when delivery failed, in which
        case nothing is stamped.
    """
    count = len(emailable)
    subject = f"[Wiggleverse] Activity while you were away — {count} events"

    def slug_key(item: tuple) -> str:
        # Sorting and grouping must use the same key; otherwise rows with
        # a NULL slug and rows with an empty slug can interleave and
        # produce repeated "(no RFC)" sections.
        return item[0]["rfc_slug"] or ""

    sections: list[str] = []
    for slug, group in groupby(sorted(emailable, key=slug_key), key=slug_key):
        group_rows = list(group)
        title = group_rows[0][0]["rfc_title"] or slug or "(no RFC)"
        sections.append(f"\n{title}")
        for r, _cat, extras in group_rows:
            summary = _summary_for(r["event_kind"], r["actor_display"], r["rfc_title"], extras)
            sections.append(f"  · {summary}")
    body = (
        "Activity on RFCs you watch, accumulated during your quiet hours:\n"
        + "\n".join(sections)
        + f"\n\nOpen your inbox: {cfg.app_url}/inbox\n"
        + f"Manage all preferences: {cfg.app_url}/settings/notifications\n"
    )
    sent = _deliver(cfg, user["email"], subject, body)
    if not sent:
        return 0
    ids = [r["id"] for r, _, _ in emailable]
    # Only "?" placeholders are interpolated into the SQL text; the ids
    # themselves are passed as bound parameters.
    placeholders = ",".join("?" * len(ids))
    db.conn().execute(
        f"UPDATE notifications SET email_sent_at = datetime('now') WHERE id IN ({placeholders})",
        ids,
    )
    return count