"""End-to-end integration tests for the Slice 6 vertical (§15 in full). The fan-out chokepoint in `notify.py` is the chief structural commitment of the slice. These tests prove: * §15.1 routing: every action_kind that maps to a §15 signal lands a `notifications` row of the right event_kind and category. * §15.6 auto-watch: every write that names an rfc_slug upserts a watches row for the actor (substantive-gesture rule). * §15.2 inbox listing + filter chips: unread, rfc_slug, category, and actor_user_id filters compose AND-wise. * §15.7 reconciler: advancing branch_chat_seen or pr_seen marks matching unread notifications read. * §15.8 per-user mute: the mute suppresses inbox rows from the muted actor; per-RFC mute suppresses every row for the slug. * §15.8 quiet hours: notifications still land; email is held. * §15.5 digest: cadence window roll-over emits a digest; a second run during the same window emits nothing. * §15.4 email-bounce webhook: sets the global opt-out and short-circuits future email dispatch. * §15.4 unsubscribe signed-URL: GET /api/email/unsubscribe?t=… flips one category off. * §15.3 SSE snapshot: opening the stream yields the current unread_count. """ from __future__ import annotations import json as _json import pytest from test_propose_vertical import ( # noqa: F401 FakeGitea, app_with_fake_gitea, provision_user_row, sign_in_as, tmp_env, ) from test_super_draft_vertical import seed_super_draft # noqa: F401 PITCH = "Open Human Model is a framework for representing humans." 
# ---------------------------------------------------------------------------
# Fan-out: producer-side rules per §15.1
# ---------------------------------------------------------------------------


def test_propose_rfc_auto_watches_the_proposer(app_with_fake_gitea):
    """Proposing an RFC leaves an auto-set 'watching' row for the proposer."""
    from fastapi.testclient import TestClient
    from app import db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        client.post("/api/rfcs/propose", json={
            "title": "Open Human Model", "slug": "ohm", "pitch": PITCH, "tags": [],
        })
        # Auto-watch lands per §15.6 substantive gesture.
        row = db.conn().execute(
            "SELECT state, set_by FROM watches WHERE user_id = ? AND rfc_slug = ?",
            (2, "ohm"),
        ).fetchone()
        assert row is not None
        assert row["state"] == "watching"
        assert row["set_by"] == "auto"


def test_proposal_merged_lands_personal_direct_for_proposer(app_with_fake_gitea):
    """Merging a proposal gives the proposer a personal-direct `proposal_merged` row."""
    from fastapi.testclient import TestClient
    from app import db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice",
                   role="contributor", email="alice@test")
        r = client.post("/api/rfcs/propose", json={
            "title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": [],
        })
        pr_number = r.json()["pr_number"]

        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben",
                   role="owner", email="ben@test")
        r = client.post(f"/api/proposals/{pr_number}/merge")
        assert r.status_code == 200, r.text

        # Alice (proposer) gets a personal-direct proposal_merged row.
        rows = db.conn().execute(
            """
            SELECT event_kind, payload FROM notifications
            WHERE recipient_user_id = ?
            ORDER BY id
            """,
            (2,),
        ).fetchall()
        kinds = [row["event_kind"] for row in rows]
        assert "proposal_merged" in kinds
        # Category metadata round-trips on the payload.
        merged_row = next(row for row in rows if row["event_kind"] == "proposal_merged")
        assert _json.loads(merged_row["payload"])["category"] == "personal-direct"


def test_proposal_declined_routes_to_proposer_only(app_with_fake_gitea):
    """A declined proposal notifies the proposer and nobody else."""
    from fastapi.testclient import TestClient
    from app import db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/propose", json={
            "title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": [],
        })
        pr_number = r.json()["pr_number"]

        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
        r = client.post(f"/api/proposals/{pr_number}/decline",
                        json={"comment": "Not aligned with this quarter's focus"})
        assert r.status_code == 200, r.text

        rows = db.conn().execute(
            "SELECT recipient_user_id, event_kind FROM notifications WHERE event_kind = 'proposal_declined'"
        ).fetchall()
        recipients = {row["recipient_user_id"] for row in rows}
        # Alice (id=2) receives it; Ben (the actor) does not.
        assert recipients == {2}


# ---------------------------------------------------------------------------
# Inbox surface
# ---------------------------------------------------------------------------


