All checks were successful
Sync Beads to Gitea Issues / sync-beads (push) Successful in 7s
73 lines
2.7 KiB
Python
73 lines
2.7 KiB
Python
import json
|
||
import os
|
||
import requests
|
||
import sys
|
||
|
||
# Connection settings are read from the process environment; each one is
# None when its variable is not set.
TOKEN = os.environ.get("GITEA_TOKEN")
URL = os.environ.get("GITEA_URL")
REPO = os.environ.get("REPO_NAME")

# Headers sent with every Gitea API request: token auth plus a JSON body.
HEADERS = {
    "Authorization": "token {}".format(TOKEN),
    "Content-Type": "application/json",
}
|
||
|
||
# Seconds to wait on any single Gitea request, so a hung server cannot stall
# the job indefinitely.
_REQUEST_TIMEOUT = 30


def _bead_id_from_title(title):
    """Return the bead ID from a ``[<id>] ...`` issue title, or ``None``."""
    if not title.startswith("["):
        return None
    bead_id, bracket, _ = title[1:].partition("]")
    return bead_id if bracket else None


def _fetch_existing_issues(issues_url):
    """Return every Gitea issue whose title carries a bead ID, keyed by that ID.

    The listing is requested page by page until an empty page comes back,
    because the API returns only a limited number of issues per request;
    reading just the first page would make later beads look new and create
    duplicates.
    """
    existing = {}
    page = 1
    while True:
        resp = requests.get(
            issues_url,
            headers=HEADERS,
            # state=all includes closed issues so finished tasks are not
            # re-created.
            params={"state": "all", "page": page},
            timeout=_REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            return existing
        for issue in batch:
            bead_id = _bead_id_from_title(issue["title"])
            if bead_id is not None:
                existing[bead_id] = issue
        page += 1


def _build_payload(bead, bid):
    """Build the JSON body used to create or update the issue for *bead*."""
    title = bead.get("title") or "(untitled)"
    description = bead.get("description") or "No description provided."
    itype = bead.get("issue_type", "unknown")
    closed = bead.get("status") in ("closed", "done")
    return {
        "title": f"[{bid}] {title}",
        "body": f"{description}\n\n---\n**Beads ID:** `{bid}`\n**Type:** `{itype}`",
        # "state" is what the edit endpoint reads; "closed" is what the
        # create endpoint reads. Sending both lets one payload serve both.
        "state": "closed" if closed else "open",
        "closed": closed,
    }


def sync():
    """Mirror each bead in ``.beads/issues.jsonl`` to a Gitea issue.

    Every non-blank line of the file is parsed as one JSON object. A bead
    whose ``id`` already appears as a ``[<id>]`` title prefix on a Gitea issue
    updates that issue; any other bead creates a new issue. Beads with status
    ``closed`` or ``done`` are synced as closed, all others as open.

    Exits the process with status 1 when the beads file is missing, when a
    required environment variable is unset, or when the existing issues
    cannot be fetched. A failure on an individual bead is reported and
    counted, and the remaining beads are still processed.
    """
    beads_path = ".beads/issues.jsonl"

    if not os.path.exists(beads_path):
        print(f"❌ ERROR: {beads_path} not found.")
        sys.exit(1)

    # Fail with a precise message instead of a confusing URL error later on.
    unset = [
        name
        for name, value in (
            ("GITEA_TOKEN", TOKEN),
            ("GITEA_URL", URL),
            ("REPO_NAME", REPO),
        )
        if not value
    ]
    if unset:
        print(f"❌ ERROR: Missing environment variables: {', '.join(unset)}")
        sys.exit(1)

    print(f"🔍 Reading Beads from: {beads_path}")

    # Tolerate a trailing slash on the configured base URL.
    issues_url = f"{URL.rstrip('/')}/api/v1/repos/{REPO}/issues"

    # 1. Fetch existing issues from Gitea, mapped by bead ID.
    try:
        existing = _fetch_existing_issues(issues_url)
        print(f"📡 Found {len(existing)} existing issues in Gitea.")
    except Exception as e:
        # Without the existing-issue map every bead would be re-created, so
        # any failure here aborts the run.
        print(f"❌ Gitea API Connection Failed: {e}")
        sys.exit(1)

    # 2. Parse the Beads JSONL file and push each bead.
    failures = 0
    with open(beads_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            print(f"DEBUG: Processing line -> {line}")
            try:
                data = json.loads(line)
                bid = data.get("id")
                if not bid:
                    print("⚠️ Skipping line: No ID found.")
                    continue

                payload = _build_payload(data, bid)
                issue = existing.get(bid)

                if issue is not None:
                    print(f"✅ Updating Gitea Issue #{issue['number']} for {bid}")
                    r = requests.patch(
                        f"{issues_url}/{issue['number']}",
                        headers=HEADERS,
                        json=payload,
                        timeout=_REQUEST_TIMEOUT,
                    )
                    # Surface rejected updates instead of reporting success.
                    r.raise_for_status()
                else:
                    print(f"➕ Creating New Gitea Issue for {bid}")
                    r = requests.post(
                        issues_url,
                        headers=HEADERS,
                        json=payload,
                        timeout=_REQUEST_TIMEOUT,
                    )
                    r.raise_for_status()
                    # Remember the new issue so a repeated bead ID later in
                    # the file updates it rather than creating a duplicate.
                    existing[bid] = r.json()

            except Exception as e:
                # Deliberate best-effort boundary: one bad bead must not stop
                # the rest of the file from syncing.
                failures += 1
                print(f"⚠️ Error processing bead: {e}")

    if failures:
        print(f"⚠️ {failures} bead(s) could not be synced.")
|
||
|
||
# Run the sync only when this file is executed as a script, not on import.
if "__main__" == __name__:
    sync()
|