"""§15.4: the email loop.

The transactional-email adapter is a thin wrapper over SMTP. When SMTP
credentials aren't configured the adapter falls back to logging the
envelope to stdout — sufficient for dev and for the integration tests,
which assert on the log buffer rather than spinning up a mail server.

Per §15.4: opt-in per category (with category-specific defaults),
one-click unsubscribe per category via signed URL, single non-spoofing
From identity, body mirrors the inbox row verbatim. Bounces and
complaints route to a global opt-out per the same section.

Quiet hours per §15.8 hold the send (the notification row still lands;
the email defers) until the window ends. The release-from-hold pass is
the same `flush_pending` the digest job calls; it bundles into a single
"Activity while you were away" email when more than a threshold
accumulated, otherwise sending individually.
"""
from __future__ import annotations

import json
import logging
import os
import smtplib
from dataclasses import dataclass
from datetime import datetime, time, timezone
from email.message import EmailMessage
from email.utils import formataddr
from itertools import groupby
from typing import Any
from urllib.parse import urlencode

from itsdangerous import BadSignature, URLSafeSerializer

from . import db

log = logging.getLogger(__name__)


# Buffer of outbound envelopes for test inspection. The integration tests
# read from this rather than spinning up a real SMTP server. Production
# leaves it as an unbounded list (it's never read), which is fine at
# v1 volumes; if it ever becomes a memory issue, we cap it then.
_SENT: list[dict] = []


def sent_envelopes() -> list[dict]:
    """Return a shallow copy of every envelope recorded so far."""
    return _SENT.copy()


def reset_sent_envelopes() -> None:
    """Empty the envelope buffer in place (same list object is kept)."""
    del _SENT[:]


# ---------------------------------------------------------------------------
# Configuration — read from env at call time so tests can monkeypatch
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class EmailConfig:
    """Immutable snapshot of the email settings taken from the environment."""

    # SMTP transport settings.
    smtp_host: str
    smtp_port: int
    smtp_user: str
    smtp_password: str
    smtp_starttls: bool
    # Sender identity placed in the From header.
    from_address: str
    from_name: str
    # Base URL used when building links; stored without a trailing slash.
    app_url: str
    # Number of pending notifications at which the flush pass bundles.
    bundle_threshold: int
    # Master switch for outbound email.
    enabled: bool

    @classmethod
    def from_env(cls) -> "EmailConfig":
        """Build a config from the current process environment.

        Every variable has a default, so this never fails on a missing
        variable. ``int()`` is applied to ``SMTP_PORT`` and
        ``EMAIL_BUNDLE_THRESHOLD`` and raises ``ValueError`` on a
        non-numeric value.
        """
        env = os.environ.get

        # The two flags are deliberately parsed with different falsy sets:
        # an empty SMTP_STARTTLS disables STARTTLS, while an empty
        # EMAIL_ENABLED still counts as enabled.
        starttls_off = ("0", "false", "False", "")
        enabled_off = ("0", "false", "False")
        use_starttls = env("SMTP_STARTTLS", "1") not in starttls_off
        is_enabled = env("EMAIL_ENABLED", "1") not in enabled_off

        port = int(env("SMTP_PORT", "587"))
        threshold = int(env("EMAIL_BUNDLE_THRESHOLD", "5"))
        sender = env("EMAIL_FROM", "notifications@wiggleverse.local").strip()
        sender_name = env("EMAIL_FROM_NAME", "Wiggleverse").strip()
        base_url = env("APP_URL", "http://localhost:8000").rstrip("/")

        return cls(
            smtp_host=env("SMTP_HOST", "").strip(),
            smtp_port=port,
            smtp_user=env("SMTP_USER", "").strip(),
            smtp_password=env("SMTP_PASSWORD", ""),
            smtp_starttls=use_starttls,
            from_address=sender,
            from_name=sender_name,
            app_url=base_url,
            bundle_threshold=threshold,
            enabled=is_enabled,
        )


