Files
Ben Stull 779ba6db59 Slice 1: scaffolding + propose-to-super-draft vertical
Brings the §1 bot wrapper, the §4 cache (webhook + reconciler), the
§5 schema (six numbered migrations), Gitea OAuth + §6 user
provisioning, the §7 catalog left pane, and the propose-to-merge
vertical: propose modal opens an idea PR against the meta repo, an
owner merges from the pending-idea view, the cache picks it up via
webhook or reconciler sweep, and the catalog renders the new
super-draft.

Per §1 the bot is the only Git writer; every commit, branch
creation, and PR merge carries the §6.5 On-behalf-of: trailer and
an `actions` audit row. Per §4 the cache is never written from a
user action — it's webhook+reconciler only.

Covered by `backend/tests/test_propose_vertical.py` against an
in-process Gitea simulator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:31:11 -07:00

287 lines
9.4 KiB
Python

"""Thin HTTP client for the Gitea REST API.
Read operations live here and are called from any module that needs them
(reconciler, super-draft body fetch, webhook handler). Write operations
also live here but are not called directly from outside this module —
they are wrapped by `bot.py` per §1, so that every commit, branch, and
PR carries the §6.5 On-behalf-of trailer and a row in the actions log.
This split keeps the chokepoint legible: anything that wants to read
imports from here; anything that wants to write imports from `bot.py`
and never reaches around it.
"""
from __future__ import annotations
import base64
from typing import Any
import httpx
from .config import Config
class GiteaError(Exception):
    """Raised when Gitea answers a request with an HTTP status of 400 or above.

    Attributes:
        status: The HTTP status code of the failed response.
        detail: The error text extracted from the response.
    """

    def __init__(self, status: int, detail: str) -> None:
        # The formatted message is what str(exc) and tracebacks show; the raw
        # pieces stay available so callers can branch on `status` (e.g. 404).
        super().__init__(f"Gitea {status}: {detail}")
        self.status = status
        self.detail = detail