def test_inbox_lists_rows_with_filter_chips(app_with_fake_gitea):
    """The inbox lists the proposer's rows and honours category / rfc_slug filters."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/propose", json={"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": []})
        pr_number = r.json()["pr_number"]
        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
        client.post(f"/api/proposals/{pr_number}/merge")

        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.get("/api/notifications")
        assert r.status_code == 200, r.text
        items = r.json()["items"]
        assert any(i["event_kind"] == "proposal_merged" for i in items)
        assert r.json()["unread_count"] >= 1

        # Filter by category isolates personal-direct.
        # NOTE(review): all() passes vacuously on an empty list, so these two
        # filter checks only prove that nothing *outside* the filter leaks in.
        r = client.get("/api/notifications", params={"category": "personal-direct"})
        items = r.json()["items"]
        assert all(i["category"] == "personal-direct" for i in items)

        # Filter by rfc_slug narrows further.
        r = client.get("/api/notifications", params={"rfc_slug": "ohm"})
        items = r.json()["items"]
        assert all(i["rfc_slug"] == "ohm" for i in items)


def test_mark_read_per_row_and_by_filter(app_with_fake_gitea):
    """A single row can be marked read, and the bulk mark-read-by-filter call succeeds."""
    from fastapi.testclient import TestClient
    from app import db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/propose", json={"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": []})
        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
        client.post(f"/api/proposals/{r.json()['pr_number']}/merge")

        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        items = client.get("/api/notifications").json()["items"]
        notif_id = items[0]["id"]
        r = client.post(f"/api/notifications/{notif_id}/read")
        assert r.status_code == 200
        row = db.conn().execute(
            "SELECT read_at FROM notifications WHERE id = ?", (notif_id,)
        ).fetchone()
        assert row["read_at"] is not None

        # Mark-all-read by filter — should be idempotent. Only the status
        # code is checked here; a row was already marked read just above.
        r = client.post("/api/notifications/read", json={"rfc_slug": "ohm"})
        assert r.status_code == 200


# ---------------------------------------------------------------------------
# §15.7 reconciler
# ---------------------------------------------------------------------------


def test_chat_seen_advance_marks_chat_notifications_read(app_with_fake_gitea):
    """Per §15.7: when branch_chat_seen advances, unread chat-kind
    notifications scoped to the same (slug, branch) flip to read."""
    from fastapi.testclient import TestClient
    from app import db

    app, fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=3, login="bob", role="contributor")
        seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)

        # Alice cuts the edit branch (auto-watch). Bob then joins the branch
        # chat, which is expected to leave Alice a chat notification.
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        branch = client.post("/api/rfcs/ohm/start-edit-branch", json={}).json()["branch_name"]
        # Alice posts the first message so she's a prior author for Bob's reply.
        view = client.get(f"/api/rfcs/ohm/branches/{branch}").json()
        thread_id = view["main_thread_id"]
        client.post(
            f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
            json={"text": "first thought"},
        )

        sign_in_as(client, user_id=3, gitea_login="bob", display_name="Bob", role="contributor")
        r = client.post(
            f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
            json={"text": "reply from bob"},
        )
        msg_id = r.json()["message_id"]

        # Alice has at least one unread row after Bob's reply. The exact chat
        # event_kind is deliberately not pinned by this assertion.
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        unread = db.conn().execute(
            "SELECT id FROM notifications WHERE recipient_user_id = 2 AND read_at IS NULL"
        ).fetchall()
        assert len(unread) >= 1

        # Alice visits the branch — chat-seen advances → reconciler clears.
        r = client.post(
            f"/api/rfcs/ohm/branches/{branch}/chat-seen",
            json={"last_seen_message_id": msg_id},
        )
        assert r.status_code == 200
        assert r.json()["reconciled"] >= 1
        remaining = db.conn().execute(
            "SELECT id FROM notifications WHERE recipient_user_id = 2 AND branch_name = ? AND read_at IS NULL",
            (branch,),
        ).fetchall()
        assert remaining == []


# ---------------------------------------------------------------------------
# §15.8 mutes
# ---------------------------------------------------------------------------


def test_per_user_mute_suppresses_inbox_rows(app_with_fake_gitea):
    """After Alice mutes Bob, Bob's chat reply produces no inbox row for Alice."""
    from fastapi.testclient import TestClient
    from app import db

    app, fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=3, login="bob", role="contributor")
        seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)

        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        # Alice mutes Bob.
        r = client.post("/api/users/3/notification-mute")
        assert r.status_code == 200

        # Alice cuts the branch and is a prior author.
        branch = client.post("/api/rfcs/ohm/start-edit-branch", json={}).json()["branch_name"]
        view = client.get(f"/api/rfcs/ohm/branches/{branch}").json()
        thread_id = view["main_thread_id"]
        client.post(
            f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
            json={"text": "alice opener"},
        )

        # Bob posts. With the mute in place Alice gets no inbox row.
        sign_in_as(client, user_id=3, gitea_login="bob", display_name="Bob", role="contributor")
        client.post(
            f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
            json={"text": "bob reply"},
        )
        rows = db.conn().execute(
            "SELECT id FROM notifications WHERE recipient_user_id = 2 AND actor_user_id = 3"
        ).fetchall()
        assert rows == []