# Signed-URL token for §15.4 one-click unsubscribe. The signer is
# scoped to (user_id, category) so the URL is idempotent and revocable
# by rotating SECRET_KEY.
def _signer() -> URLSafeSerializer:
    """Return the serializer that signs one-click unsubscribe tokens.

    ``SECRET_KEY`` is read on every call, so rotating it invalidates
    every previously issued token.

    Raises:
        RuntimeError: if ``SECRET_KEY`` is unset or empty.
    """
    secret = os.environ.get("SECRET_KEY", "")
    if not secret:
        raise RuntimeError("SECRET_KEY required for email signing")
    return URLSafeSerializer(secret, salt="email-unsubscribe")


def make_unsubscribe_url(user_id: int, category: str) -> str:
    """Build the signed unsubscribe URL for one (user, category) pair."""
    cfg = EmailConfig.from_env()
    token = _signer().dumps({"u": user_id, "c": category})
    qs = urlencode({"t": token})
    return f"{cfg.app_url}/api/email/unsubscribe?{qs}"


def verify_unsubscribe_token(token: str) -> tuple[int, str]:
    """Returns (user_id, category) or raises BadSignature.

    A token whose signature verifies but whose payload lacks the expected
    ``u`` / ``c`` fields is also reported as ``BadSignature`` so callers
    only have one failure type to handle.
    """
    data = _signer().loads(token)
    try:
        return int(data["u"]), str(data["c"])
    except (KeyError, TypeError, ValueError) as exc:
        raise BadSignature("malformed unsubscribe token payload") from exc


# ---------------------------------------------------------------------------
# Category routing
# ---------------------------------------------------------------------------

# Map §15.1 event_kinds to one of the four §15.4 categories. The 'churn'
# category never emails per §15.4 — naming the refusal in settings is
# more honest than silently omitting the toggle.
_EVENT_TO_CATEGORY: dict[str, str] = {
    "proposal_merged": "personal-direct",
    "proposal_declined": "personal-direct",
    "proposal_opened_on_watched_topic": "structural",
    "pr_opened": "structural",
    "pr_merged": "structural",
    "pr_withdrawn": "structural",
    "pr_commit_added": "churn",
    "pr_review_thread_new": "structural",
    "pr_review_thread_reply": "personal-direct",
    "pr_conflict_with_main": "structural",
    "chat_message_in_participated_thread": "churn",
    "chat_reply_to_my_message": "personal-direct",
    "change_proposed_on_edited_passage": "personal-direct",
    "flag_dropped_on_watched_rfc": "structural",
    "flag_resolved_on_my_flag": "personal-direct",
    "contribute_grant_added": "personal-direct",
    "contribute_grant_revoked": "personal-direct",
    "graduation_complete": "personal-direct",
    "super_draft_graduation_ready": "admin-actionable",
    "claim_opened": "structural",
}


def category_for(event_kind: str, fallback_category: str) -> str:
    """Return the email category for ``event_kind``.

    Unknown event kinds resolve to ``fallback_category``.
    """
    return _EVENT_TO_CATEGORY.get(event_kind, fallback_category)


def _user_wants_email(user_row: Any, category: str) -> bool:
    """Apply the §15.4 per-category toggle. Churn always returns False
    (the toggle is permanently off per the spec). Global opt-out
    (`email_opt_out_all`) suppresses every category.

    ``user_row`` is any mapping-like row exposing the ``email_*`` toggle
    columns and ``role``. Unrecognised categories return False.
    """
    if user_row["email_opt_out_all"]:
        return False
    if category == "churn":
        return False
    if category == "personal-direct":
        return bool(user_row["email_personal_direct"])
    if category == "structural":
        return bool(user_row["email_watched_structural"])
    if category == "admin-actionable":
        # Only owners/admins can receive this category, whatever the toggle.
        if user_row["role"] not in ("owner", "admin"):
            return False
        return bool(user_row["email_admin_actionable"])
    return False


# ---------------------------------------------------------------------------
# Quiet hours (§15.8)
# ---------------------------------------------------------------------------


