"""Gitea OAuth and user provisioning. OAuth identity is the basis for the app's user account per §18; the §6 authorization layer is built on top by reading from the users table. On first sign-in we insert a row with role='contributor' (or 'owner' if the gitea_login matches the configured OWNER_GITEA_LOGIN — bootstrapping for owner zero per §6.1). On subsequent sign-ins we refresh the display name and avatar from Gitea so a rename in Gitea propagates here. """ from __future__ import annotations import secrets from dataclasses import dataclass from typing import Any import httpx from fastapi import HTTPException, Request from . import db from .bot import Actor from .config import Config @dataclass class SessionUser: user_id: int gitea_id: int gitea_login: str display_name: str email: str avatar_url: str role: str def as_actor(self) -> Actor: return Actor( user_id=self.user_id, gitea_login=self.gitea_login, display_name=self.display_name, email=self.email, ) def authorization_url(config: Config, state: str) -> str: return ( f"{config.gitea_url}/login/oauth/authorize" f"?client_id={config.oauth_client_id}" f"&redirect_uri={config.redirect_uri}" f"&response_type=code" f"&state={state}" ) async def exchange_code(config: Config, code: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( f"{config.gitea_url}/login/oauth/access_token", json={ "client_id": config.oauth_client_id, "client_secret": config.oauth_client_secret, "code": code, "grant_type": "authorization_code", "redirect_uri": config.redirect_uri, }, headers={"Accept": "application/json"}, ) resp.raise_for_status() return resp.json() async def fetch_user_profile(config: Config, access_token: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.get( f"{config.gitea_url}/api/v1/user", headers={"Authorization": f"token {access_token}"}, ) resp.raise_for_status() return resp.json() def provision_user(config: Config, profile: dict[str, Any]) -> SessionUser: """Insert or update the users row for this Gitea profile. Owner zero (§6.1) is whoever's gitea_login matches OWNER_GITEA_LOGIN. The owner role is granted on first sign-in and never revoked from a later config change — once owner, always owner until an explicit role transition (which lives in §6.1 and isn't part of slice 1). """ gitea_id = profile["id"] login = profile["login"] display = profile.get("full_name") or login email = profile.get("email") or "" avatar = profile.get("avatar_url") or "" c = db.conn() existing = c.execute("SELECT * FROM users WHERE gitea_id = ?", (gitea_id,)).fetchone() if existing is None: role = "owner" if config.owner_gitea_login and login == config.owner_gitea_login else "contributor" cur = c.execute( """ INSERT INTO users (gitea_id, gitea_login, email, display_name, avatar_url, role) VALUES (?, ?, ?, ?, ?, ?) """, (gitea_id, login, email, display, avatar, role), ) user_id = cur.lastrowid else: user_id = existing["id"] role = existing["role"] c.execute( """ UPDATE users SET gitea_login = ?, email = ?, display_name = ?, avatar_url = ?, last_seen_at = datetime('now') WHERE id = ? """, (login, email, display, avatar, user_id), ) return SessionUser( user_id=user_id, gitea_id=gitea_id, gitea_login=login, display_name=display, email=email, avatar_url=avatar, role=role, ) # ----- Session helpers ----- SESSION_USER_KEY = "user" SESSION_STATE_KEY = "oauth_state" def store_session(request: Request, user: SessionUser) -> None: request.session[SESSION_USER_KEY] = { "user_id": user.user_id, "gitea_id": user.gitea_id, "gitea_login": user.gitea_login, "display_name": user.display_name, "email": user.email, "avatar_url": user.avatar_url, "role": user.role, } def current_user(request: Request) -> SessionUser | None: raw = request.session.get(SESSION_USER_KEY) if not raw: return None # Re-read the role from the database every request so role changes # take effect on the next API call without forcing a logout. row = db.conn().execute( "SELECT id, gitea_id, gitea_login, email, display_name, avatar_url, role FROM users WHERE id = ?", (raw["user_id"],), ).fetchone() if row is None: return None return SessionUser( user_id=row["id"], gitea_id=row["gitea_id"], gitea_login=row["gitea_login"], display_name=row["display_name"], email=row["email"] or "", avatar_url=row["avatar_url"] or "", role=row["role"], ) def require_user(request: Request) -> SessionUser: user = current_user(request) if user is None: raise HTTPException(status_code=401, detail="Not authenticated") return user def require_contributor(request: Request) -> SessionUser: """§6.1: authenticated, not write-muted.""" user = require_user(request) row = db.conn().execute("SELECT muted FROM users WHERE id = ?", (user.user_id,)).fetchone() if row and row["muted"]: raise HTTPException(status_code=403, detail="Your account is muted") return user def require_admin(request: Request) -> SessionUser: """§6.1: owner or admin.""" user = require_user(request) if user.role not in ("owner", "admin"): raise HTTPException(status_code=403, detail="Admin or owner role required") return user def new_state() -> str: return secrets.token_urlsafe(16)