def test_per_rfc_mute_suppresses_every_signal(app_with_fake_gitea):
    """§15.6: state='muted' on the watches row is the strongest
    leave-me-alone gesture, including for personal-direct events."""
    from fastapi.testclient import TestClient
    from app import db

    app, fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=3, login="bob", role="contributor")
        # Seed the slug so the watch endpoint can resolve it.
        seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH, proposed_by="alice")

        # Alice mutes the slug through the watch endpoint. 'muted' is never an
        # auto-set state — it's an explicit gesture — and the endpoint accepts it.
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/ohm/watch", json={"state": "muted"})
        assert r.status_code == 200, r.text

        # Bob (a different user) cuts an edit branch — would normally
        # auto-watch and produce a structural beat to other watchers.
        # The mute suppresses Alice's row.
        sign_in_as(client, user_id=3, gitea_login="bob", display_name="Bob", role="contributor")
        # Alice's watch state for the slug is the explicit 'muted' set above,
        # so no notification row may land for her regardless.
        client.post("/api/rfcs/ohm/start-edit-branch", json={})
        rows = db.conn().execute(
            "SELECT event_kind FROM notifications WHERE recipient_user_id = 2 AND rfc_slug = 'ohm'"
        ).fetchall()
        assert rows == []


def test_admin_cannot_mute_users(app_with_fake_gitea):
    """An owner-role user is refused (403) when trying to mute another user."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=1, login="ben", role="owner")
        provision_user_row(user_id=3, login="bob", role="contributor")
        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
        r = client.post("/api/users/3/notification-mute")
        assert r.status_code == 403


# ---------------------------------------------------------------------------
# §15.8 quiet hours + §15.4 email
# ---------------------------------------------------------------------------


def test_quiet_hours_holds_email_but_inbox_lands(app_with_fake_gitea, monkeypatch):
    """During quiet hours the inbox row still lands but no email goes out."""
    from fastapi.testclient import TestClient
    from app import db, email as email_mod

    app, _fake = app_with_fake_gitea
    monkeypatch.setenv("APP_URL", "http://localhost:8000")
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=1, login="ben", role="owner")
        # Quiet hours spanning 00:00 → 23:59 UTC so the merge below falls
        # inside the window.
        # NOTE(review): if the end bound is exclusive, the final minute of the
        # day lies outside the window — confirm this cannot flake.
        db.conn().execute(
            """
            UPDATE users
            SET notification_quiet_hours_start = '00:00',
                notification_quiet_hours_end = '23:59',
                notification_quiet_hours_timezone = 'UTC',
                email = 'alice@test'
            WHERE id = 2
            """
        )
        email_mod.reset_sent_envelopes()

        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/propose", json={"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": []})
        sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
        client.post(f"/api/proposals/{r.json()['pr_number']}/merge")

        # Inbox row landed.
        rows = db.conn().execute(
            "SELECT id, email_sent_at FROM notifications WHERE recipient_user_id = 2 AND event_kind = 'proposal_merged'"
        ).fetchall()
        assert len(rows) >= 1
        # Email held (email_sent_at is NULL; no envelope in the buffer).
        assert rows[0]["email_sent_at"] is None
        sent_to_alice = [e for e in email_mod.sent_envelopes() if e["to"] == "alice@test"]
        assert sent_to_alice == []


def test_email_bounce_webhook_sets_global_opt_out(app_with_fake_gitea):
    """A hard-bounce webhook for a known address sets `email_opt_out_all`."""
    from fastapi.testclient import TestClient
    from app import db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        db.conn().execute("UPDATE users SET email = 'alice@test' WHERE id = 2")
        r = client.post("/api/webhooks/email-bounce", json={"email": "alice@test", "kind": "hard"})
        assert r.status_code == 200
        assert r.json()["matched"] is True
        row = db.conn().execute("SELECT email_opt_out_all FROM users WHERE id = 2").fetchone()
        assert row["email_opt_out_all"] == 1


def test_email_unsubscribe_signed_url_flips_category_off(app_with_fake_gitea):
    """Following a signed unsubscribe URL turns off that one email category."""
    from fastapi.testclient import TestClient
    from app import email as email_mod, db

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        # Only the signed token (everything after "t=") is needed to call the
        # endpoint through the test client.
        token = email_mod.make_unsubscribe_url(2, "personal-direct").split("t=", 1)[1]
        r = client.get(f"/api/email/unsubscribe?t={token}")
        assert r.status_code == 200
        row = db.conn().execute("SELECT email_personal_direct FROM users WHERE id = 2").fetchone()
        assert row["email_personal_direct"] == 0


# ---------------------------------------------------------------------------
# §15.5 digest
# ---------------------------------------------------------------------------


def test_digest_emits_then_skips_already_included(app_with_fake_gitea, monkeypatch):
    """Two consecutive `run_tick` passes: the first emits a digest with
    the eligible rows; the second runs but emits nothing because the
    cadence window has not yet rolled over (the just-recorded
    `notification_digests` row's `period_end` is now)."""
    from fastapi.testclient import TestClient
    from app import db, digest as digest_mod, email as email_mod

    app, fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        provision_user_row(user_id=3, login="bob", role="contributor")
        db.conn().execute(
            "UPDATE users SET email = 'alice@test', digest_cadence = 'daily' WHERE id = 2"
        )
        seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)
        db.conn().execute(
            """
            INSERT INTO watches (user_id, rfc_slug, state, set_by, set_at, last_participation_at)
            VALUES (2, 'ohm', 'following', 'explicit', datetime('now', '-1 day'), datetime('now', '-1 day'))
            """
        )
        # An eligible churn row from an hour ago.
        db.conn().execute(
            """
            INSERT INTO notifications (recipient_user_id, event_kind, rfc_slug, actor_user_id, payload, created_at)
            VALUES (2, 'pr_commit_added', 'ohm', 3, ?, datetime('now', '-1 hour'))
            """,
            (_json.dumps({"category": "churn"}),),
        )
        # Seed a prior digest emission that's >24h ago so the daily
        # cadence has rolled over and the first tick fires.
        db.conn().execute(
            """
            INSERT INTO notification_digests (recipient_user_id, sent_at, period_start, period_end, signal_ids_included)
            VALUES (2, datetime('now', '-2 days'), datetime('now', '-3 days'), datetime('now', '-2 days'), '[]')
            """
        )
        email_mod.reset_sent_envelopes()

        # First tick: digest emitted.
        result = digest_mod.run_tick()
        assert result["digests_sent"] == 1
        envelopes = [e for e in email_mod.sent_envelopes() if "digest" in e["subject"].lower()]
        assert len(envelopes) == 1
        # Verify digest_included_at landed for the row that was in the
        # body — the audit field stays queryable per §15.5.
        included = db.conn().execute(
            "SELECT id FROM notifications WHERE recipient_user_id = 2 AND digest_included_at IS NOT NULL"
        ).fetchall()
        assert len(included) >= 1

        # Second tick fires immediately. The cadence window has not
        # rolled over (period_end on the new digest row is now), so
        # nothing is emitted.
        result2 = digest_mod.run_tick()
        assert result2["digests_sent"] == 0


