"""Thin HTTP client for the Gitea REST API. Read operations live here and are called from any module that needs them (reconciler, super-draft body fetch, webhook handler). Write operations also live here but are not called directly from outside this module — they are wrapped by `bot.py` per §1, so that every commit, branch, and PR carries the §6.5 On-behalf-of trailer and a row in the actions log. This split keeps the chokepoint legible: anything that wants to read imports from here; anything that wants to write imports from `bot.py` and never reaches around it. """ from __future__ import annotations import base64 from typing import Any import httpx from .config import Config class GiteaError(Exception): def __init__(self, status: int, detail: str): super().__init__(f"Gitea {status}: {detail}") self.status = status self.detail = detail class Gitea: def __init__(self, config: Config): self._config = config self._client = httpx.AsyncClient( base_url=f"{config.gitea_url}/api/v1", headers={ "Authorization": f"token {config.gitea_bot_token}", "Accept": "application/json", }, timeout=30.0, ) async def close(self) -> None: await self._client.aclose() async def _request(self, method: str, path: str, **kwargs: Any) -> httpx.Response: resp = await self._client.request(method, path, **kwargs) if resp.status_code >= 400: try: detail = resp.json().get("message", resp.text) except Exception: detail = resp.text raise GiteaError(resp.status_code, detail) return resp # ----- Repo lifecycle ----- async def get_repo(self, owner: str, repo: str) -> dict | None: try: resp = await self._request("GET", f"/repos/{owner}/{repo}") return resp.json() except GiteaError as e: if e.status == 404: return None raise async def create_org_repo(self, org: str, name: str, *, description: str = "", private: bool = False) -> dict: resp = await self._request( "POST", f"/orgs/{org}/repos", json={ "name": name, "description": description, "private": private, "auto_init": False, }, ) return resp.json() async def delete_repo(self, owner: str, repo: str) -> None: await self._request("DELETE", f"/repos/{owner}/{repo}") # ----- Branches ----- async def get_branch(self, owner: str, repo: str, branch: str) -> dict | None: try: resp = await self._request("GET", f"/repos/{owner}/{repo}/branches/{branch}") return resp.json() except GiteaError as e: if e.status == 404: return None raise async def list_branches(self, owner: str, repo: str) -> list[dict]: resp = await self._request("GET", f"/repos/{owner}/{repo}/branches", params={"limit": 50}) return resp.json() async def create_branch(self, owner: str, repo: str, new_branch: str, from_branch: str = "main") -> dict: resp = await self._request( "POST", f"/repos/{owner}/{repo}/branches", json={"new_branch_name": new_branch, "old_branch_name": from_branch}, ) return resp.json() async def delete_branch(self, owner: str, repo: str, branch: str) -> None: await self._request("DELETE", f"/repos/{owner}/{repo}/branches/{branch}") # ----- File contents ----- async def get_contents(self, owner: str, repo: str, path: str, ref: str = "main") -> dict | None: try: resp = await self._request( "GET", f"/repos/{owner}/{repo}/contents/{path}", params={"ref": ref}, ) return resp.json() except GiteaError as e: if e.status == 404: return None raise async def list_dir(self, owner: str, repo: str, path: str = "", ref: str = "main") -> list[dict]: try: resp = await self._request( "GET", f"/repos/{owner}/{repo}/contents/{path}", params={"ref": ref}, ) except GiteaError as e: if e.status == 404: return [] raise data = resp.json() return data if isinstance(data, list) else [data] async def read_file(self, owner: str, repo: str, path: str, ref: str = "main") -> tuple[str, str] | None: """Return (content, sha) or None if the file is missing.""" item = await self.get_contents(owner, repo, path, ref) if not item or item.get("type") != "file": return None return base64.b64decode(item["content"]).decode("utf-8"), item["sha"] async def create_file( self, owner: str, repo: str, path: str, *, content: str, message: str, branch: str, author_name: str | None = None, author_email: str | None = None, ) -> dict: body: dict[str, Any] = { "message": message, "content": base64.b64encode(content.encode("utf-8")).decode("ascii"), "branch": branch, } if author_name and author_email: body["author"] = {"name": author_name, "email": author_email} body["committer"] = {"name": author_name, "email": author_email} resp = await self._request("POST", f"/repos/{owner}/{repo}/contents/{path}", json=body) return resp.json() async def update_file( self, owner: str, repo: str, path: str, *, content: str, sha: str, message: str, branch: str, author_name: str | None = None, author_email: str | None = None, ) -> dict: body: dict[str, Any] = { "message": message, "content": base64.b64encode(content.encode("utf-8")).decode("ascii"), "sha": sha, "branch": branch, } if author_name and author_email: body["author"] = {"name": author_name, "email": author_email} body["committer"] = {"name": author_name, "email": author_email} resp = await self._request("PUT", f"/repos/{owner}/{repo}/contents/{path}", json=body) return resp.json() # ----- Pull requests ----- async def list_pulls(self, owner: str, repo: str, state: str = "open") -> list[dict]: resp = await self._request( "GET", f"/repos/{owner}/{repo}/pulls", params={"state": state, "limit": 50}, ) return resp.json() async def get_pull(self, owner: str, repo: str, number: int) -> dict | None: try: resp = await self._request("GET", f"/repos/{owner}/{repo}/pulls/{number}") return resp.json() except GiteaError as e: if e.status == 404: return None raise async def create_pull( self, owner: str, repo: str, *, title: str, body: str, head: str, base: str = "main", ) -> dict: resp = await self._request( "POST", f"/repos/{owner}/{repo}/pulls", json={"title": title, "body": body, "head": head, "base": base}, ) return resp.json() async def merge_pull( self, owner: str, repo: str, number: int, *, merge_message_title: str, merge_message_body: str, style: str = "merge", ) -> None: # Per §10.5: no-fast-forward merge commit. We pass Do='merge' so # Gitea produces a merge commit rather than a fast-forward. await self._request( "POST", f"/repos/{owner}/{repo}/pulls/{number}/merge", json={ "Do": style, "MergeTitleField": merge_message_title, "MergeMessageField": merge_message_body, }, ) async def close_pull(self, owner: str, repo: str, number: int) -> None: await self._request( "PATCH", f"/repos/{owner}/{repo}/issues/{number}", json={"state": "closed"}, ) async def create_issue_comment(self, owner: str, repo: str, number: int, body: str) -> dict: resp = await self._request( "POST", f"/repos/{owner}/{repo}/issues/{number}/comments", json={"body": body}, ) return resp.json() # ----- Webhooks ----- async def ensure_webhook(self, owner: str, repo: str, *, url: str, secret: str, events: list[str]) -> dict: existing = (await self._request("GET", f"/repos/{owner}/{repo}/hooks")).json() for hook in existing: if hook.get("config", {}).get("url") == url: return hook resp = await self._request( "POST", f"/repos/{owner}/{repo}/hooks", json={ "type": "gitea", "active": True, "events": events, "config": { "url": url, "content_type": "json", "secret": secret, }, }, ) return resp.json()