All checks were successful
Sync Beads to Gitea Issues / sync-beads (push) Successful in 6s
63 lines
2.3 KiB
Python
63 lines
2.3 KiB
Python
import json
|
||
import os
|
||
import requests
|
||
import sys
|
||
|
||
TOKEN = os.getenv("GITEA_TOKEN")
|
||
URL = os.getenv("GITEA_URL")
|
||
REPO = os.getenv("REPO_NAME")
|
||
HEADERS = {"Authorization": f"token {TOKEN}", "Content-Type": f"application/json"}
|
||
|
||
def sync():
|
||
beads_path = ".beads/issues.jsonl"
|
||
if not os.path.exists(beads_path):
|
||
print(f"❌ ERROR: {beads_path} not found.")
|
||
sys.exit(1)
|
||
|
||
print(f"🔍 Reading Beads from: {beads_path}")
|
||
|
||
# Fetch existing Gitea issues
|
||
try:
|
||
# Note: state=all is important so we can find closed issues to re-open if needed
|
||
resp = requests.get(f"{URL}/api/v1/repos/{REPO}/issues?state=all", headers=HEADERS)
|
||
resp.raise_for_status()
|
||
# Map by [ID] to identify existing ones
|
||
existing = {i['title'].split(']')[0][1:]: i for i in resp.json() if ']' in i['title']}
|
||
except Exception as e:
|
||
print(f"❌ Gitea API Connection Failed: {e}")
|
||
sys.exit(1)
|
||
|
||
with open(beads_path, "r") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line: continue
|
||
|
||
data = json.loads(line)
|
||
bid = data.get("id")
|
||
title = data.get("title")
|
||
# The fix: Beads uses issue_type
|
||
itype = data.get("issue_type", "unknown")
|
||
|
||
print(f"Found Bead: {bid} ({itype}) - {title}")
|
||
|
||
# Only sync actual tasks/issues
|
||
if not bid or itype not in ["task", "bug", "feature", "issue"]:
|
||
continue
|
||
|
||
unique_title = f"[{bid}] {title}"
|
||
payload = {
|
||
"title": unique_title,
|
||
"body": f"{data.get('description', 'No description provided.')}\n\n---\n**Beads ID:** `{bid}`\n**Owner:** `{data.get('owner')}`",
|
||
"state": "closed" if data.get("status") in ["closed", "done"] else "open"
|
||
}
|
||
|
||
if bid in existing:
|
||
print(f"✅ Updating Gitea Issue #{existing[bid]['number']}")
|
||
requests.patch(f"{URL}/api/v1/repos/{REPO}/issues/{existing[bid]['number']}", headers=HEADERS, json=payload)
|
||
else:
|
||
print(f"➕ Creating New Gitea Issue")
|
||
requests.post(f"{URL}/api/v1/repos/{REPO}/issues", headers=HEADERS, json=payload)
|
||
|
||
if __name__ == "__main__":
|
||
sync()
|