def _in_quiet_hours(user_row: Any, *, now: datetime | None = None) -> bool:
    """Return True when the user's local time is inside their quiet window.

    Args:
        user_row: mapping-like row with the three
            ``notification_quiet_hours_*`` columns.
        now: optional timezone-aware instant to evaluate instead of the
            current time (lets callers and tests pin the clock).

    Returns:
        False when any of start/end/timezone is missing, when the
        timezone cannot be loaded, or when either time fails to parse.
        The window is half-open: start inclusive, end exclusive.
    """
    start = user_row["notification_quiet_hours_start"]
    end = user_row["notification_quiet_hours_end"]
    tz_name = user_row["notification_quiet_hours_timezone"]
    if not (start and end and tz_name):
        return False
    try:
        from zoneinfo import ZoneInfo

        tz = ZoneInfo(tz_name)
    except Exception:
        # Best-effort: an unknown or unloadable zone means "no quiet hours".
        return False
    start_t = _parse_hhmm(start)
    end_t = _parse_hhmm(end)
    if start_t is None or end_t is None:
        return False
    moment = datetime.now(tz) if now is None else now.astimezone(tz)
    now_local = moment.time()
    if start_t <= end_t:
        return start_t <= now_local < end_t
    # Wraps midnight (e.g. 22:00 → 07:00).
    return now_local >= start_t or now_local < end_t


def _parse_hhmm(text: str) -> time | None:
    """Parse ``"HH:MM"`` into a ``time``; return None if it is malformed.

    Non-string input and out-of-range values also yield None.
    """
    try:
        hh, mm = text.split(":", 1)
        return time(int(hh), int(mm))
    except (ValueError, AttributeError):
        return None


# ---------------------------------------------------------------------------
# The dispatch entry point — called from notify._schedule_email
# ---------------------------------------------------------------------------


def maybe_send(
    *,
    notif_id: int,
    recipient_user_id: int,
    event_kind: str,
    category: str,
    payload: dict,
) -> None:
    """Apply §15.4's per-category opt-in and §15.8's quiet-hours hold,
    then send. The notification row still landed in `notifications`
    regardless; this only governs the out-of-band reach.

    Args:
        notif_id: id of the notification row to stamp once sent.
        recipient_user_id: id of the user to email.
        event_kind: event kind, mapped to a category via ``category_for``.
        category: fallback category for event kinds missing from the map.
        payload: fields used to render the subject and body.
    """
    user = db.conn().execute(
        """
        SELECT id, email, display_name, role,
               email_personal_direct, email_watched_structural,
               email_admin_actionable, email_opt_out_all,
               notification_quiet_hours_start,
               notification_quiet_hours_end,
               notification_quiet_hours_timezone
          FROM users WHERE id = ?
        """,
        (recipient_user_id,),
    ).fetchone()
    if user is None or not user["email"]:
        return
    effective_category = category_for(event_kind, category)
    if not _user_wants_email(user, effective_category):
        return
    if _in_quiet_hours(user):
        # §15.8: hold during the window. The row's email_sent_at stays
        # null; the digest's exclusion rule 1 won't kick in yet. The
        # `flush_pending` pass at window end picks it up.
        return
    _send_one(user, notif_id, payload, effective_category)


def _send_one(user: Any, notif_id: int, payload: dict, category: str) -> bool:
    """Render and deliver one notification email, then stamp the row.

    Returns:
        True when the message was delivered and ``email_sent_at`` was
        set; False when email is disabled or delivery failed (the row is
        left untouched so a later flush can retry).
    """
    cfg = EmailConfig.from_env()
    if not cfg.enabled:
        return False
    subject = _subject(payload)
    body = _body(payload, user["id"], category, cfg)
    if not _deliver(cfg, user["email"], subject, body):
        return False
    db.conn().execute(
        "UPDATE notifications SET email_sent_at = datetime('now') WHERE id = ?",
        (notif_id,),
    )
    return True


def _subject(payload: dict) -> str:
    """Build the subject line from the payload's summary and RFC title."""
    rfc_title = payload.get("rfc_title") or payload.get("rfc_slug") or ""
    summary = payload.get("summary") or payload.get("event_kind") or ""
    if rfc_title:
        return f"[Wiggleverse] {summary} — {rfc_title}".strip()
    return f"[Wiggleverse] {summary}".strip()


