f67d0aa0db
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
615 lines
27 KiB
Python
615 lines
27 KiB
Python
"""End-to-end integration tests for the Slice 6 vertical (§15 in full).
|
|
|
|
The fan-out chokepoint in `notify.py` is the chief structural commitment
|
|
of the slice. These tests prove:
|
|
|
|
* §15.1 routing: every action_kind that maps to a §15 signal lands a
|
|
`notifications` row of the right event_kind and category.
|
|
* §15.6 auto-watch: every write that names an rfc_slug upserts a
|
|
watches row for the actor (substantive-gesture rule).
|
|
* §15.2 inbox listing + filter chips: unread, rfc_slug, category, and
|
|
actor_user_id filters compose AND-wise.
|
|
* §15.7 reconciler: advancing branch_chat_seen or pr_seen marks
|
|
matching unread notifications read.
|
|
* §15.8 per-user mute: the mute suppresses inbox rows from the muted
|
|
actor; per-RFC mute suppresses every row for the slug.
|
|
* §15.8 quiet hours: notifications still land; email is held.
|
|
* §15.5 digest: cadence window roll-over emits a digest; a second
|
|
run during the same window emits nothing.
|
|
* §15.4 email-bounce webhook: sets the global opt-out and
|
|
short-circuits future email dispatch.
|
|
* §15.4 unsubscribe signed-URL: GET /api/email/unsubscribe?t=… flips
|
|
one category off.
|
|
* §15.3 SSE snapshot: opening the stream yields the current
|
|
unread_count.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json as _json
|
|
|
|
import pytest
|
|
|
|
from test_propose_vertical import ( # noqa: F401
|
|
FakeGitea,
|
|
app_with_fake_gitea,
|
|
provision_user_row,
|
|
sign_in_as,
|
|
tmp_env,
|
|
)
|
|
from test_super_draft_vertical import seed_super_draft # noqa: F401
|
|
|
|
|
|
PITCH = "Open Human Model is a framework for representing humans."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fan-out: producer-side rules per §15.1
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_propose_rfc_auto_watches_the_proposer(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
|
|
client.post("/api/rfcs/propose", json={
|
|
"title": "Open Human Model", "slug": "ohm", "pitch": PITCH, "tags": [],
|
|
})
|
|
|
|
# Auto-watch lands per §15.6 substantive gesture.
|
|
row = db.conn().execute(
|
|
"SELECT state, set_by FROM watches WHERE user_id = ? AND rfc_slug = ?",
|
|
(2, "ohm"),
|
|
).fetchone()
|
|
assert row is not None
|
|
assert row["state"] == "watching"
|
|
assert row["set_by"] == "auto"
|
|
|
|
|
|
def test_proposal_merged_lands_personal_direct_for_proposer(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor", )
|
|
provision_user_row(user_id=1, login="ben", role="owner")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor", email="alice@test")
|
|
r = client.post("/api/rfcs/propose", json={
|
|
"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": [],
|
|
})
|
|
pr_number = r.json()["pr_number"]
|
|
|
|
sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner", email="ben@test")
|
|
r = client.post(f"/api/proposals/{pr_number}/merge")
|
|
assert r.status_code == 200, r.text
|
|
|
|
# Alice (proposer) gets a personal-direct proposal_merged row.
|
|
rows = db.conn().execute(
|
|
"""
|
|
SELECT event_kind, payload FROM notifications
|
|
WHERE recipient_user_id = ? ORDER BY id
|
|
""",
|
|
(2,),
|
|
).fetchall()
|
|
kinds = [r["event_kind"] for r in rows]
|
|
assert "proposal_merged" in kinds
|
|
# Category metadata round-trips on the payload.
|
|
merged_row = next(r for r in rows if r["event_kind"] == "proposal_merged")
|
|
assert _json.loads(merged_row["payload"])["category"] == "personal-direct"
|
|
|
|
|
|
def test_proposal_declined_routes_to_proposer_only(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=1, login="ben", role="owner")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.post("/api/rfcs/propose", json={
|
|
"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": [],
|
|
})
|
|
pr_number = r.json()["pr_number"]
|
|
sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
|
|
r = client.post(f"/api/proposals/{pr_number}/decline", json={"comment": "Not aligned with this quarter's focus"})
|
|
assert r.status_code == 200, r.text
|
|
|
|
rows = db.conn().execute(
|
|
"SELECT recipient_user_id, event_kind FROM notifications WHERE event_kind = 'proposal_declined'"
|
|
).fetchall()
|
|
recipients = {r["recipient_user_id"] for r in rows}
|
|
# Alice (id=2) receives it; Ben (the actor) does not.
|
|
assert recipients == {2}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Inbox surface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_inbox_lists_rows_with_filter_chips(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
|
|
app, fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=1, login="ben", role="owner")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.post("/api/rfcs/propose", json={"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": []})
|
|
pr_number = r.json()["pr_number"]
|
|
sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
|
|
client.post(f"/api/proposals/{pr_number}/merge")
|
|
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.get("/api/notifications")
|
|
assert r.status_code == 200, r.text
|
|
items = r.json()["items"]
|
|
assert any(i["event_kind"] == "proposal_merged" for i in items)
|
|
assert r.json()["unread_count"] >= 1
|
|
|
|
# Filter by category isolates personal-direct.
|
|
r = client.get("/api/notifications", params={"category": "personal-direct"})
|
|
items = r.json()["items"]
|
|
assert all(i["category"] == "personal-direct" for i in items)
|
|
|
|
# Filter by rfc_slug narrows further.
|
|
r = client.get("/api/notifications", params={"rfc_slug": "ohm"})
|
|
items = r.json()["items"]
|
|
assert all(i["rfc_slug"] == "ohm" for i in items)
|
|
|
|
|
|
def test_mark_read_per_row_and_by_filter(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=1, login="ben", role="owner")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.post("/api/rfcs/propose", json={"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": []})
|
|
sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
|
|
client.post(f"/api/proposals/{r.json()['pr_number']}/merge")
|
|
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
items = client.get("/api/notifications").json()["items"]
|
|
notif_id = items[0]["id"]
|
|
r = client.post(f"/api/notifications/{notif_id}/read")
|
|
assert r.status_code == 200
|
|
|
|
row = db.conn().execute("SELECT read_at FROM notifications WHERE id = ?", (notif_id,)).fetchone()
|
|
assert row["read_at"] is not None
|
|
|
|
# Mark-all-read by filter — should be idempotent.
|
|
r = client.post("/api/notifications/read", json={"rfc_slug": "ohm"})
|
|
assert r.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# §15.7 reconciler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_chat_seen_advance_marks_chat_notifications_read(app_with_fake_gitea):
|
|
"""Per §15.7: when branch_chat_seen advances, unread chat-kind
|
|
notifications scoped to the same (slug, branch) flip to read."""
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=3, login="bob", role="contributor")
|
|
seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)
|
|
|
|
# Alice cuts the edit branch (auto-watch). Bob joins the branch
|
|
# chat — Alice gets a chat_message_in_participated_thread row.
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
branch = client.post("/api/rfcs/ohm/start-edit-branch", json={}).json()["branch_name"]
|
|
# Alice posts the first message so she's a prior author for Bob's reply.
|
|
view = client.get(f"/api/rfcs/ohm/branches/{branch}").json()
|
|
thread_id = view["main_thread_id"]
|
|
client.post(
|
|
f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
|
|
json={"text": "first thought"},
|
|
)
|
|
sign_in_as(client, user_id=3, gitea_login="bob", display_name="Bob", role="contributor")
|
|
r = client.post(
|
|
f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
|
|
json={"text": "reply from bob"},
|
|
)
|
|
msg_id = r.json()["message_id"]
|
|
|
|
# Alice has an unread chat_reply_to_my_message row.
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
unread = db.conn().execute(
|
|
"SELECT id FROM notifications WHERE recipient_user_id = 2 AND read_at IS NULL"
|
|
).fetchall()
|
|
assert len(unread) >= 1
|
|
|
|
# Alice visits the branch — chat-seen advances → reconciler clears.
|
|
r = client.post(
|
|
f"/api/rfcs/ohm/branches/{branch}/chat-seen",
|
|
json={"last_seen_message_id": msg_id},
|
|
)
|
|
assert r.status_code == 200
|
|
assert r.json()["reconciled"] >= 1
|
|
remaining = db.conn().execute(
|
|
"SELECT id FROM notifications WHERE recipient_user_id = 2 AND branch_name = ? AND read_at IS NULL",
|
|
(branch,),
|
|
).fetchall()
|
|
assert remaining == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# §15.8 mutes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_per_user_mute_suppresses_inbox_rows(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=3, login="bob", role="contributor")
|
|
seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)
|
|
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
# Alice mutes Bob.
|
|
r = client.post("/api/users/3/notification-mute")
|
|
assert r.status_code == 200
|
|
|
|
# Alice cuts the branch and is a prior author.
|
|
branch = client.post("/api/rfcs/ohm/start-edit-branch", json={}).json()["branch_name"]
|
|
view = client.get(f"/api/rfcs/ohm/branches/{branch}").json()
|
|
thread_id = view["main_thread_id"]
|
|
client.post(
|
|
f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
|
|
json={"text": "alice opener"},
|
|
)
|
|
|
|
# Bob posts. With the mute in place Alice gets no inbox row.
|
|
sign_in_as(client, user_id=3, gitea_login="bob", display_name="Bob", role="contributor")
|
|
client.post(
|
|
f"/api/rfcs/ohm/branches/{branch}/threads/{thread_id}/messages",
|
|
json={"text": "bob reply"},
|
|
)
|
|
rows = db.conn().execute(
|
|
"SELECT id FROM notifications WHERE recipient_user_id = 2 AND actor_user_id = 3"
|
|
).fetchall()
|
|
assert rows == []
|
|
|
|
|
|
def test_per_rfc_mute_suppresses_every_signal(app_with_fake_gitea):
|
|
"""§15.6: state='muted' on the watches row is the strongest leave-
|
|
me-alone gesture, including for personal-direct events."""
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=3, login="bob", role="contributor")
|
|
# Seed the slug so the watch endpoint can resolve it.
|
|
seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH, proposed_by="alice")
|
|
|
|
# Alice mutes the slug. We bypass the API because the user-facing
|
|
# surface doesn't expose 'muted' as an auto-set — it's an
|
|
# explicit gesture. The endpoint accepts it.
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.post("/api/rfcs/ohm/watch", json={"state": "muted"})
|
|
assert r.status_code == 200, r.text
|
|
|
|
# Bob (a different user) cuts an edit branch — would normally
|
|
# auto-watch and produce a structural beat to other watchers.
|
|
# The mute suppresses Alice's row.
|
|
sign_in_as(client, user_id=3, gitea_login="bob", display_name="Bob", role="contributor")
|
|
# Add Alice as a watcher first via a chat message (auto-watch
|
|
# already set 'muted' though); ensure no row regardless.
|
|
client.post("/api/rfcs/ohm/start-edit-branch", json={})
|
|
rows = db.conn().execute(
|
|
"SELECT event_kind FROM notifications WHERE recipient_user_id = 2 AND rfc_slug = 'ohm'"
|
|
).fetchall()
|
|
assert rows == []
|
|
|
|
|
|
def test_admin_cannot_mute_users(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=1, login="ben", role="owner")
|
|
provision_user_row(user_id=3, login="bob", role="contributor")
|
|
sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
|
|
r = client.post("/api/users/3/notification-mute")
|
|
assert r.status_code == 403
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# §15.8 quiet hours + §15.4 email
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_quiet_hours_holds_email_but_inbox_lands(app_with_fake_gitea, monkeypatch):
|
|
from fastapi.testclient import TestClient
|
|
from app import db, email as email_mod
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
monkeypatch.setenv("APP_URL", "http://localhost:8000")
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=1, login="ben", role="owner")
|
|
# Quiet hours covering every wall-clock moment — 00:00 → 23:59 UTC.
|
|
db.conn().execute(
|
|
"""
|
|
UPDATE users
|
|
SET notification_quiet_hours_start = '00:00',
|
|
notification_quiet_hours_end = '23:59',
|
|
notification_quiet_hours_timezone = 'UTC',
|
|
email = 'alice@test'
|
|
WHERE id = 2
|
|
"""
|
|
)
|
|
email_mod.reset_sent_envelopes()
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.post("/api/rfcs/propose", json={"title": "OHM", "slug": "ohm", "pitch": PITCH, "tags": []})
|
|
sign_in_as(client, user_id=1, gitea_login="ben", display_name="Ben", role="owner")
|
|
client.post(f"/api/proposals/{r.json()['pr_number']}/merge")
|
|
|
|
# Inbox row landed.
|
|
rows = db.conn().execute(
|
|
"SELECT id, email_sent_at FROM notifications WHERE recipient_user_id = 2 AND event_kind = 'proposal_merged'"
|
|
).fetchall()
|
|
assert len(rows) >= 1
|
|
# Email held (email_sent_at is NULL; no envelope in the buffer).
|
|
assert rows[0]["email_sent_at"] is None
|
|
sent_to_alice = [e for e in email_mod.sent_envelopes() if e["to"] == "alice@test"]
|
|
assert sent_to_alice == []
|
|
|
|
|
|
def test_email_bounce_webhook_sets_global_opt_out(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
db.conn().execute("UPDATE users SET email = 'alice@test' WHERE id = 2")
|
|
r = client.post("/api/webhooks/email-bounce", json={"email": "alice@test", "kind": "hard"})
|
|
assert r.status_code == 200
|
|
assert r.json()["matched"] is True
|
|
row = db.conn().execute("SELECT email_opt_out_all FROM users WHERE id = 2").fetchone()
|
|
assert row["email_opt_out_all"] == 1
|
|
|
|
|
|
def test_email_unsubscribe_signed_url_flips_category_off(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import email as email_mod, db
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
token = email_mod.make_unsubscribe_url(2, "personal-direct").split("t=", 1)[1]
|
|
r = client.get(f"/api/email/unsubscribe?t={token}")
|
|
assert r.status_code == 200
|
|
row = db.conn().execute("SELECT email_personal_direct FROM users WHERE id = 2").fetchone()
|
|
assert row["email_personal_direct"] == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# §15.5 digest
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_digest_emits_then_skips_already_included(app_with_fake_gitea, monkeypatch):
|
|
"""Two consecutive `run_tick` passes: the first emits a digest with
|
|
the eligible rows; the second runs but emits nothing because the
|
|
cadence window has not yet rolled over (the just-recorded
|
|
`notification_digests` row's `period_end` is now)."""
|
|
from fastapi.testclient import TestClient
|
|
from app import db, digest as digest_mod, email as email_mod
|
|
|
|
app, fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
provision_user_row(user_id=3, login="bob", role="contributor")
|
|
db.conn().execute(
|
|
"UPDATE users SET email = 'alice@test', digest_cadence = 'daily' WHERE id = 2"
|
|
)
|
|
seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)
|
|
db.conn().execute(
|
|
"""
|
|
INSERT INTO watches (user_id, rfc_slug, state, set_by, set_at, last_participation_at)
|
|
VALUES (2, 'ohm', 'following', 'explicit', datetime('now', '-1 day'), datetime('now', '-1 day'))
|
|
"""
|
|
)
|
|
# An eligible churn row from an hour ago.
|
|
db.conn().execute(
|
|
"""
|
|
INSERT INTO notifications
|
|
(recipient_user_id, event_kind, rfc_slug, actor_user_id, payload, created_at)
|
|
VALUES (2, 'pr_commit_added', 'ohm', 3, ?, datetime('now', '-1 hour'))
|
|
""",
|
|
(_json.dumps({"category": "churn"}),),
|
|
)
|
|
# Seed a prior digest emission that's >24h ago so the daily
|
|
# cadence has rolled over and the first tick fires.
|
|
db.conn().execute(
|
|
"""
|
|
INSERT INTO notification_digests (recipient_user_id, sent_at, period_start, period_end, signal_ids_included)
|
|
VALUES (2, datetime('now', '-2 days'), datetime('now', '-3 days'), datetime('now', '-2 days'), '[]')
|
|
"""
|
|
)
|
|
email_mod.reset_sent_envelopes()
|
|
|
|
# First tick: digest emitted.
|
|
result = digest_mod.run_tick()
|
|
assert result["digests_sent"] == 1
|
|
envelopes = [e for e in email_mod.sent_envelopes() if "digest" in e["subject"].lower()]
|
|
assert len(envelopes) == 1
|
|
|
|
# Verify digest_included_at landed for the row that was in the
|
|
# body — the audit field stays queryable per §15.5.
|
|
included = db.conn().execute(
|
|
"SELECT id FROM notifications WHERE recipient_user_id = 2 AND digest_included_at IS NOT NULL"
|
|
).fetchall()
|
|
assert len(included) >= 1
|
|
|
|
# Second tick fires immediately. The cadence window has not
|
|
# rolled over (period_end on the new digest row is now), so
|
|
# nothing is emitted.
|
|
result2 = digest_mod.run_tick()
|
|
assert result2["digests_sent"] == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# §15.3 SSE snapshot
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_notify_subscriber_receives_broadcast(app_with_fake_gitea):
|
|
"""The per-user SSE subscriber registry in `notify.subscribe` is
|
|
the substrate behind `/api/notifications/stream`. Driving it
|
|
directly verifies that an inbox-row insert pushes onto every
|
|
subscriber's queue, which is what backs the live badge counter
|
|
and the toast surface per §15.3.
|
|
|
|
The HTTP-level stream test is intentionally out-of-scope here:
|
|
TestClient buffers chunked responses and so cannot observe an
|
|
SSE handler that yields once and then waits — the production
|
|
path uses a real ASGI server with chunk flushing.
|
|
"""
|
|
import asyncio as _asyncio
|
|
from app import db, notify
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
from fastapi.testclient import TestClient
|
|
with TestClient(app):
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
|
|
async def _drive():
|
|
sub = await notify.subscribe(2)
|
|
# Insert a notification row via the chokepoint. The push
|
|
# is scheduled on the running loop.
|
|
db.conn().execute(
|
|
"""
|
|
INSERT INTO notifications (recipient_user_id, event_kind, rfc_slug, payload)
|
|
VALUES (2, 'proposal_merged', 'ohm', '{"category":"personal-direct"}')
|
|
"""
|
|
)
|
|
nid = db.conn().execute("SELECT last_insert_rowid() AS id").fetchone()["id"]
|
|
await notify._broadcast(2, "notification", notify._row_payload(nid))
|
|
evt = await _asyncio.wait_for(sub.queue.get(), timeout=2.0)
|
|
await notify.unsubscribe(sub)
|
|
return evt
|
|
|
|
evt = _asyncio.new_event_loop().run_until_complete(_drive())
|
|
assert evt["event"] == "notification"
|
|
assert evt["payload"]["event_kind"] == "proposal_merged"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Preferences
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_notification_preferences_round_trip(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
|
|
r = client.get("/api/users/me/notification-preferences")
|
|
assert r.status_code == 200
|
|
assert r.json()["email_personal_direct"] is True
|
|
assert r.json()["email_watched_churn"] is False
|
|
|
|
r = client.post(
|
|
"/api/users/me/notification-preferences",
|
|
json={"email_watched_structural": True, "digest_cadence": "weekly"},
|
|
)
|
|
assert r.status_code == 200
|
|
|
|
r = client.get("/api/users/me/notification-preferences")
|
|
assert r.json()["email_watched_structural"] is True
|
|
assert r.json()["digest_cadence"] == "weekly"
|
|
|
|
|
|
def test_quiet_hours_endpoint_round_trip(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
|
|
app, _fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
|
|
# Setting a partial trio is rejected per §15.8.
|
|
r = client.post("/api/users/me/quiet-hours", json={"start": "22:00"})
|
|
assert r.status_code == 422
|
|
|
|
# Full trio sets cleanly.
|
|
r = client.post(
|
|
"/api/users/me/quiet-hours",
|
|
json={"start": "22:00", "end": "07:00", "timezone": "America/Los_Angeles"},
|
|
)
|
|
assert r.status_code == 200, r.text
|
|
|
|
r = client.get("/api/users/me/quiet-hours")
|
|
assert r.json()["start"] == "22:00"
|
|
assert r.json()["timezone"] == "America/Los_Angeles"
|
|
|
|
# All-null clears.
|
|
r = client.post(
|
|
"/api/users/me/quiet-hours",
|
|
json={"start": None, "end": None, "timezone": None},
|
|
)
|
|
assert r.status_code == 200
|
|
assert client.get("/api/users/me/quiet-hours").json()["start"] is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Watches surface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_explicit_watch_set_overrides_auto(app_with_fake_gitea):
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
|
|
app, fake = app_with_fake_gitea
|
|
with TestClient(app) as client:
|
|
provision_user_row(user_id=2, login="alice", role="contributor")
|
|
seed_super_draft(fake, slug="ohm", title="OHM", pitch=PITCH)
|
|
sign_in_as(client, user_id=2, gitea_login="alice", display_name="Alice", role="contributor")
|
|
r = client.post("/api/rfcs/ohm/watch", json={"state": "following"})
|
|
assert r.status_code == 200
|
|
|
|
row = db.conn().execute(
|
|
"SELECT state, set_by FROM watches WHERE user_id = 2 AND rfc_slug = 'ohm'"
|
|
).fetchone()
|
|
assert row["state"] == "following"
|
|
assert row["set_by"] == "explicit"
|
|
|
|
# The auto-watch upsert from a subsequent gesture must not
|
|
# downgrade the explicit setting. Trigger a substantive
|
|
# gesture (cut an edit branch).
|
|
client.post("/api/rfcs/ohm/start-edit-branch", json={})
|
|
row = db.conn().execute(
|
|
"SELECT state, set_by FROM watches WHERE user_id = 2 AND rfc_slug = 'ohm'"
|
|
).fetchone()
|
|
# Following → watching is the *one* auto-upgrade in §15.6, but
|
|
# only for set_by='auto' rows; explicit rows must stay where
|
|
# the user put them.
|
|
assert row["set_by"] == "explicit"
|
|
assert row["state"] == "following"
|