# ---------------------------------------------------------------------------
# §15.3 SSE snapshot
# ---------------------------------------------------------------------------


def test_notify_subscriber_receives_broadcast(app_with_fake_gitea):
    """The per-user SSE subscriber registry in `notify.subscribe` is the
    substrate behind `/api/notifications/stream`. Driving it directly
    verifies that an inbox-row insert pushes onto every subscriber's
    queue, which is what backs the live badge counter and the toast
    surface per §15.3.

    The HTTP-level stream test is intentionally out-of-scope here:
    TestClient buffers chunked responses and so cannot observe an SSE
    handler that yields once and then waits — the production path uses a
    real ASGI server with chunk flushing.
    """
    import asyncio as _asyncio
    from app import db, notify

    app, _fake = app_with_fake_gitea
    from fastapi.testclient import TestClient

    with TestClient(app):
        provision_user_row(user_id=2, login="alice", role="contributor")

        async def _drive():
            sub = await notify.subscribe(2)
            # Insert a notification row directly with SQL, then hand it to
            # the broadcaster ourselves so the subscriber's queue receives
            # the event on this loop.
            db.conn().execute(
                """
                INSERT INTO notifications (recipient_user_id, event_kind, rfc_slug, payload)
                VALUES (2, 'proposal_merged', 'ohm', '{"category":"personal-direct"}')
                """
            )
            nid = db.conn().execute("SELECT last_insert_rowid() AS id").fetchone()["id"]
            await notify._broadcast(2, "notification", notify._row_payload(nid))
            # Bounded wait so a missing broadcast fails fast instead of hanging.
            evt = await _asyncio.wait_for(sub.queue.get(), timeout=2.0)
            await notify.unsubscribe(sub)
            return evt

        # Use a private loop and always close it, so the test does not leak
        # an open event loop (and its selector) into the rest of the session.
        loop = _asyncio.new_event_loop()
        try:
            evt = loop.run_until_complete(_drive())
        finally:
            loop.close()
        assert evt["event"] == "notification"
        assert evt["payload"]["event_kind"] == "proposal_merged"