def _body(payload: dict, user_id: int, category: str, cfg: EmailConfig) -> str:
    """Render the plain-text body: summary, attribution, deep link and
    the unsubscribe / manage-preferences footer."""
    summary = payload.get("summary") or ""
    actor = payload.get("actor_display") or "the app"
    rfc_title = payload.get("rfc_title") or payload.get("rfc_slug") or "this RFC"
    when = payload.get("created_at") or ""
    link = _deep_link(payload, cfg)
    unsub = make_unsubscribe_url(user_id, category)
    manage = f"{cfg.app_url}/settings/notifications"
    return (
        f"{summary}\n\n"
        f"by {actor} on {rfc_title} · {when}\n\n"
        f"{link}\n\n"
        f"---\n"
        f"Unsubscribe from this category: {unsub}\n"
        f"Manage all preferences: {manage}\n"
    )


def _deep_link(payload: dict, cfg: EmailConfig) -> str:
    """Return the most specific app URL the payload allows.

    Preference order: PR page, branch view, RFC page, app root.
    """
    slug = payload.get("rfc_slug")
    pr = payload.get("pr_number")
    branch = payload.get("branch_name")
    if slug and pr:
        return f"{cfg.app_url}/rfc/{slug}/pr/{pr}"
    if slug and branch:
        # Branch names may contain '/', '&', '#', '+' or spaces; encode
        # them so the query string survives intact.
        return f"{cfg.app_url}/rfc/{slug}?{urlencode({'branch': branch})}"
    if slug:
        return f"{cfg.app_url}/rfc/{slug}"
    return cfg.app_url


def _deliver(cfg: EmailConfig, to_address: str, subject: str, body: str) -> bool:
    """Record the envelope and hand the message to SMTP.

    The envelope is appended to ``_SENT`` before any send attempt. With
    no ``smtp_host`` configured the message is only logged and the call
    reports success.

    Returns:
        True if the message was logged (fallback) or accepted by the SMTP
        server; False if building or sending it raised.
    """
    envelope = {
        "to": to_address,
        "from": formataddr((cfg.from_name, cfg.from_address)),
        "subject": subject,
        "body": body,
    }
    _SENT.append(envelope)
    if not cfg.smtp_host:
        log.info("email (stdout fallback): to=%s subject=%s", to_address, subject)
        return True
    try:
        msg = EmailMessage()
        msg["From"] = envelope["from"]
        msg["To"] = to_address
        msg["Subject"] = subject
        msg.set_content(body)
        smtp = smtplib.SMTP(cfg.smtp_host, cfg.smtp_port, timeout=30)
        try:
            if cfg.smtp_starttls:
                smtp.starttls()
            if cfg.smtp_user:
                smtp.login(cfg.smtp_user, cfg.smtp_password)
            smtp.send_message(msg)
        finally:
            # A failing QUIT must neither mask an earlier error nor turn
            # an already-accepted message into a reported failure (which
            # would cause a duplicate send on the next flush).
            try:
                smtp.quit()
            except (smtplib.SMTPException, OSError):
                smtp.close()
        return True
    except Exception:
        log.exception("email send failed: to=%s subject=%s", to_address, subject)
        return False


# ---------------------------------------------------------------------------
# Quiet-hours release pass — called from the digest job
# ---------------------------------------------------------------------------


def flush_pending() -> int:
    """§15.8 release-from-hold. For each user whose quiet-hours window
    has ended (or who has no quiet hours configured), find notifications
    whose `email_sent_at IS NULL` and whose category is enabled, and
    send them. Bundle into a single "Activity while you were away" email
    once the emailable rows reach the bundle threshold.

    Returns:
        The number of notifications actually emailed (bundled rows count
        individually; failed deliveries are not counted).
    """
    cfg = EmailConfig.from_env()
    if not cfg.enabled:
        return 0
    users = db.conn().execute(
        """
        SELECT id, email, display_name, role,
               email_personal_direct, email_watched_structural,
               email_admin_actionable, email_opt_out_all,
               notification_quiet_hours_start,
               notification_quiet_hours_end,
               notification_quiet_hours_timezone
          FROM users
        """
    ).fetchall()
    sent_count = 0
    for user in users:
        if not user["email"]:
            continue
        if _in_quiet_hours(user):
            continue
        rows = db.conn().execute(
            """
            SELECT n.id, n.event_kind, n.rfc_slug, n.branch_name, n.pr_number,
                   n.created_at, n.payload,
                   u.display_name AS actor_display,
                   r.title AS rfc_title
              FROM notifications n
              LEFT JOIN users u ON u.id = n.actor_user_id
              LEFT JOIN cached_rfcs r ON r.slug = n.rfc_slug
             WHERE n.recipient_user_id = ?
               AND n.email_sent_at IS NULL
               AND n.read_at IS NULL
             ORDER BY n.id ASC
            """,
            (user["id"],),
        ).fetchall()
        emailable = []
        for r in rows:
            try:
                extras = json.loads(r["payload"] or "{}")
            except json.JSONDecodeError:
                extras = {}
            if not isinstance(extras, dict):
                # Valid JSON that is not an object (null, list, scalar)
                # carries no usable extras; don't let it abort the pass.
                extras = {}
            cat = category_for(r["event_kind"], extras.get("category", "structural"))
            if not _user_wants_email(user, cat):
                continue
            emailable.append((r, cat, extras))
        if not emailable:
            continue
        # NOTE(review): the prose says "more than" the threshold but the
        # comparison is >=; kept as-is — confirm which is intended.
        if len(emailable) >= cfg.bundle_threshold:
            sent_count += _send_bundle(cfg, user, emailable)
        else:
            for r, cat, extras in emailable:
                payload = _row_to_payload(r, extras)
                # Count only deliveries that actually went out.
                if _send_one(user, r["id"], payload, cat):
                    sent_count += 1
    return sent_count