class Gitea:
    """Async HTTP client for the Gitea REST API, authenticated with the bot token.

    Read methods may be called from any module. Write methods live here too,
    but are meant to be reached only through ``bot.py`` (per §1) so that every
    commit, branch, and PR carries the §6.5 On-behalf-of trailer and a row in
    the actions log.

    Any response with status >= 400 is raised as ``GiteaError``. The single
    item lookups (``get_repo``, ``get_branch``, ``get_contents``, ``get_pull``)
    turn a 404 into ``None``; ``list_dir`` turns it into ``[]``.
    """

    # Page size requested from list endpoints. Also the "short page" threshold
    # used to detect the last page when no X-Total-Count header is present.
    _PAGE_SIZE = 50

    def __init__(self, config: Config) -> None:
        """Build the underlying client from ``config.gitea_url`` and ``config.gitea_bot_token``."""
        self._config = config
        self._client = httpx.AsyncClient(
            base_url=f"{config.gitea_url}/api/v1",
            headers={
                "Authorization": f"token {config.gitea_bot_token}",
                "Accept": "application/json",
            },
            timeout=30.0,
        )

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    # ----- Internal helpers -----

    async def _request(self, method: str, path: str, **kwargs: Any) -> httpx.Response:
        """Send one request and return the response.

        Raises:
            GiteaError: if the status code is 400 or above. ``detail`` is the
                ``message`` field of a JSON-object body when there is one,
                otherwise the raw response text.
        """
        resp = await self._client.request(method, path, **kwargs)
        if resp.status_code < 400:
            return resp
        try:
            payload = resp.json()
        except ValueError:
            # Body is not JSON (e.g. an HTML error page from a proxy).
            payload = None
        # A JSON body that is not an object (list, string, ...) has no "message".
        detail = payload.get("message", resp.text) if isinstance(payload, dict) else resp.text
        raise GiteaError(resp.status_code, detail)

    async def _get_or_none(self, path: str, **kwargs: Any) -> Any:
        """GET ``path`` and return the decoded JSON, or ``None`` on a 404."""
        try:
            resp = await self._request("GET", path, **kwargs)
        except GiteaError as err:
            if err.status == 404:
                return None
            raise
        return resp.json()

    async def _list_all(self, path: str, params: dict[str, Any] | None = None) -> list[dict]:
        """GET every page of a list endpoint and return the concatenated items.

        Pages are requested with ``page`` / ``limit`` query parameters. The
        loop stops on an empty page; otherwise, when the response carries an
        ``X-Total-Count`` header, it stops once that many items were
        collected, and without the header it stops on the first page shorter
        than the requested limit.
        """
        query: dict[str, Any] = dict(params or {})
        query["limit"] = self._PAGE_SIZE
        collected: list[dict] = []
        page = 1
        while True:
            query["page"] = page
            # Pass a copy so the caller-visible kwargs are not mutated later.
            resp = await self._request("GET", path, params=dict(query))
            batch = resp.json()
            if not batch:
                return collected
            collected.extend(batch)
            total = str(resp.headers.get("X-Total-Count", ""))
            if total.isdigit():
                # Trust the server's count: it stays correct even if the
                # server caps pages below the limit we asked for.
                if len(collected) >= int(total):
                    return collected
            elif len(batch) < self._PAGE_SIZE:
                return collected
            page += 1

    @staticmethod
    def _commit_body(
        *,
        content: str,
        message: str,
        branch: str,
        author_name: str | None,
        author_email: str | None,
        sha: str | None = None,
    ) -> dict[str, Any]:
        """Build the JSON body shared by the create-file and update-file calls.

        ``content`` is UTF-8 encoded and then base64 encoded. ``author`` and
        ``committer`` are set (to the same identity) only when both a name and
        an email are given.
        """
        body: dict[str, Any] = {
            "message": message,
            "content": base64.b64encode(content.encode("utf-8")).decode("ascii"),
            "branch": branch,
        }
        if sha is not None:
            body["sha"] = sha
        if author_name and author_email:
            body["author"] = {"name": author_name, "email": author_email}
            body["committer"] = {"name": author_name, "email": author_email}
        return body

    # ----- Repo lifecycle -----

    async def get_repo(self, owner: str, repo: str) -> dict | None:
        """Return the repository JSON, or ``None`` if it does not exist (404)."""
        return await self._get_or_none(f"/repos/{owner}/{repo}")

    async def create_org_repo(
        self, org: str, name: str, *, description: str = "", private: bool = False
    ) -> dict:
        """Create a repository under ``org`` (with ``auto_init`` off) and return its JSON."""
        resp = await self._request(
            "POST",
            f"/orgs/{org}/repos",
            json={
                "name": name,
                "description": description,
                "private": private,
                "auto_init": False,
            },
        )
        return resp.json()

    async def delete_repo(self, owner: str, repo: str) -> None:
        """Delete the repository; raises ``GiteaError`` on any error status."""
        await self._request("DELETE", f"/repos/{owner}/{repo}")

    # ----- Branches -----

    async def get_branch(self, owner: str, repo: str, branch: str) -> dict | None:
        """Return the branch JSON, or ``None`` if it does not exist (404)."""
        return await self._get_or_none(f"/repos/{owner}/{repo}/branches/{branch}")

    async def list_branches(self, owner: str, repo: str) -> list[dict]:
        """Return all branches of the repository, following pagination."""
        return await self._list_all(f"/repos/{owner}/{repo}/branches")

    async def create_branch(
        self, owner: str, repo: str, new_branch: str, from_branch: str = "main"
    ) -> dict:
        """Create ``new_branch`` from ``from_branch`` and return the branch JSON."""
        resp = await self._request(
            "POST",
            f"/repos/{owner}/{repo}/branches",
            json={"new_branch_name": new_branch, "old_branch_name": from_branch},
        )
        return resp.json()

    async def delete_branch(self, owner: str, repo: str, branch: str) -> None:
        """Delete the branch; raises ``GiteaError`` on any error status."""
        await self._request("DELETE", f"/repos/{owner}/{repo}/branches/{branch}")

    # ----- File contents -----

    async def get_contents(self, owner: str, repo: str, path: str, ref: str = "main") -> dict | None:
        """Return the contents JSON for ``path`` at ``ref``, or ``None`` on a 404.

        NOTE(review): ``path`` is interpolated into the URL unescaped, so a
        name containing ``?`` or ``#`` would presumably be misparsed. Confirm
        whether callers can pass such names.
        """
        return await self._get_or_none(
            f"/repos/{owner}/{repo}/contents/{path}",
            params={"ref": ref},
        )

    async def list_dir(self, owner: str, repo: str, path: str = "", ref: str = "main") -> list[dict]:
        """Return the entries at ``path`` as a list; ``[]`` if the path is missing (404).

        When the endpoint returns a single object instead of a list, it is
        wrapped in a one-element list so callers always get a list.
        """
        try:
            resp = await self._request(
                "GET",
                f"/repos/{owner}/{repo}/contents/{path}",
                params={"ref": ref},
            )
        except GiteaError as err:
            if err.status == 404:
                return []
            raise
        data = resp.json()
        return data if isinstance(data, list) else [data]

    async def read_file(self, owner: str, repo: str, path: str, ref: str = "main") -> tuple[str, str] | None:
        """Return (content, sha) or None if the file is missing.

        ``None`` is also returned when the entry exists but its ``type`` is
        not ``"file"``. The content is base64-decoded and then decoded as
        UTF-8.
        """
        item = await self.get_contents(owner, repo, path, ref)
        if not item or item.get("type") != "file":
            return None
        return base64.b64decode(item["content"]).decode("utf-8"), item["sha"]

    async def create_file(
        self,
        owner: str,
        repo: str,
        path: str,
        *,
        content: str,
        message: str,
        branch: str,
        author_name: str | None = None,
        author_email: str | None = None,
    ) -> dict:
        """Create ``path`` on ``branch`` with one commit and return Gitea's JSON reply."""
        body = self._commit_body(
            content=content,
            message=message,
            branch=branch,
            author_name=author_name,
            author_email=author_email,
        )
        resp = await self._request("POST", f"/repos/{owner}/{repo}/contents/{path}", json=body)
        return resp.json()

    async def update_file(
        self,
        owner: str,
        repo: str,
        path: str,
        *,
        content: str,
        sha: str,
        message: str,
        branch: str,
        author_name: str | None = None,
        author_email: str | None = None,
    ) -> dict:
        """Replace ``path`` on ``branch`` with one commit and return Gitea's JSON reply.

        ``sha`` is sent along with the new content; ``read_file`` returns the
        current value for an existing file.
        """
        body = self._commit_body(
            content=content,
            message=message,
            branch=branch,
            author_name=author_name,
            author_email=author_email,
            sha=sha,
        )
        resp = await self._request("PUT", f"/repos/{owner}/{repo}/contents/{path}", json=body)
        return resp.json()

    # ----- Pull requests -----

    async def list_pulls(self, owner: str, repo: str, state: str = "open") -> list[dict]:
        """Return all pull requests in ``state``, following pagination."""
        return await self._list_all(f"/repos/{owner}/{repo}/pulls", {"state": state})

    async def get_pull(self, owner: str, repo: str, number: int) -> dict | None:
        """Return the pull request JSON, or ``None`` if it does not exist (404)."""
        return await self._get_or_none(f"/repos/{owner}/{repo}/pulls/{number}")

    async def create_pull(
        self,
        owner: str,
        repo: str,
        *,
        title: str,
        body: str,
        head: str,
        base: str = "main",
    ) -> dict:
        """Open a pull request from ``head`` into ``base`` and return its JSON."""
        resp = await self._request(
            "POST",
            f"/repos/{owner}/{repo}/pulls",
            json={"title": title, "body": body, "head": head, "base": base},
        )
        return resp.json()

    async def merge_pull(
        self,
        owner: str,
        repo: str,
        number: int,
        *,
        merge_message_title: str,
        merge_message_body: str,
        style: str = "merge",
    ) -> None:
        """Merge pull request ``number`` using merge style ``style``.

        The title and body are sent as the merge commit's message fields.
        """
        # Per §10.5: no-fast-forward merge commit. We pass Do='merge' so
        # Gitea produces a merge commit rather than a fast-forward.
        await self._request(
            "POST",
            f"/repos/{owner}/{repo}/pulls/{number}/merge",
            json={
                "Do": style,
                "MergeTitleField": merge_message_title,
                "MergeMessageField": merge_message_body,
            },
        )

    async def close_pull(self, owner: str, repo: str, number: int) -> None:
        """Close pull request ``number`` without merging it."""
        # The state change goes through the issues endpoint with the PR number.
        await self._request(
            "PATCH",
            f"/repos/{owner}/{repo}/issues/{number}",
            json={"state": "closed"},
        )

    async def create_issue_comment(self, owner: str, repo: str, number: int, body: str) -> dict:
        """Post a comment on issue or pull request ``number`` and return its JSON."""
        resp = await self._request(
            "POST",
            f"/repos/{owner}/{repo}/issues/{number}/comments",
            json={"body": body},
        )
        return resp.json()

    # ----- Webhooks -----

    async def ensure_webhook(
        self, owner: str, repo: str, *, url: str, secret: str, events: list[str]
    ) -> dict:
        """Return the repo's webhook that targets ``url``, creating it if absent.

        All pages of existing hooks are scanned, so a hook beyond the first
        page is still found and no duplicate is created.

        NOTE(review): a hook that already targets ``url`` is returned as-is;
        its events, secret, and active flag are not compared or updated.
        """
        existing = await self._list_all(f"/repos/{owner}/{repo}/hooks")
        match = next(
            (hook for hook in existing if hook.get("config", {}).get("url") == url),
            None,
        )
        if match is not None:
            return match
        resp = await self._request(
            "POST",
            f"/repos/{owner}/{repo}/hooks",
            json={
                "type": "gitea",
                "active": True,
                "events": events,
                "config": {
                    "url": url,
                    "content_type": "json",
                    "secret": secret,
                },
            },
        )
        return resp.json()