# ---------------------------------------------------------------------------
# Preferences
# ---------------------------------------------------------------------------


def test_notification_preferences_round_trip(app_with_fake_gitea):
    """Preference defaults are readable and a partial update is persisted."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.get("/api/users/me/notification-preferences")
        assert r.status_code == 200
        assert r.json()["email_personal_direct"] is True
        assert r.json()["email_watched_churn"] is False

        r = client.post(
            "/api/users/me/notification-preferences",
            json={"email_watched_structural": True, "digest_cadence": "weekly"},
        )
        assert r.status_code == 200
        r = client.get("/api/users/me/notification-preferences")
        assert r.json()["email_watched_structural"] is True
        assert r.json()["digest_cadence"] == "weekly"


def test_quiet_hours_endpoint_round_trip(app_with_fake_gitea):
    """Quiet hours reject a partial trio, accept a full one, and clear on all-null."""
    from fastapi.testclient import TestClient

    app, _fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        # Setting a partial trio is rejected per §15.8.
        r = client.post("/api/users/me/quiet-hours", json={"start": "22:00"})
        assert r.status_code == 422

        # Full trio sets cleanly.
        r = client.post(
            "/api/users/me/quiet-hours",
            json={"start": "22:00", "end": "07:00", "timezone": "America/Los_Angeles"},
        )
        assert r.status_code == 200, r.text
        r = client.get("/api/users/me/quiet-hours")
        assert r.json()["start"] == "22:00"
        assert r.json()["timezone"] == "America/Los_Angeles"

        # All-null clears.
        r = client.post(
            "/api/users/me/quiet-hours",
            json={"start": None, "end": None, "timezone": None},
        )
        assert r.status_code == 200
        assert client.get("/api/users/me/quiet-hours").json()["start"] is None


# ---------------------------------------------------------------------------
# Watches surface
# ---------------------------------------------------------------------------


def test_explicit_watch_set_overrides_auto(app_with_fake_gitea):
    """An explicit watch state survives a later auto-watch gesture unchanged."""
    from fastapi.testclient import TestClient
    from app import db

    app, fake = app_with_fake_gitea
    with TestClient(app) as client:
        provision_user_row(user_id=2, login="alice", role="contributor")
        seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)
        sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
        r = client.post("/api/rfcs/ohm/watch", json={"state": "following"})
        assert r.status_code == 200
        row = db.conn().execute(
            "SELECT state, set_by FROM watches WHERE user_id = 2 AND rfc_slug = 'ohm'"
        ).fetchone()
        assert row["state"] == "following"
        assert row["set_by"] == "explicit"

        # The auto-watch upsert from a subsequent gesture must not
        # downgrade the explicit setting. Trigger a substantive
        # gesture (cut an edit branch).
        client.post("/api/rfcs/ohm/start-edit-branch", json={})
        row = db.conn().execute(
            "SELECT state, set_by FROM watches WHERE user_id = 2 AND rfc_slug = 'ohm'"
        ).fetchone()
        # Following → watching is the *one* auto-upgrade in §15.6, but
        # only for set_by='auto' rows; explicit rows must stay where
        # the user put them.
        assert row["set_by"] == "explicit"
        assert row["state"] == "following"