def _row_to_payload(row: Any, extras: dict) -> dict:
    """Turn a notification row plus its decoded extras into a render payload.

    Keys in ``extras`` are spread last, so they override the row-derived
    values on a name clash.
    """
    return {
        "event_kind": row["event_kind"],
        "rfc_slug": row["rfc_slug"],
        "rfc_title": row["rfc_title"],
        "branch_name": row["branch_name"],
        "pr_number": row["pr_number"],
        "actor_display": row["actor_display"],
        "created_at": row["created_at"],
        "summary": _summary_for(
            row["event_kind"], row["actor_display"], row["rfc_title"], extras
        ),
        **extras,
    }


def _summary_for(
    event_kind: str,
    actor_display: str | None,
    rfc_title: str | None,
    extras: dict,
) -> str:
    """Render the one-line summary by delegating to ``notify.render_summary``."""
    # Imported lazily to avoid an import cycle with notify.py.
    from .notify import render_summary

    return render_summary(event_kind, actor_display, rfc_title, extras)


def _send_bundle(cfg: EmailConfig, user: Any, emailable: list) -> int:
    """One "Activity while you were away" email per §15.4. Subject names
    the count, body lists summaries grouped by RFC.

    Args:
        cfg: active email configuration.
        user: recipient row (needs ``email``).
        emailable: ``(row, category, extras)`` tuples to include.

    Returns:
        The number of notifications covered, or 0 if delivery failed (in
        which case no row is stamped).
    """
    count = len(emailable)
    subject = f"[Wiggleverse] Activity while you were away — {count} events"

    def slug_key(item: tuple) -> str:
        # One key for both sorting and grouping, so rows with a NULL and
        # an empty slug land in the same contiguous group.
        return item[0]["rfc_slug"] or ""

    sections: list[str] = []
    for slug, group in groupby(sorted(emailable, key=slug_key), key=slug_key):
        group_rows = list(group)
        title = group_rows[0][0]["rfc_title"] or slug or "(no RFC)"
        sections.append(f"\n{title}")
        for r, _cat, extras in group_rows:
            summary = _summary_for(
                r["event_kind"], r["actor_display"], r["rfc_title"], extras
            )
            sections.append(f" · {summary}")
    body = (
        "Activity on RFCs you watch, accumulated during your quiet hours:\n"
        + "\n".join(sections)
        + f"\n\nOpen your inbox: {cfg.app_url}/inbox\n"
        + f"Manage all preferences: {cfg.app_url}/settings/notifications\n"
    )
    sent = _deliver(cfg, user["email"], subject, body)
    if not sent:
        return 0
    ids = [r["id"] for r, _, _ in emailable]
    # Only "?" placeholders are interpolated; the ids are bound as parameters.
    placeholders = ",".join("?" * len(ids))
    db.conn().execute(
        f"UPDATE notifications SET email_sent_at = datetime('now') WHERE id IN ({placeholders})",
        ids,
    )
    return count