#!/usr/bin/env python3
"""
QA Copilot - Local Server
Proxy between the browser and Ollama, with CORS bypass.

Key design:
- Uses Ollama stream:true internally to keep the TCP socket alive during
  long generations, preventing idle-socket timeouts even on 10-minute runs.
- Returns a standard {"content":"...","done":true} JSON so the HTML pages
  need no changes.
- Also exposes GET /api/stream?... for real SSE streaming (future use).
"""

import html
import http.server
import secrets
import urllib.request
import urllib.error
import urllib.parse
import json
import os
from pathlib import Path as _Path

# Load .env from the script directory (if present) so QA_JWT_SECRET etc. persist
try:
    from dotenv import load_dotenv
    load_dotenv(_Path(__file__).parent / ".env")
except ImportError:
    pass
import re
import socket
import threading
import hashlib
import hmac
import base64
import uuid
import datetime
import sqlite3
import authz
import identity
import monitoring
import fine_tuning
import rate_limiter
from llm_providers import llm_chat, llm_stream, llm_json, get_default_provider, get_providers_info, LLMError
import generation
import logging_config
import notifications
import onboarding
import test_runs
import workspace
import perf_results

_log = logging_config.get_logger("server")

_JIRA_KEY_RE = re.compile(r'\b([A-Z][A-Z0-9_]+-\d+)\b')

def _fix_body_field_quotes(text: str) -> str:
    """
    Re-escape bare double-quotes AND literal control chars inside JSON body/code
    string values. Mirrors the client-side fixBodyQuotes + fixLiteralControlChars
    logic but runs server-side on the complete LLM response.

    Handles fields: body, corps, script, testPlan, source.
    Structural-close heuristic: a '"' followed (past whitespace) by ',' '}' or ']'
    outside method-call parens ends the field value.
    """
    _BODY_KEYS = re.compile(r'"(?:body|corps|script|testPlan|source)"\s*:\s*"')
    result: list[str] = []
    i = 0
    while i < len(text):
        m = _BODY_KEYS.search(text, i)
        if not m:
            result.append(text[i:])
            break
        result.append(text[i:m.end()])  # everything up to and including opening "
        i = m.end()
        paren_depth = 0
        val: list[str] = []
        while i < len(text):
            c = text[i]
            if c == '\\' and i + 1 < len(text):       # already-escaped sequence
                val.append(c + text[i + 1]); i += 2
            elif c == '(':
                paren_depth += 1; val.append(c); i += 1
            elif c == ')':
                paren_depth = max(0, paren_depth - 1); val.append(c); i += 1
            elif c == '\n':
                val.append('\\n'); i += 1              # literal newline → escape
            elif c == '\r':
                val.append('\\r'); i += 1
            elif c == '\t':
                val.append('\\t'); i += 1
            elif c == '"':
                j = i + 1
                while j < len(text) and text[j] in ' \n\r\t':
                    j += 1
                if j >= len(text) or (paren_depth == 0 and text[j] in '}],'):
                    result.append(''.join(val) + '"'); i += 1; break
                else:
                    val.append('\\"'); i += 1          # bare inner quote → escape
            else:
                val.append(c); i += 1
        else:
            result.append(''.join(val))                # EOF inside field
    return ''.join(result)
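
# Illustrative example: bare inner quotes get escaped, while the structural
# closing quote (followed by '}', ']' or ',') ends the field value:
#
#   >>> _fix_body_field_quotes('{"body": "say "hi" now"}')
#   '{"body": "say \\"hi\\" now"}'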


def _close_truncated_json(text: str) -> str:
    """Append closing '"', '}', ']' to repair a truncated JSON string."""
    stack: list[str] = []
    in_str = esc = False
    for c in text:
        if esc:            esc = False; continue
        if c == '\\' and in_str: esc = True; continue
        if c == '"':       in_str = not in_str; continue
        if in_str:         continue
        if c == '{':       stack.append('}')
        elif c == '[':     stack.append(']')
        elif c in ']}' and stack: stack.pop()
    return text + ('"' if in_str else '') + ''.join(reversed(stack))


def _extract_json(text: str) -> dict:
    """
    Extract a parsed JSON dict from LLM text.
    Applies progressively more aggressive repairs:
      1. Direct json.loads (works when model outputs valid JSON)
      2. Fix literal control chars in strings (handles pretty-printed bodies)
      3. Fix body field quotes + control chars (handles unescaped source code)
      4. Close truncated JSON (handles max_tokens cut-off)
    """
    text = re.sub(r'```(?:json|java|python|xml|groovy)?\s*', '', text).strip()
    start = text.find('{')
    if start == -1:
        raise ValueError("No JSON object in response")
    text = text[start:]

    _last_err = None
    for candidate in (
        text,
        _fix_body_field_quotes(text),
        _close_truncated_json(_fix_body_field_quotes(text)),
    ):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            _last_err = e
    raise _last_err  # type: ignore[misc]
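
# Illustrative example: markdown fences are stripped before the repair ladder
# runs, so a fenced but otherwise valid reply parses on the first attempt:
#
#   >>> _extract_json('```json\n{"cas": []}\n```')
#   {'cas': []}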

# ── Robust code/perf parsing — provider-specific paths ───────────────────────
# Ollama: delimiter markers (body content is opaque to JSON parsing)
# Claude: tool_use via llm_json (pre-parsed dict, zero text parsing)
# OpenAI: json_schema via llm_json (pre-parsed dict, zero text parsing)

_DELIMITER_OVERRIDE = """

IMPORTANT — OUTPUT FORMAT OVERRIDE: Do NOT output JSON. Use these exact markers on their own lines:
<<<CLASS>>>
[class or simulation name only — no package prefix]
<<<PACKAGE>>>
[package name only]
<<<IMPORTS>>>
[one import statement per line with semicolons]
<<<DESCRIPTION>>>
[one-line description]
<<<BODY>>>
[complete source code — no markdown fences]
<<<END>>>
"""

_PERF_DELIMITER_OVERRIDE = """

IMPORTANT — OUTPUT FORMAT OVERRIDE: Do NOT output JSON. Use ONLY these two markers on their own lines:
<<<DESCRIPTION>>>
[one-line description of what this performance test covers]
<<<BODY>>>
[complete performance test source code — no markdown fences, no explanations outside the code]
<<<END>>>
"""

_CODE_SCHEMA = {
    "type": "object",
    "properties": {
        "code": {
            "type": "object",
            "properties": {
                "class":   {"type": "string"},
                "package": {"type": "string"},
                "imports": {"type": "array", "items": {"type": "string"}},
                "body":    {"type": "string"}
            },
            "required": ["class", "package", "body"]
        }
    },
    "required": ["code"]
}

_PERF_SCHEMA = {
    "type": "object",
    "properties": {
        "perf": {
            "type": "object",
            "properties": {
                "class":       {"type": "string"},
                "description": {"type": "string"},
                "body":        {"type": "string"}
            },
            "required": ["body"]
        }
    },
    "required": ["perf"]
}

_CAS_SCHEMA = {
    "type": "object",
    "properties": {
        "cas": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "title":          {"type": "string"},
                    "preconditions":  {"type": "string"},
                    "steps":          {"type": "array", "items": {"type": "string"}},
                    "expectedResult": {"type": "string"},
                    "priority":       {"type": "string"},
                    "type":           {"type": "string"},
                },
                "required": ["title", "steps", "expectedResult"],
            },
        }
    },
    "required": ["cas"],
}

_GHERKIN_SCHEMA = {
    "type": "object",
    "properties": {
        "gherkin": {
            "type": "object",
            "properties": {
                "feature": {"type": "string"},
                "scenarios": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "title": {"type": "string"},
                            "tags":  {"type": "array", "items": {"type": "string"}},
                            "steps": {
                                "type": "object",
                                "properties": {
                                    "given": {"type": "array", "items": {"type": "string"}},
                                    "when":  {"type": "array", "items": {"type": "string"}},
                                    "then":  {"type": "array", "items": {"type": "string"}},
                                },
                            },
                        },
                        "required": ["title", "steps"],
                    },
                },
            },
            "required": ["feature", "scenarios"],
        }
    },
    "required": ["gherkin"],
}

_RISQUES_SCHEMA = {
    "type": "object",
    "properties": {
        "risques": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "title":          {"type": "string"},
                    "description":    {"type": "string"},
                    "category":       {"type": "string"},
                    "severity":       {"type": "string"},
                    "recommendation": {"type": "string"},
                },
                "required": ["title", "description", "severity", "category", "recommendation"],
            },
        }
    },
    "required": ["risques"],
}

_DATA_SCHEMA = {
    "type": "object",
    "properties": {
        "data": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name":        {"type": "string"},
                    "description": {"type": "string"},
                    "values": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "field": {"type": "string"},
                                "value": {"type": "string"},
                                "type":  {"type": "string"},
                            },
                            "required": ["field", "value", "type"],
                        },
                    },
                },
                "required": ["name", "values"],
            },
        }
    },
    "required": ["data"],
}


def _get_module_schema(module: str) -> dict | None:
    """Return the JSON schema for a generation module, or None if unknown."""
    return {
        "cas":     _CAS_SCHEMA,
        "gherkin": _GHERKIN_SCHEMA,
        "risques": _RISQUES_SCHEMA,
        "data":    _DATA_SCHEMA,
        "code":    _CODE_SCHEMA,
        "perf":    _PERF_SCHEMA,
    }.get(module)


def _parse_delimiter_output(text: str, module: str) -> dict:
    """
    Parse <<<MARKER>>> delimiter format into a result dict.
    Used for Ollama code/perf modules — body content never touches JSON parsing.
    """
    markers: dict = {}
    current: str | None = None
    lines: list = []
    for line in text.split('\n'):
        m = re.match(r'<<<(\w+)>>>', line.strip())
        if m:
            if current:
                markers[current] = '\n'.join(lines).strip()
            current, lines = m.group(1), []
        elif current:
            lines.append(line)
    if current:
        markers[current] = '\n'.join(lines).strip()

    if not markers.get('BODY'):
        print(f"  ⚠  [delimiter] No <<<BODY>>> found in output for module={module!r}. Raw[:200]: {text[:200]!r}")

    if module == 'code':
        raw_imports = markers.get('IMPORTS', '')
        imports = [l.strip() for l in raw_imports.splitlines() if l.strip()]
        return {'code': {
            'class':   markers.get('CLASS', ''),
            'package': markers.get('PACKAGE', ''),
            'imports': imports,
            'body':    markers.get('BODY', ''),
        }}
    elif module == 'perf':
        body = markers.get('BODY', '')
        # Fallback: if BODY is empty but CLASS has multi-line content, the model
        # probably put the full source there — use it as body.
        if not body and len(markers.get('CLASS', '').splitlines()) > 3:
            body = markers.get('CLASS', '')
        return {'perf': {
            'class':       markers.get('CLASS', 'PerfTest'),
            'description': markers.get('DESCRIPTION', ''),
            'body':        body,
        }}
    raise ValueError(f"Unknown source module for delimiter parsing: {module!r}")

import socketserver
import time
from pathlib import Path

# ── CI/CD webhook job store ───────────────────────────────────────────────────
# In-memory only — cleared on server restart. Jobs expire after 1 hour.
_JOBS: dict = {}           # job_id → {status, created_at, result, error}
_JOBS_LOCK = threading.Lock()
_JOB_TTL_SECONDS = 3600

PORT        = int(os.environ.get("QA_PORT", 5174))
OLLAMA_URL  = os.environ.get("OLLAMA_URL", "http://localhost:11434")
DEFAULT_MODEL = os.environ.get("QA_DEFAULT_MODEL", "codellama:13b")
SCRIPT_DIR  = Path(__file__).parent
COVERAGE_DB  = SCRIPT_DIR / "coverage.db"
USERDATA_DB  = SCRIPT_DIR / "userdata.db"

# Multi-project + RBAC feature flag. When False (default), today's
# _require_auth(role=...) path runs verbatim. When True, the new
# authz.require enforcement + org-aware login take over. Plan 2 of the
# multi-project rollout. Spec: docs/superpowers/specs/2026-05-05-...
MULTI_PROJECT_ENABLED: bool = os.getenv("MULTI_PROJECT_ENABLED", "0") == "1"

# Max tokens cap — prevents runaway generation on slow local models
MAX_TOKENS_CAP = int(os.environ.get("QA_MAX_TOKENS", 8000))

# Context window passed to Ollama — must be large enough to hold the full prompt
# (system prompt + RAG context + user input + generated output).
# 16384 handles BDD + all modules + RAG context. Increase to 32768 for very
# large Jira tickets or when generation is still truncated.
OLLAMA_CTX = int(os.environ.get("OLLAMA_CTX", 16384))

# Socket read timeout per chunk (streaming keeps socket alive between tokens).
# When input contains a full Jira ticket + RAG context, Ollama's prompt prefill
# can take several minutes before the first token arrives — use a generous value.
CHUNK_TIMEOUT = int(os.environ.get("QA_CHUNK_TIMEOUT", 300))

# CORS allowed origins — comma-separated list; set to "*" to allow all
_CORS_ORIGINS = [
    o.strip()
    for o in os.environ.get(
        "QA_CORS_ORIGINS",
        "http://localhost:5173,http://localhost:5174,http://localhost:3000",
    ).split(",")
    if o.strip()
]

# RAG server URL — leave empty to disable RAG enrichment (set RAG_URL=http://... to enable)
RAG_URL = os.environ.get("RAG_URL", "")
RAG_TOP_K = int(os.environ.get("RAG_TOP_K", "5"))
REVIEW_SERVER_URL = os.environ.get("REVIEW_SERVER_URL", "http://localhost:8765")
_FT_DIR = Path(__file__).parent.parent / "fine-tuning"
_ASSEMBLE_PY = _FT_DIR / "assemble.py"
_TRAIN_PY = _FT_DIR / "backends" / "mlx_train.py"

# ── License validation (self-hosted tier only) ────────────────────────────────
LICENSE_KEY = os.environ.get("QA_LICENSE_KEY", "").strip()
LICENSE_VALIDATE_URL = "https://app.qacopilot.fr/api/license/validate"

def _validate_license() -> None:
    """
    Validate the self-hosted license key against the cloud control plane.
    Called once at startup. Exits the process with code 1 if validation fails.
    Skipped entirely when QA_LICENSE_KEY is not set (cloud / local dev mode).
    """
    if not LICENSE_KEY:
        return  # Cloud tier or local dev — no license required

    print(f"  Validating license key against {LICENSE_VALIDATE_URL} …")
    try:
        req = urllib.request.Request(
            f"{LICENSE_VALIDATE_URL}?key={urllib.parse.quote(LICENSE_KEY)}",
            method="GET",
            headers={"User-Agent": "QACopilot-Server/1.0"},
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            body = json.loads(resp.read().decode())
        if not body.get("valid"):
            reason = body.get("reason", "unknown")
            print(f"  ERROR: License key invalid — {reason}. Exiting.")
            raise SystemExit(1)
        expires = body.get("expires_at", "")
        print(f"  License valid. Expires: {expires}")
    except SystemExit:
        raise
    except Exception as exc:
        print(f"  ERROR: License validation failed — {exc}. Exiting.")
        raise SystemExit(1)


# ── Control Plane (SaaS auth + billing) ──────────────────────────────────────
# Set CONTROL_PLANE_URL to the deployed control plane base URL in production.
# Leave empty (default) for local dev — all auth checks are skipped.
CONTROL_PLANE_URL = os.environ.get("CONTROL_PLANE_URL", "")


# ── Control Plane auth helpers ────────────────────────────────────────────────

def verify_jwt(token: str) -> dict:
    """
    Check whether a Clerk JWT is allowed to generate.

    Calls GET CONTROL_PLANE_URL/api/usage/check with the token as a
    Bearer header.  Returns {"allowed": bool, "reason": str}.

    Dev mode: if CONTROL_PLANE_URL is empty, always returns allowed=True
    without making any network call.
    """
    if not CONTROL_PLANE_URL:
        return {"allowed": True, "reason": "dev_mode"}
    try:
        req = urllib.request.Request(
            f"{CONTROL_PLANE_URL}/api/usage/check",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            },
            method="GET",
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read())
        return {
            "allowed": bool(data.get("allowed", False)),
            "reason":  str(data.get("reason", "")),
        }
    except Exception as exc:
        print(f"  ⚠  verify_jwt error: {exc}")
        return {"allowed": False, "reason": f"control_plane_error: {exc}"}


def report_usage(token: str, tokens_used: int) -> None:
    """
    Fire-and-forget: report token consumption to the control plane.

    Calls POST CONTROL_PLANE_URL/api/usage/report.
    Silently swallows all errors — a failed report must never break a
    completed generation.

    Dev mode: if CONTROL_PLANE_URL is empty, this is a no-op.
    """
    if not CONTROL_PLANE_URL:
        return
    try:
        payload = json.dumps({"tokens_used": tokens_used}).encode()
        req = urllib.request.Request(
            f"{CONTROL_PLANE_URL}/api/usage/report",
            data=payload,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type":  "application/json",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            resp.read()  # drain
    except Exception as exc:
        print(f"  ⚠  report_usage error (non-fatal): {exc}")


def _check_cp_jwt(self) -> bool:
    """
    Gate /api/chat and /api/stream behind a Clerk JWT validated by the control plane.
    Returns True if the request is allowed to proceed, False otherwise
    (a 401 or 402 response has already been sent).

    Defined at module level so tests can call server._check_cp_jwt(handler)
    directly.  Bound to QACopilotHandler below so self._check_cp_jwt() also
    works in production.

    Dev mode: if CONTROL_PLANE_URL is empty, always returns True.
    """
    if not CONTROL_PLANE_URL:
        return True
    auth = (self.headers or {}).get("Authorization", "")
    if not auth.startswith("Bearer "):
        body = json.dumps({"error": "missing_token"}, ensure_ascii=False).encode()
        self.send_response(401)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        try:
            self.wfile.write(body)
        except BrokenPipeError:
            pass
        return False
    token = auth[len("Bearer "):]
    result = verify_jwt(token)
    if result["allowed"]:
        self._cp_token = token
        return True
    reason = result.get("reason", "")
    status = 402 if reason == "trial_exhausted" else 401
    body = json.dumps({"error": reason or "unauthorized"}, ensure_ascii=False).encode()
    self.send_response(status)
    self.send_header("Content-Type", "application/json; charset=utf-8")
    self.send_header("Content-Length", str(len(body)))
    self.end_headers()
    try:
        self.wfile.write(body)
    except BrokenPipeError:
        pass
    return False


# ── Auth / JWT ────────────────────────────────────────────────────────────────
_env_jwt_secret = os.environ.get("QA_JWT_SECRET", "")
if _env_jwt_secret:
    JWT_SECRET = _env_jwt_secret
else:
    JWT_SECRET = secrets.token_hex(32)  # ephemeral — tokens won't survive restart
    print("  ⚠  QA_JWT_SECRET not set — using a random ephemeral secret. "
          "Set QA_JWT_SECRET in your environment for persistent sessions.")

_BAD_SECRETS = {"change-me", "change-me-in-production", "secret", "password", "admin", "test"}

def _validate_secrets() -> None:
    """Check for weak/default secrets at startup. Exits with code 1 on failure.
    Skipped when QA_DEV_MODE=1."""
    if os.environ.get("QA_DEV_MODE", "") == "1":
        return
    secret = os.environ.get("QA_JWT_SECRET", "")
    if not secret:
        return  # Ephemeral secret is OK (already warned above)
    if secret.lower() in _BAD_SECRETS:
        print(f"\n  ✗  FATAL: QA_JWT_SECRET is set to a known-bad value '{secret}'.")
        print("     Please set a strong, unique secret (at least 32 characters).")
        print("     Example: export QA_JWT_SECRET=$(python3 -c 'import secrets; print(secrets.token_hex(32))')\n")
        raise SystemExit(1)
    if len(secret) < 16:
        print(f"\n  ✗  FATAL: QA_JWT_SECRET is too short ({len(secret)} chars, minimum 16).")
        print("     Please set a strong, unique secret (at least 32 characters recommended).\n")
        raise SystemExit(1)

def _validate_admin_password() -> None:
    """Check for weak admin passwords at startup. Skipped in dev mode."""
    if os.environ.get("QA_DEV_MODE", "") == "1":
        return
    pw = os.environ.get("QA_ADMIN_PASSWORD", "")
    if not pw:
        return  # Auto-generated password is OK
    if len(pw) < 12:
        print(f"\n  ⚠  WARNING: QA_ADMIN_PASSWORD is short ({len(pw)} chars). Recommend 16+ characters.\n")

USERS_FILE      = SCRIPT_DIR / "users.json"
ADMIN_USERNAME  = os.environ.get("QA_ADMIN_USERNAME", "admin")
ADMIN_PASSWORD  = os.environ.get("QA_ADMIN_PASSWORD", "")
if not ADMIN_PASSWORD:
    ADMIN_PASSWORD = secrets.token_urlsafe(16)
    print(f"  ⚠  QA_ADMIN_PASSWORD not set — generated password: {ADMIN_PASSWORD}")
TOKEN_TTL_HOURS = int(os.environ.get("QA_TOKEN_TTL_HOURS", "24"))
_USERS_LOCK     = threading.Lock()


# ── JWT helpers ───────────────────────────────────────────────────────────────

def _b64url_enc(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def _b64url_dec(s: str) -> bytes:
    s += "=" * (4 - len(s) % 4)
    return base64.urlsafe_b64decode(s)

def _jwt_encode(payload: dict) -> str:
    header  = _b64url_enc(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body    = _b64url_enc(json.dumps(payload).encode())
    signing = f"{header}.{body}".encode()
    sig     = _b64url_enc(hmac.new(JWT_SECRET.encode(), signing, hashlib.sha256).digest())
    return f"{header}.{body}.{sig}"

def _jwt_decode(token: str) -> dict | None:
    try:
        parts = token.split(".")
        if len(parts) != 3:
            return None
        header, body, sig = parts
        signing  = f"{header}.{body}".encode()
        expected = _b64url_enc(hmac.new(JWT_SECRET.encode(), signing, hashlib.sha256).digest())
        if not hmac.compare_digest(sig, expected):
            return None
        payload = json.loads(_b64url_dec(body))
        if payload.get("exp", 0) < time.time():
            return None
        return payload
    except Exception:
        return None
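
# Illustrative round trip: a freshly minted token decodes back to its payload;
# any tampering (or an expired "exp") makes _jwt_decode return None:
#
#   >>> tok = _jwt_encode({"sub": "u1", "exp": int(time.time()) + 60})
#   >>> _jwt_decode(tok)["sub"]
#   'u1'
#   >>> _jwt_decode(tok + "x") is None
#   True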


# ── Password helpers ──────────────────────────────────────────────────────────

def _hash_password(password: str) -> str:
    salt = os.urandom(16)
    dk   = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 260_000)
    return base64.b64encode(salt + dk).decode()

def _verify_password(password: str, stored: str) -> bool:
    try:
        raw  = base64.b64decode(stored.encode())
        salt = raw[:16]
        dk   = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 260_000)
        return hmac.compare_digest(raw[16:], dk)
    except Exception:
        return False
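
# Illustrative round trip: the salt is stored alongside the PBKDF2 digest, so
# verification re-derives the digest from the candidate password:
#
#   >>> stored = _hash_password("s3cret")
#   >>> _verify_password("s3cret", stored)
#   True
#   >>> _verify_password("wrong", stored)
#   False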


# ── User store ────────────────────────────────────────────────────────────────

def _load_users() -> list:
    """Load all users from SQLite."""
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute("SELECT * FROM users").fetchall()
        return [{**dict(r), "active": bool(r["active"])} for r in rows]
    except Exception:
        return []
    finally:
        conn.close()


def _save_users(users: list):
    """Save all users to SQLite (full replace — rows absent from `users` are deleted)."""
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    try:
        # True full-replace semantics: drop rows missing from the new list so
        # callers (e.g. GDPR account deletion) can remove users by omission.
        keep_ids = [u["id"] for u in users]
        if keep_ids:
            placeholders = ",".join("?" * len(keep_ids))
            conn.execute(f"DELETE FROM users WHERE id NOT IN ({placeholders})", keep_ids)
        else:
            conn.execute("DELETE FROM users")
        for u in users:
            conn.execute(
                """INSERT INTO users (id, username, email, password_hash, api_key, role, active, created_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                   ON CONFLICT(id) DO UPDATE SET
                       username=excluded.username, email=excluded.email,
                       password_hash=excluded.password_hash, api_key=excluded.api_key,
                       role=excluded.role, active=excluded.active""",
                (u["id"], u["username"], u.get("email", ""), u["password_hash"],
                 u.get("api_key"), u.get("role", "user"), 1 if u.get("active", True) else 0,
                 u.get("created_at", "")),
            )
        conn.commit()
    finally:
        conn.close()

def _bootstrap_admin():
    """Create the admin account on first run if no users exist."""
    existing = _load_users()
    if existing:
        return
    # Also migrate from users.json if it exists (one-time migration)
    if USERS_FILE.exists():
        try:
            with open(USERS_FILE, "r", encoding="utf-8") as f:
                file_users = json.load(f)
            if file_users:
                _save_users(file_users)
                print(f"  ✓ Migrated {len(file_users)} users from users.json to SQLite")
                return
        except Exception:
            pass
    admin = {
        "id":            str(uuid.uuid4()),
        "username":      ADMIN_USERNAME,
        "email":         "admin@local",
        "password_hash": _hash_password(ADMIN_PASSWORD),
        "role":          "admin",
        "active":        True,
        "created_at":    datetime.datetime.now(datetime.timezone.utc).isoformat() + "Z",
    }
    _save_users([admin])
    print(f"  ✓  Admin bootstrapped: {ADMIN_USERNAME} / {ADMIN_PASSWORD}")

def _make_token(user: dict) -> str:
    return _jwt_encode({
        "sub":      user["id"],
        "username": user["username"],
        "role":     user["role"],
        "exp":      int(time.time()) + TOKEN_TTL_HOURS * 3600,
    })

def _safe_user(user: dict) -> dict:
    return {k: v for k, v in user.items() if k != "password_hash"}


def _generate_api_key() -> str:
    """Return a cryptographically random 32-byte hex API key."""
    return os.urandom(32).hex()


def _require_api_key(auth_header: str) -> dict | None:
    """
    Validate an X-Api-Key header against stored user api_keys.
    Returns the user dict or None if invalid/missing.
    """
    key = auth_header.strip()
    if not key:
        return None
    with _USERS_LOCK:
        users = _load_users()
    # Constant-time comparison to avoid timing leaks during key lookup
    return next(
        (u for u in users
         if u.get("api_key") and u.get("active")
         and hmac.compare_digest(u["api_key"].encode(), key.encode())),
        None,
    )


def _query_rag(text: str, modules: list | None = None) -> list:
    """
    Query the RAG server and return the raw hits list.

    Args:
        text: The query text (story description or user input).
        modules: Active generation modules (cas, code, gherkin, etc.) for
                 source diversity. Passed through to RAG server.

    Returns [] if RAG is disabled, unreachable, or finds nothing.
    """
    if not RAG_URL:
        return []
    try:
        body = {"question": text, "top_k": RAG_TOP_K}
        if modules:
            body["modules"] = modules
        payload = json.dumps(body).encode()
        req = urllib.request.Request(
            f"{RAG_URL}/rag/query",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        return data.get("hits", [])
    except Exception as e:
        monitoring.insert_error_log(level="WARN", source="rag", message=f"RAG unavailable: {e}")
        return []


def _query_rag_groups(text: str, modules: list | None = None) -> dict:
    """
    Query the RAG server and return the grouped hits dict.

    Returns a dict like {"jira": [...], "confluence": [...], "web": [...]}
    where each entry has the raw hit format from /rag/query.
    Returns {} if RAG is unavailable.
    """
    if not RAG_URL:
        return {}
    try:
        body: dict = {"question": text, "top_k": 8}
        if modules:
            body["modules"] = modules
        payload = json.dumps(body).encode()
        req = urllib.request.Request(
            f"{RAG_URL}/rag/query",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        return data.get("groups", {})
    except Exception as e:
        monitoring.insert_error_log(level="WARN", source="rag", message=f"RAG unavailable: {e}")
        return {}


def _query_rag_codebase(text: str, top_k: int = 5, source: str | None = None) -> list:
    """
    Query the RAG server filtered to codebase sources.
    Used to retrieve framework code examples for test generation.
    Returns [] if RAG is disabled, unreachable, or finds nothing.
    """
    if not RAG_URL:
        return []
    try:
        body: dict = {"question": text, "top_k": top_k}
        if source:
            body["source"] = source
        else:
            # Search all codebase-related sources
            body["source"] = "java_codebase"
        payload = json.dumps(body).encode()
        req = urllib.request.Request(
            f"{RAG_URL}/rag/query",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        return data.get("hits", [])
    except Exception as e:
        monitoring.insert_error_log(level="WARN", source="rag", message=f"RAG codebase unavailable: {e}")
        return []


def _hits_to_context(hits: list) -> str:
    """Format RAG hits into a context block for the system prompt.
    Applies execution-based boosts from Xray feedback (Roadmap 1.4).
    """
    boosted = []
    for h in hits:
        p = h.get("payload") or {}
        src   = p.get("source", "")
        title = p.get("title", "")
        chunk = p.get("chunk", "")
        score = h.get("score", 0.0)
        chunk_id = p.get("id") or p.get("chunk_id", "")
        if chunk:
            boost = _db_get_chunk_boost(chunk_id) if chunk_id else 0.0
            boosted.append((score + boost, f"[{src}] {title} (relevance={score + boost:.2f})\n{chunk}"))
    boosted.sort(key=lambda x: x[0], reverse=True)
    return "\n\n---\n\n".join(text for _, text in boosted)


# ── Coverage DB helpers ───────────────────────────────────────────────────────

def _init_coverage_db() -> None:
    """Create coverage.db and tables on first run (idempotent)."""
    conn = sqlite3.connect(str(COVERAGE_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS stories (
            story_key    TEXT PRIMARY KEY,
            total_tests  INTEGER DEFAULT 0,
            last_updated TEXT
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS tests (
            id         TEXT PRIMARY KEY,
            story_key  TEXT REFERENCES stories(story_key),
            title      TEXT NOT NULL,
            module     TEXT,
            source     TEXT,
            vector_id  TEXT,
            pushed_at  TEXT
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_tests_story ON tests(story_key)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS feedback (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp  TEXT NOT NULL,
            story_key  TEXT NOT NULL,
            chunk_id   TEXT NOT NULL,
            source     TEXT NOT NULL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_feedback_chunk ON feedback(chunk_id)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS execution_feedback (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp  TEXT NOT NULL,
            story_key  TEXT NOT NULL,
            chunk_id   TEXT NOT NULL,
            outcome    TEXT NOT NULL,
            source     TEXT NOT NULL DEFAULT 'xray'
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_exec_fb_chunk ON execution_feedback(chunk_id)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS generation_history (
            id                INTEGER PRIMARY KEY AUTOINCREMENT,
            story_key         TEXT NOT NULL,
            story_content_hash TEXT NOT NULL,
            story_content     TEXT NOT NULL,
            generated_json    TEXT NOT NULL,
            created_at        TEXT NOT NULL,
            module            TEXT NOT NULL DEFAULT '',
            framework         TEXT NOT NULL DEFAULT ''
        )
    """)
    # Backfill columns for installs that pre-date Roadmap #57 (no module/framework).
    # ALTER TABLE ADD COLUMN is non-destructive; we check first so a clean install
    # with the columns already in CREATE doesn't error.
    existing_cols = {r[1] for r in conn.execute("PRAGMA table_info(generation_history)").fetchall()}
    if "module" not in existing_cols:
        conn.execute("ALTER TABLE generation_history ADD COLUMN module TEXT NOT NULL DEFAULT ''")
    if "framework" not in existing_cols:
        conn.execute("ALTER TABLE generation_history ADD COLUMN framework TEXT NOT NULL DEFAULT ''")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_gen_hist_story ON generation_history(story_key)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_gen_hist_lookup ON generation_history(story_key, module, framework)")
    conn.commit()
    conn.close()


# ── User data DB (history + stats) ───────────────────────────────────────────

def _init_userdata_db() -> None:
    """Create userdata.db tables on first run (idempotent)."""
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS history (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id    TEXT NOT NULL,
            entry_id   INTEGER NOT NULL,
            created_at TEXT NOT NULL,
            payload    TEXT NOT NULL,
            UNIQUE(user_id, entry_id)
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_history_user ON history(user_id, created_at DESC)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS stats (
            user_id          TEXT PRIMARY KEY,
            analyses         INTEGER DEFAULT 0,
            cas              INTEGER DEFAULT 0,
            risques          INTEGER DEFAULT 0,
            total_gen_time   REAL    DEFAULT 0.0,
            updated_at       TEXT    NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS contact_messages (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            name       TEXT NOT NULL,
            email      TEXT NOT NULL,
            message    TEXT NOT NULL,
            ip         TEXT,
            created_at TEXT NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS password_resets (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id    TEXT NOT NULL,
            token_hash TEXT NOT NULL UNIQUE,
            expires_at TEXT NOT NULL,
            created_at TEXT NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id         TEXT PRIMARY KEY,
            username   TEXT NOT NULL UNIQUE,
            email      TEXT NOT NULL DEFAULT '',
            password_hash TEXT NOT NULL,
            api_key    TEXT,
            role       TEXT NOT NULL DEFAULT 'user',
            active     INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL
        )
    """)
    conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_username ON users(username)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS server_jobs (
            job_id     TEXT PRIMARY KEY,
            job_type   TEXT NOT NULL DEFAULT 'webhook',
            status     TEXT NOT NULL DEFAULT 'queued',
            story_key  TEXT,
            config     TEXT,
            result     TEXT,
            error      TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_server_jobs_type ON server_jobs(job_type)")
    conn.commit()
    conn.close()


def _job_save(job_id: str, job_type: str, status: str, story_key: str = "",
              config: dict | None = None, result: str | None = None, error: str | None = None) -> None:
    """Upsert a job into the persistent store."""
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    try:
        conn.execute(
            """INSERT INTO server_jobs (job_id, job_type, status, story_key, config, result, error, created_at, updated_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(job_id) DO UPDATE SET
                   status=excluded.status, result=excluded.result, error=excluded.error, updated_at=excluded.updated_at""",
            (job_id, job_type, status, story_key,
             json.dumps(config) if config else None,
             result, error, now, now),
        )
        conn.commit()
    finally:
        conn.close()


def _job_get(job_id: str) -> dict | None:
    """Get a job from persistent store."""
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute("SELECT * FROM server_jobs WHERE job_id = ?", (job_id,)).fetchone()
        return dict(row) if row else None
    finally:
        conn.close()


def _job_list_recent(job_type: str | None = None, limit: int = 20) -> list:
    """List recent jobs."""
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    try:
        if job_type:
            rows = conn.execute(
                "SELECT * FROM server_jobs WHERE job_type = ? ORDER BY created_at DESC LIMIT ?",
                (job_type, limit)).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM server_jobs ORDER BY created_at DESC LIMIT ?",
                (limit,)).fetchall()
        return [dict(r) for r in rows]
    finally:
        conn.close()


def _ud_conn():
    conn = sqlite3.connect(str(USERDATA_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    return conn


def _history_list(user_id: str, limit: int = 20) -> list:
    with _ud_conn() as conn:
        rows = conn.execute(
            "SELECT payload FROM history WHERE user_id=? ORDER BY created_at DESC LIMIT ?",
            (user_id, limit)
        ).fetchall()
    return [json.loads(r["payload"]) for r in rows]


def _history_save(user_id: str, entry: dict) -> None:
    """Upsert one history entry (keyed by entry['id'])."""
    with _ud_conn() as conn:
        conn.execute(
            """INSERT INTO history(user_id, entry_id, created_at, payload)
               VALUES(?,?,?,?)
               ON CONFLICT(user_id, entry_id) DO UPDATE SET payload=excluded.payload, created_at=excluded.created_at""",
            (user_id, int(entry.get("id", 0)), entry.get("date", ""), json.dumps(entry))
        )
        # Keep only the 20 most recent per user
        conn.execute(
            """DELETE FROM history WHERE user_id=? AND entry_id NOT IN (
                   SELECT entry_id FROM history WHERE user_id=? ORDER BY created_at DESC LIMIT 20
               )""",
            (user_id, user_id)
        )


def _history_delete(user_id: str, entry_id: int) -> bool:
    with _ud_conn() as conn:
        cur = conn.execute(
            "DELETE FROM history WHERE user_id=? AND entry_id=?",
            (user_id, entry_id)
        )
    return cur.rowcount > 0


def _stats_get(user_id: str) -> dict:
    with _ud_conn() as conn:
        row = conn.execute("SELECT * FROM stats WHERE user_id=?", (user_id,)).fetchone()
    if not row:
        return {"analyses": 0, "cas": 0, "risques": 0, "totalGenTimeSec": 0.0}
    return {
        "analyses":       row["analyses"],
        "cas":            row["cas"],
        "risques":        row["risques"],
        "totalGenTimeSec": row["total_gen_time"],
    }


def _stats_save(user_id: str, s: dict) -> None:
    with _ud_conn() as conn:
        conn.execute(
            """INSERT INTO stats(user_id, analyses, cas, risques, total_gen_time, updated_at)
               VALUES(?,?,?,?,?,?)
               ON CONFLICT(user_id) DO UPDATE SET
                   analyses=excluded.analyses,
                   cas=excluded.cas,
                   risques=excluded.risques,
                   total_gen_time=excluded.total_gen_time,
                   updated_at=excluded.updated_at""",
            (user_id,
             int(s.get("analyses", 0)),
             int(s.get("cas", 0)),
             int(s.get("risques", 0)),
             float(s.get("totalGenTimeSec", 0)),
             datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z"))
        )


def _db_get_covered(story_key: str) -> list:
    """Return all indexed tests for a story as a list of dicts."""
    conn = sqlite3.connect(str(COVERAGE_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT * FROM tests WHERE story_key = ? ORDER BY pushed_at DESC",
        (story_key,),
    ).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def _db_upsert_tests(story_key: str, tests: list) -> None:
    """
    Insert tests into coverage.db (skip duplicates by test id).
    tests: list of {id, title, module, source}
    """
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    conn = sqlite3.connect(str(COVERAGE_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "INSERT OR IGNORE INTO stories (story_key, total_tests, last_updated) VALUES (?, 0, ?)",
        (story_key, now),
    )
    for t in tests:
        conn.execute(
            "INSERT OR IGNORE INTO tests (id, story_key, title, module, source, vector_id, pushed_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (t["id"], story_key, t["title"],
             t.get("module", ""), t.get("source", "generated"), t.get("vector_id", ""), now),
        )
    conn.execute(
        "UPDATE stories SET "
        "total_tests = (SELECT COUNT(*) FROM tests WHERE story_key = ?), "
        "last_updated = ? WHERE story_key = ?",
        (story_key, now, story_key),
    )
    conn.commit()
    conn.close()


def _db_insert_feedback(story_key: str, chunk_ids: list) -> None:
    """
    Record positive feedback for RAG chunks that contributed to a successful
    Xray push. Each chunk_id is stored with the story key and timestamp.

    This data is used to boost retrieval scores for chunks that have previously
    contributed to accepted test generations.
    """
    if not chunk_ids:
        return
    now = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
    conn = sqlite3.connect(str(COVERAGE_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    try:
        for cid in chunk_ids:
            if isinstance(cid, dict):
                chunk_id = cid.get("id", "")
                source = cid.get("source", "")
            else:
                chunk_id = str(cid)
                source = ""
            if chunk_id:
                conn.execute(
                    "INSERT INTO feedback (timestamp, story_key, chunk_id, source) VALUES (?, ?, ?, ?)",
                    (now, story_key, chunk_id, source),
                )
        conn.commit()
    finally:
        conn.close()


def _db_record_execution(story_key: str, chunk_ids: list, outcome: str) -> None:
    """Record test execution outcome (pass/fail) for RAG chunks."""
    conn = sqlite3.connect(str(COVERAGE_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
    for cid in chunk_ids:
        conn.execute(
            "INSERT INTO execution_feedback (timestamp, story_key, chunk_id, outcome) VALUES (?, ?, ?, ?)",
            (now, story_key, cid, outcome),
        )
    conn.commit()
    conn.close()


def _db_get_chunk_boost(chunk_id: str) -> float:
    """Compute a score boost for a RAG chunk based on execution history.

    Each pass within 90 days: +0.05
    Each fail within 90 days: -0.03
    Older results: half weight.
    """
    conn = sqlite3.connect(str(COVERAGE_DB))
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT outcome, timestamp FROM execution_feedback WHERE chunk_id = ?",
        (chunk_id,),
    ).fetchall()
    conn.close()

    if not rows:
        return 0.0

    cutoff = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=90)).isoformat()
    boost = 0.0
    for r in rows:
        weight = 1.0 if r["timestamp"] >= cutoff else 0.5
        if r["outcome"] == "pass":
            boost += 0.05 * weight
        elif r["outcome"] == "fail":
            boost -= 0.03 * weight
    return round(boost, 4)
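
# Worked example: a chunk with 3 passes inside the 90-day window and 1 fail
# older than that scores 3*0.05*1.0 - 0.03*0.5 = 0.135.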


def _enrich_system(system: str, user_text: str) -> str:
    """Append RAG context and few-shot examples to the system prompt."""
    # Few-shot examples from approved pairs (Roadmap 1.2)
    try:
        examples = fine_tuning.get_approved_examples(limit=3)
        if examples:
            shots = []
            for ex in examples:
                msgs = ex["messages"]
                user_msg = next((m["content"] for m in msgs if m["role"] == "user"), "")
                asst_msg = next((m["content"] for m in msgs if m["role"] == "assistant"), "")
                if user_msg and asst_msg:
                    shots.append(f"User: {user_msg}\nAssistant: {asst_msg}")
            if shots:
                system += "\n\n## FEW-SHOT EXAMPLES (approved quality — match this style):\n\n" + "\n\n---\n\n".join(shots)
                print(f"  📚 Few-shot: {len(shots)} approved examples injected")
    except Exception as e:
        print(f"  ⚠  Few-shot injection failed: {e}")

    # RAG enrichment — query all sources (Jira, Confluence, Swagger, web, codebase)
    rag_query = user_text[:800]  # slicing is a no-op when the text is already short
    hits = _query_rag(rag_query)
    context = _hits_to_context(hits)
    if not context:
        return system
    print(f"  🔍 RAG: {len(hits)} chunks injected (top score={hits[0].get('score',0):.2f})")
    return (
        system
        + "\n\n## RETRIEVED CONTEXT (Jira / Confluence / Swagger / Web / Codebase — use this to enrich your output):\n\n"
        + context
    )


# ── Multi-pass self-review (delegated to generation.py) ──────────────


_WEBHOOK_MODULE_SCHEMAS = {
    "cas": (
        '"cas": [{"title":"","preconditions":"","steps":[""],"expectedResult":"","priority":"Medium","type":"Functional"}]'
    ),
    "gherkin": (
        '"gherkin": {"feature":"","scenarios":[{"title":"","tags":[""],"steps":{"given":[""],"when":[""],"then":[""]}}]}'
    ),
    "risques": (
        '"risques": [{"category":"","risk":"","severity":"","mitigation":""}]'
    ),
    "code": (
        '"code": {"framework":"","language":"","body":""}'
    ),
}

def _build_webhook_system(style: str, framework: str, modules: list) -> str:
    """Build a compact system prompt for webhook (non-SSE) generation."""
    module_keys = [m for m in modules if m in _WEBHOOK_MODULE_SCHEMAS]
    if not module_keys:
        module_keys = ["cas"]
    schema_parts = ", ".join(_WEBHOOK_MODULE_SCHEMAS[m] for m in module_keys)
    schema = "{" + schema_parts + "}"

    style_note = {
        "bdd":      "Use BDD style (Given/When/Then) for test case steps.",
        "atdd":      "Use ATDD style (Action/TestData/Expected Result) for test case steps.",
        "tdd":      "Use TDD style focusing on unit-level test cases.",
        "standard": "Use standard test case format step by step.",
    }.get(style, "Use standard test case format step by step.")

    fw_note = f"Target framework: {framework}." if "code" in module_keys else ""

    return (
        f"Act as a expert QA Engineer with strong experience in:\n"
        f"- Boundary Value Analysis\n"
        f"- Equivalence Partitioning\n"
        f"- Negative Testing\n"
        f"- Edge Case Identification\n\n"

        f"{style_note} {fw_note}\n\n"

        f"Analyze the user story provided in the user message and generate comprehensive test artifacts.\n\n"

        f"Instructions:\n"
        f"- Ensure full coverage of all acceptance criteria\n"
        f"- Include positive, negative, and edge test cases\n"
        f"- Include security and validation scenarios where applicable\n"
        f"- Identify missing or ambiguous requirements\n"
        f"- Think step-by-step before generating the final output\n\n"

        f"Return ONLY valid JSON matching exactly this schema:\n{schema}\n"
        f"No markdown, no explanation — raw JSON only."
    )


# ── Fine-tuning state ─────────────────────────────────────────────────────────
_ft: dict = {
    "status": "idle",    # idle | assembling | training | done | error
    "log": [],           # ring buffer, max 500 lines
    "proc": None,        # subprocess.Popen | None
    "lock": threading.Lock(),
}


# ── GDPR data export / deletion ────────────────────────────────────────────

def _gdpr_export_data(user: dict) -> dict:
    """Export all personal data for a user as a JSON-serialisable dict."""
    uid = user["id"]
    export = {
        "user": {
            "id": uid,
            "username": user.get("username", ""),
            "email": user.get("email", ""),
            "role": user.get("role", ""),
            "created_at": user.get("created_at", ""),
        },
        "history": [],
        "stats": {},
        "contact_messages": [],
    }
    conn = sqlite3.connect(str(USERDATA_DB))
    try:
        rows = conn.execute("SELECT id, entry_id, created_at, payload FROM history WHERE user_id = ?", (uid,)).fetchall()
        export["history"] = [{"id": r[0], "entry_id": r[1], "created_at": r[2], "payload": r[3]} for r in rows]
        row = conn.execute("SELECT analyses, cas, risques, total_gen_time, updated_at FROM stats WHERE user_id = ?", (uid,)).fetchone()
        if row:
            export["stats"] = {"analyses": row[0], "cas": row[1], "risques": row[2], "total_gen_time": row[3], "updated_at": row[4]}
        msgs = conn.execute("SELECT id, name, email, message, created_at FROM contact_messages WHERE email = ?", (user.get("email", ""),)).fetchall()
        export["contact_messages"] = [{"id": m[0], "name": m[1], "email": m[2], "message": m[3], "created_at": m[4]} for m in msgs]
    finally:
        conn.close()
    return export


def _gdpr_delete_data(user: dict) -> dict:
    """Delete all personal data for a user. Returns summary of what was deleted."""
    uid = user["id"]
    conn = sqlite3.connect(str(USERDATA_DB))
    try:
        conn.execute("DELETE FROM history WHERE user_id = ?", (uid,))
        conn.execute("DELETE FROM stats WHERE user_id = ?", (uid,))
        conn.execute("DELETE FROM contact_messages WHERE email = ?", (user.get("email", ""),))
        conn.execute("DELETE FROM password_resets WHERE user_id = ?", (uid,))
        conn.commit()
    finally:
        conn.close()
    with _USERS_LOCK:
        users = _load_users()
        users = [u for u in users if u["id"] != uid]
        _save_users(users)
    return {"deleted": True, "user_id": uid}


def _sso_discover_for_email(email: str) -> dict:
    """Look up an org by email domain. Returns:
    - {"kind": "redirect", "url": "/auth/<slug>/login"} if SSO is configured.
    - {"kind": "local"} otherwise.
    """
    if "@" not in email:
        return {"kind": "local"}
    domain = email.split("@", 1)[1].lower().strip()
    if not domain:
        return {"kind": "local"}
    import sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT slug FROM organizations WHERE LOWER(domain)=? AND deleted_at IS NULL",
            (domain,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return {"kind": "local"}
    slug = row[0]
    cfg = identity._get_idp_config_or_none(slug)
    if cfg and cfg["enabled"]:
        return {"kind": "redirect", "url": f"/auth/{slug}/login"}
    return {"kind": "local"}


def _scim_json_response(handler, payload: dict | None, *, status: int = 200) -> None:
    """Write a SCIM-shaped JSON response with the right content-type."""
    handler.send_response(status)
    handler.send_header("Content-Type", "application/scim+json")
    body = json.dumps(payload or {}).encode("utf-8")
    handler.send_header("Content-Length", str(len(body)))
    handler.end_headers()
    handler.wfile.write(body)


def _compute_me_payload(*, user_id: str) -> dict:
    """Return the /api/me payload — user identity + orgs membership + current org.

    The "current org" is the user's org with the lowest-id active membership
    (deterministic). When the user is in multiple orgs the frontend can switch
    via /api/projects?org=<slug>.
    """
    import sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        urow = conn.execute(
            "SELECT id, username, email, is_system_admin FROM users "
            "WHERE id=? AND deleted_at IS NULL",
            (user_id,),
        ).fetchone()
        if urow is None:
            return {}
        org_rows = conn.execute(
            "SELECT o.id, o.slug, o.name FROM organizations o "
            "JOIN org_members om ON om.org_id = o.id "
            "WHERE om.user_id=? AND om.deleted_at IS NULL "
            "AND o.deleted_at IS NULL ORDER BY o.created_at",
            (user_id,),
        ).fetchall()
    finally:
        conn.close()
    is_sysadmin = bool(urow[3])
    orgs: list[dict] = []
    for r in org_rows:
        oid, oslug, oname = r[0], r[1], r[2]
        # Sysadmins have everything regardless of role membership; short-circuit
        # to avoid the per-code authz round-trip.
        if is_sysadmin:
            perms = {code: True for code in _ORG_PERM_CODES_FOR_FRONTEND}
        else:
            perms = _compute_org_permissions(user_id=user_id, org_id=oid)
        orgs.append({"id": oid, "slug": oslug, "name": oname, "permissions": perms})
    return {
        "user": {
            "id": urow[0], "username": urow[1] or "",
            "email": urow[2] or "", "is_system_admin": is_sysadmin,
        },
        "orgs": orgs,
        "current_org": orgs[0] if orgs else None,
    }


# The set of permission codes whose state we report on a per-project basis
# in /api/projects. Frontend uses these to gate tabs/cards. Adding a code
# here is cheap; removing one is a frontend-breaking change.
_PROJECT_PERM_CODES_FOR_FRONTEND = (
    "agent.view", "agent.use",
    "generation.view", "generation.use",
    "tests.view", "tests.use",
    "runs.view", "runs.use",
    "coverage.view", "coverage.use",
    "perf.view", "perf.use",
    "visual_regression.view", "visual_regression.use",
    "mobile.view", "mobile.use",
    "fine_tuning.view", "fine_tuning.use",
    "mcp.view", "mcp.use",
    "workspace.view", "workspace.use",
    "reports.view", "reports.use",
    "members.view", "members.manage",
    "settings.view", "settings.manage",
)


def _compute_project_permissions(*, user_id: str, project_id: str) -> dict[str, bool]:
    import authz
    out: dict[str, bool] = {}
    for code in _PROJECT_PERM_CODES_FOR_FRONTEND:
        out[code] = authz.can(user_id=user_id, action=code, project_id=project_id)
    return out


# Per-org permission codes the frontend uses to gate menu/page visibility.
# Anything an admin/owner can see in the Settings dropdown should be listed
# here; pages further restrict by re-checking on the API call itself.
_ORG_PERM_CODES_FOR_FRONTEND: tuple[str, ...] = (
    "org.settings.view", "org.settings.manage",
    "org.projects.create",
    "org.roles.view", "org.roles.manage",
    "org.sso.manage",
    "org.group_mappings.view", "org.group_mappings.manage",
    "org.audit.view",
    "org.trash.view",
    "org.members.view", "org.members.manage",
)


def _compute_org_permissions(*, user_id: str, org_id: str) -> dict[str, bool]:
    """Map every org-level perm code to ``True`` / ``False`` for the user.

    Returned in /api/me so the frontend can hide menu items the user can't use.
    """
    import authz
    return {
        code: authz.can(user_id=user_id, action=code, org_id=org_id)
        for code in _ORG_PERM_CODES_FOR_FRONTEND
    }


def _project_health(project_id: str, project_slug: str) -> dict:
    """Compute health metrics. Today's test_runs.db uses a loose 'project' string
    column, not project_id (Plan 1 carry-over). We match by project slug; if no
    matching rows exist we return zeros."""
    import sqlite3 as _s
    import os as _os
    test_runs_db = _os.path.join(
        _os.path.dirname(USERDATA_DB), "test_runs.db",
    )
    health = {"pass_rate": 0.0, "runs_7d": 0, "flaky_count": 0}
    if not _os.path.exists(test_runs_db):
        return health
    try:
        conn = _s.connect(test_runs_db)
        try:
            seven_days_ago = (
                datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=7)
            ).isoformat()
            row = conn.execute(
                "SELECT COUNT(*), "
                "       SUM(CASE WHEN passed=1 THEN 1 ELSE 0 END) "
                "FROM test_runs WHERE project=? AND created_at >= ?",
                (project_slug, seven_days_ago),
            ).fetchone()
            if row and row[0]:
                health["runs_7d"] = row[0]
                health["pass_rate"] = (row[1] or 0) / row[0]
            health["flaky_count"] = conn.execute(
                "SELECT COUNT(DISTINCT test_name) FROM test_runs "
                "WHERE project=? AND flaky=1 AND created_at >= ?",
                (project_slug, seven_days_ago),
            ).fetchone()[0] or 0
        finally:
            conn.close()
    except _s.Error:
        pass
    return health
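
# Example health dict (keys from the function above; values invented):
#   {"pass_rate": 0.94, "runs_7d": 120, "flaky_count": 3}
# pass_rate is passed/total over the 7-day window and 0.0 when there are no runs.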


def _compute_projects_payload(*, user_id: str) -> dict:
    import sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        urow = conn.execute(
            "SELECT is_system_admin FROM users WHERE id=? AND deleted_at IS NULL",
            (user_id,),
        ).fetchone()
        is_sys_admin = bool(urow and urow[0])
        if is_sys_admin:
            rows = conn.execute(
                "SELECT p.id, p.slug, p.name, o.slug, o.name "
                "FROM projects p JOIN organizations o ON o.id=p.org_id "
                "WHERE p.deleted_at IS NULL AND o.deleted_at IS NULL "
                "ORDER BY p.created_at"
            ).fetchall()
            roles_by_proj: dict[str, list[str]] = {}
        else:
            rows = conn.execute(
                "SELECT DISTINCT p.id, p.slug, p.name, o.slug, o.name "
                "FROM projects p "
                "JOIN organizations o ON o.id=p.org_id "
                "JOIN project_member_roles pmr ON pmr.project_id=p.id "
                "WHERE pmr.user_id=? AND pmr.deleted_at IS NULL "
                "AND p.deleted_at IS NULL AND o.deleted_at IS NULL "
                "ORDER BY p.created_at",
                (user_id,),
            ).fetchall()
            role_rows = conn.execute(
                "SELECT pmr.project_id, r.name FROM project_member_roles pmr "
                "JOIN roles r ON r.id=pmr.role_id "
                "WHERE pmr.user_id=? AND pmr.deleted_at IS NULL",
                (user_id,),
            ).fetchall()
            roles_by_proj = {}
            for pid, rname in role_rows:
                roles_by_proj.setdefault(pid, []).append(rname)
    finally:
        conn.close()

    projects: list[dict] = []
    for pid, pslug, pname, oslug, oname in rows:
        my_roles = roles_by_proj.get(pid, ["System Admin"] if is_sys_admin else [])
        projects.append({
            "id": pid, "slug": pslug, "name": pname,
            "org_slug": oslug, "org_name": oname,
            "my_roles": my_roles,
            "health": _project_health(pid, pslug),
            "permissions": _compute_project_permissions(user_id=user_id, project_id=pid),
        })

    # KPIs aggregate across all returned projects
    kpis = {
        "projects": len(projects),
        "tests_run_7d": sum(p["health"]["runs_7d"] for p in projects),
        "pass_rate": (
            sum(p["health"]["pass_rate"] * p["health"]["runs_7d"] for p in projects)
            / max(1, sum(p["health"]["runs_7d"] for p in projects))
        ),
        "flaky": sum(p["health"]["flaky_count"] for p in projects),
    }
    return {"projects": projects, "kpis": kpis}


def _resolve_project_by_slug_for_user(*, user_id: str, project_slug: str) -> tuple[str, str] | None:
    """Returns (project_id, org_id) if the user can access the project, else None.

    Access = (a) is_system_admin, OR (b) has at least one role in the project.
    Returns None for unknown slugs OR projects the user has no role in (don't leak existence).
    """
    import sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        urow = conn.execute(
            "SELECT is_system_admin FROM users WHERE id=? AND deleted_at IS NULL", (user_id,),
        ).fetchone()
        is_sys = bool(urow and urow[0])
        prow = conn.execute(
            "SELECT id, org_id FROM projects WHERE slug=? AND deleted_at IS NULL",
            (project_slug,),
        ).fetchone()
        if not prow:
            return None
        pid, oid = prow
        if is_sys:
            return pid, oid
        member = conn.execute(
            "SELECT 1 FROM project_member_roles WHERE project_id=? AND user_id=? "
            "AND deleted_at IS NULL LIMIT 1",
            (pid, user_id),
        ).fetchone()
        return (pid, oid) if member else None
    finally:
        conn.close()


def _compute_project_detail_payload(*, user_id: str, project_slug: str) -> dict | None:
    import sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, oid = pair
    conn = _s.connect(USERDATA_DB)
    try:
        prow = conn.execute(
            "SELECT id, slug, name, description, created_at, org_id FROM projects WHERE id=?",
            (pid,),
        ).fetchone()
        orow = conn.execute(
            "SELECT slug, name FROM organizations WHERE id=?", (oid,),
        ).fetchone()
        rows = conn.execute(
            "SELECT r.name FROM project_member_roles pmr "
            "JOIN roles r ON r.id=pmr.role_id "
            "WHERE pmr.project_id=? AND pmr.user_id=? AND pmr.deleted_at IS NULL",
            (pid, user_id),
        ).fetchall()
    finally:
        conn.close()
    return {
        "id": prow[0], "slug": prow[1], "name": prow[2],
        "description": prow[3] or "", "created_at": prow[4],
        "org_slug": orow[0], "org_name": orow[1],
        "my_roles": [r[0] for r in rows] or (["System Admin"] if user_id else []),
        "health": _project_health(pid, prow[1]),
        "permissions": _compute_project_permissions(user_id=user_id, project_id=pid),
    }


def _compute_project_metrics_payload(
    *, user_id: str, project_slug: str, range_days: int = 30,
) -> dict | None:
    """Returns trend + kpis. Requires tests.view; 403-equivalent if not."""
    import authz, datetime as _dt, os as _os, sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, _oid = pair
    if not authz.can(user_id=user_id, action="tests.view", project_id=pid):
        return {"_forbidden": True, "missing": "tests.view"}

    test_runs_db = _os.path.join(_os.path.dirname(USERDATA_DB), "test_runs.db")
    pass_rate_trend: list[dict] = []
    runs_per_day: list[dict] = []
    kpis = {"pass_rate": 0.0, "runs_7d": 0, "flaky_count": 0, "coverage_pct": 0}
    if _os.path.exists(test_runs_db):
        conn = _s.connect(test_runs_db)
        try:
            now = _dt.datetime.now(_dt.UTC)
            for d in range(range_days, -1, -1):
                day = (now - _dt.timedelta(days=d)).date().isoformat()
                row = conn.execute(
                    "SELECT COUNT(*), SUM(CASE WHEN passed=1 THEN 1 ELSE 0 END) "
                    "FROM test_runs WHERE project=? AND DATE(created_at)=?",
                    (project_slug, day),
                ).fetchone()
                cnt = row[0] or 0
                pr = (row[1] or 0) / cnt if cnt else 0.0
                pass_rate_trend.append({"date": day, "rate": round(pr, 3)})
                runs_per_day.append({"date": day, "count": cnt})
            kpis = {
                **_project_health(pid, project_slug),
                "coverage_pct": 0,  # coverage metric not yet wired (out of scope for Plan 5)
            }
        finally:
            conn.close()
    return {
        "pass_rate_trend": pass_rate_trend,
        "runs_per_day": runs_per_day,
        "kpis": kpis,
    }
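
# Shape sketch for the metrics payload (dates invented; range_days=30 yields
# 31 daily buckets because the loop runs from range_days down to 0 inclusive):
#   {"pass_rate_trend": [{"date": "2025-01-01", "rate": 0.917}, ...],
#    "runs_per_day":    [{"date": "2025-01-01", "count": 12}, ...],
#    "kpis": {"pass_rate": ..., "runs_7d": ..., "flaky_count": ..., "coverage_pct": 0}}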


def _compute_project_members_payload(
    *, user_id: str, project_slug: str,
) -> dict | None:
    """Returns the members list. Requires members.view.

    The response also includes ``can_manage`` (whether the calling user has
    ``members.manage`` on this project) so the UI can hide/show invite +
    role-edit controls without a second round-trip. Each member carries
    parallel ``roles`` (names) and ``role_ids`` (ids) lists so the frontend
    can PATCH by role_id.
    """
    import authz, sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, _oid = pair
    if not authz.can(user_id=user_id, action="members.view", project_id=pid):
        return {"_forbidden": True, "missing": "members.view"}
    can_manage = authz.can(user_id=user_id, action="members.manage", project_id=pid)
    conn = _s.connect(USERDATA_DB)
    try:
        rows = conn.execute(
            """
            SELECT u.id, u.username, u.email,
                   pm.joined_at,
                   (SELECT idp_kind FROM external_identities ei
                     WHERE ei.user_id=u.id LIMIT 1) AS idp_kind
              FROM project_members pm
              JOIN users u ON u.id=pm.user_id
             WHERE pm.project_id=? AND pm.deleted_at IS NULL AND u.deleted_at IS NULL
             ORDER BY pm.joined_at
            """,
            (pid,),
        ).fetchall()
        out: list[dict] = []
        for uid, uname, email, joined, idp_kind in rows:
            role_rows = conn.execute(
                "SELECT r.id, r.name FROM project_member_roles pmr "
                "JOIN roles r ON r.id=pmr.role_id "
                "WHERE pmr.project_id=? AND pmr.user_id=? AND pmr.deleted_at IS NULL "
                "ORDER BY r.name",
                (pid, uid),
            ).fetchall()
            out.append({
                "user_id": uid, "username": uname or "",
                "email": email or "",
                "roles": [r[1] for r in role_rows],
                "role_ids": [r[0] for r in role_rows],
                "added_at": joined, "last_seen_at": None,  # populated in Plan 6
                "idp_kind": idp_kind or "local",
            })
    finally:
        conn.close()
    return {"members": out, "can_manage": can_manage}


def _compute_assignable_roles_payload(
    *, user_id: str, project_slug: str,
) -> dict | None:
    """Roles available to assign on this project: system roles + org roles for the project's org.

    Requires ``members.manage`` on the project.
    """
    import authz, sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, oid = pair
    if not authz.can(user_id=user_id, action="members.manage", project_id=pid):
        return {"_forbidden": True, "missing": "members.manage"}
    conn = _s.connect(USERDATA_DB)
    try:
        rows = conn.execute(
            "SELECT id, name, description, is_system FROM roles "
            "WHERE deleted_at IS NULL AND (org_id IS NULL OR org_id=?) "
            "ORDER BY is_system DESC, name",
            (oid,),
        ).fetchall()
    finally:
        conn.close()
    return {"roles": [{"id": r[0], "name": r[1], "description": r[2], "is_system": bool(r[3])} for r in rows]}


_INVITE_TOKEN_TTL_DAYS = 7
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


_MIN_PASSWORD_LEN = 8


def _accept_invite(*, raw_token: str, password: str,
                   ip: str | None, user_agent: str | None) -> dict:
    """Verify a password-setup token, set the user's password, activate the
    account, and create a session. Returns ``{user, session_token}`` on success
    or ``{"_error": "..."}`` on failure (bad/expired token, weak password, etc.).
    """
    import hashlib as _h
    import sqlite3 as _s
    if not raw_token or not password:
        return {"_error": "token and password are required"}
    if len(password) < _MIN_PASSWORD_LEN:
        return {"_error": f"password must be at least {_MIN_PASSWORD_LEN} characters"}
    token_hash = _h.sha256(raw_token.encode()).hexdigest()
    now_dt = datetime.datetime.now(datetime.UTC)
    now_iso = now_dt.isoformat().replace("+00:00", "Z")
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT t.id, t.user_id, t.expires_at, t.used_at, "
            "       u.username, u.email, u.deleted_at "
            "  FROM password_setup_tokens t "
            "  JOIN users u ON u.id = t.user_id "
            " WHERE t.token_hash=?",
            (token_hash,),
        ).fetchone()
        if not row:
            return {"_error": "invalid or unknown token"}
        tid, uid, expires_at, used_at, username, email, deleted_at = row
        if used_at is not None:
            return {"_error": "this invite has already been used"}
        if deleted_at is not None:
            return {"_error": "user no longer exists"}
        if expires_at < now_iso:
            return {"_error": "this invite has expired"}
        # Set the password, activate the user, mark token used.
        conn.execute(
            "UPDATE users SET password_hash=?, active=1 WHERE id=?",
            (_hash_password(password), uid),
        )
        conn.execute(
            "UPDATE password_setup_tokens SET used_at=? WHERE id=?",
            (now_iso, tid),
        )
        conn.commit()
    finally:
        conn.close()
    # Mint a session token so the user is logged in immediately.
    session_token = identity._sessions.create_session(  # type: ignore[attr-defined]
        USERDATA_DB,
        user_id=uid,
        lifetime_seconds=30 * 24 * 60 * 60,
        ip=ip, user_agent=user_agent,
    )
    try:
        authz.audit(
            actor_user_id=uid, action="auth.invite_accepted",
            target_type="user", target_id=uid,
        )
    except Exception:
        pass
    return {
        "user": {"id": uid, "username": username, "email": email},
        "session_token": session_token,
    }
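
# Illustrative call (token value invented; a real one comes from the invite URL):
#
#     result = _accept_invite(raw_token="abc123...", password="s3cret-enough",
#                             ip="203.0.113.5", user_agent="Mozilla/5.0")
#     if "_error" in result:
#         ...  # surface the message to the user
#     else:
#         ...  # set the session cookie from result["session_token"]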


def _mint_password_setup_token(*, user_id: str, created_by: str | None) -> tuple[str, str]:
    """Mint a single-use password-setup token for ``user_id``.

    Returns ``(raw_token, expires_at_iso)``. Only the sha256 hash is persisted
    so the raw token must be returned to the caller now or it's lost.
    """
    import hashlib as _h
    import secrets as _secrets
    import sqlite3 as _s
    raw = _secrets.token_urlsafe(32)
    token_hash = _h.sha256(raw.encode()).hexdigest()
    now = datetime.datetime.now(datetime.UTC)
    expires = now + datetime.timedelta(days=_INVITE_TOKEN_TTL_DAYS)
    iso_now = now.isoformat().replace("+00:00", "Z")
    iso_exp = expires.isoformat().replace("+00:00", "Z")
    conn = _s.connect(USERDATA_DB)
    try:
        # Invalidate any prior outstanding tokens for this user — a fresh
        # invite supersedes the old link.
        conn.execute(
            "UPDATE password_setup_tokens SET used_at=? "
            "WHERE user_id=? AND used_at IS NULL",
            (iso_now, user_id),
        )
        conn.execute(
            "INSERT INTO password_setup_tokens"
            "(id, user_id, token_hash, created_at, created_by, expires_at) "
            "VALUES (?,?,?,?,?,?)",
            (str(uuid.uuid4()), user_id, token_hash, iso_now, created_by, iso_exp),
        )
        conn.commit()
    finally:
        conn.close()
    return raw, iso_exp
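
# Persisting only sha256(raw) means a leaked database does not leak usable
# invite links: verification recomputes the hash from the presented token.
#
#     raw, expires = _mint_password_setup_token(user_id="u7", created_by="admin-1")
#     # Share f"/accept-invite?token={raw}" now; `raw` is unrecoverable later.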


def _invite_project_member(
    *, user_id: str, project_slug: str, body: dict,
) -> dict | None:
    """Add a user to a project and assign one or more roles.

    Body: ``{"identifier": "email or username", "role_ids": ["..."]}``.
    Gated by ``members.manage``.

    If ``identifier`` matches an existing active user, they're added directly.
    If no match AND identifier looks like an email, a brand-new inactive user
    row is created and a one-time password-setup token is minted; the response
    includes ``invite_url`` (raw token in the URL) so the admin can share the
    link until SMTP delivery exists.

    Returns the new member dict, ``{"_error":"..."}`` on validation failure,
    ``{"_forbidden": True, ...}`` on permission denial, or ``None`` if the
    project is not visible to the caller.
    """
    import authz
    import authz.models as _models
    import sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, oid = pair
    if not authz.can(user_id=user_id, action="members.manage", project_id=pid):
        return {"_forbidden": True, "missing": "members.manage"}

    identifier = (body.get("identifier") or "").strip()
    role_ids = body.get("role_ids") or []
    if not identifier:
        return {"_error": "identifier (email or username) is required"}
    if not isinstance(role_ids, list) or not role_ids:
        return {"_error": "role_ids must be a non-empty list"}

    # Look up by email or username INCLUDING soft-deleted rows so we can
    # restore them — the username column is globally UNIQUE, so a brand-new
    # INSERT with a recycled email would fail with IntegrityError.
    conn = _s.connect(USERDATA_DB)
    try:
        urow = conn.execute(
            "SELECT id, username, email, active, deleted_at, password_hash FROM users "
            "WHERE LOWER(email)=LOWER(?) OR LOWER(username)=LOWER(?)",
            (identifier, identifier),
        ).fetchone()
        valid_roles = {r[0] for r in conn.execute(
            "SELECT id FROM roles WHERE deleted_at IS NULL "
            "AND (org_id IS NULL OR org_id=?)",
            (oid,),
        ).fetchall()}
    finally:
        conn.close()

    bad = [rid for rid in role_ids if rid not in valid_roles]
    if bad:
        return {"_error": f"unknown or out-of-scope role_ids: {bad}"}

    invite_url: str | None = None
    raw_token: str | None = None
    expires_at: str | None = None
    new_user = False
    needs_invite = False  # True when the user has no password yet (new or restored)

    if urow:
        target_uid, target_username, target_email, active, deleted_at, pw_hash = urow
        # Three sub-cases:
        #   a) active + has password    → add directly, no link
        #   b) soft-deleted             → restore, deactivate, blank pw → invite again
        #   c) inactive (prior stub)    → mint a fresh token (replaces any old one)
        if deleted_at is not None or not active or not pw_hash:
            conn = _s.connect(USERDATA_DB)
            try:
                conn.execute(
                    "UPDATE users SET deleted_at=NULL, active=0, password_hash='' "
                    "WHERE id=?",
                    (target_uid,),
                )
                conn.commit()
            finally:
                conn.close()
            needs_invite = True
            new_user = bool(deleted_at)  # treat restored deleted users as "new" so the banner shows
    else:
        # Brand new user — only allow if identifier looks like an email so we
        # don't accidentally create stub accounts with junk usernames.
        if not _EMAIL_RE.match(identifier):
            return {"_error": f"no user matches '{identifier}'. Pass an email to invite a new user."}
        target_email = identifier.lower()
        target_username = target_email  # username is unique; email is a sane default
        target_uid = str(uuid.uuid4())
        now_iso = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
        conn = _s.connect(USERDATA_DB)
        try:
            # Empty password_hash is a sentinel — _verify_password will reject any input,
            # so the account is locked until accept-invite sets a real hash.
            conn.execute(
                "INSERT INTO users(id, username, email, password_hash, role, active, created_at) "
                "VALUES (?,?,?,?,?,?,?)",
                (target_uid, target_username, target_email, "", "user", 0, now_iso),
            )
            conn.commit()
        finally:
            conn.close()
        needs_invite = True
        new_user = True

    if needs_invite:
        raw_token, expires_at = _mint_password_setup_token(
            user_id=target_uid, created_by=user_id,
        )
        invite_url = f"/accept-invite?token={raw_token}"

    _models.ensure_project_member(USERDATA_DB, project_id=pid, user_id=target_uid, added_by=user_id)
    for rid in role_ids:
        _models.assign_project_role(
            USERDATA_DB, project_id=pid, user_id=target_uid, role_id=rid, assigned_by=user_id,
        )
    _models.ensure_org_member(USERDATA_DB, org_id=oid, user_id=target_uid, added_by=user_id)

    authz.audit(
        actor_user_id=user_id, org_id=oid, project_id=pid,
        action="project.member.invited",
        target_type="user", target_id=target_uid,
        details={"identifier": identifier, "role_ids": list(role_ids),
                 "new_user": new_user, "needs_invite": needs_invite},
    )
    out: dict = {
        "ok": True, "user_id": target_uid,
        "username": target_username, "email": target_email,
        "new_user": new_user,
        "needs_invite": needs_invite,
    }
    if invite_url:
        out["invite_url"] = invite_url
        out["raw_token"] = raw_token
        out["expires_at"] = expires_at
    return out
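
# Example request/response for the invite flow (values invented):
#   body = {"identifier": "new.hire@acme.com", "role_ids": ["r-tester"]}
#   -> {"ok": true, "user_id": "...", "username": "new.hire@acme.com",
#       "email": "new.hire@acme.com", "new_user": true, "needs_invite": true,
#       "invite_url": "/accept-invite?token=...", "raw_token": "...", "expires_at": "..."}
# An existing active user gets the same shape minus the invite_url fields.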


def _set_project_member_roles(
    *, user_id: str, project_slug: str, target_user_id: str, body: dict,
) -> dict | None:
    """Replace the role set for a project member. Gated by ``members.manage``.

    Body: ``{"role_ids": [...]}`` — empty list = strip all roles (caller should
    use DELETE for full removal, but we accept it as a no-op equivalent).
    """
    import authz
    import authz.models as _models
    import sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, oid = pair
    if not authz.can(user_id=user_id, action="members.manage", project_id=pid):
        return {"_forbidden": True, "missing": "members.manage"}

    role_ids = body.get("role_ids")
    if not isinstance(role_ids, list):
        return {"_error": "role_ids must be a list"}

    now = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
    conn = _s.connect(USERDATA_DB)
    try:
        urow = conn.execute(
            "SELECT id FROM users WHERE id=? AND deleted_at IS NULL", (target_user_id,),
        ).fetchone()
        if not urow:
            return {"_error": "user not found"}
        valid_roles = {r[0] for r in conn.execute(
            "SELECT id FROM roles WHERE deleted_at IS NULL "
            "AND (org_id IS NULL OR org_id=?)", (oid,),
        ).fetchall()}
        bad = [rid for rid in role_ids if rid not in valid_roles]
        if bad:
            return {"_error": f"unknown or out-of-scope role_ids: {bad}"}
        current = {r[0] for r in conn.execute(
            "SELECT role_id FROM project_member_roles "
            "WHERE project_id=? AND user_id=? AND deleted_at IS NULL",
            (pid, target_user_id),
        ).fetchall()}
        to_remove = current - set(role_ids)
        for rid in to_remove:
            conn.execute(
                "UPDATE project_member_roles SET deleted_at=? "
                "WHERE project_id=? AND user_id=? AND role_id=? AND deleted_at IS NULL",
                (now, pid, target_user_id, rid),
            )
        conn.commit()
    finally:
        conn.close()

    # Adds (idempotent — restores soft-deleted rows too)
    to_add = set(role_ids) - current
    if to_add:
        _models.ensure_project_member(
            USERDATA_DB, project_id=pid, user_id=target_user_id, added_by=user_id,
        )
    for rid in to_add:
        _models.assign_project_role(
            USERDATA_DB, project_id=pid, user_id=target_user_id, role_id=rid, assigned_by=user_id,
        )

    authz.audit(
        actor_user_id=user_id, org_id=oid, project_id=pid,
        action="project.member.roles_updated",
        target_type="user", target_id=target_user_id,
        details={"added": sorted(to_add), "removed": sorted(to_remove)},
    )
    return {"ok": True, "user_id": target_user_id,
            "added": sorted(to_add), "removed": sorted(to_remove)}


def _remove_project_member(
    *, user_id: str, project_slug: str, target_user_id: str,
) -> dict | None:
    """Soft-delete a project membership and all role assignments.

    Gated by ``members.manage``. Refuses to remove the last Admin to avoid
    leaving the project unmanageable (system admins always have a back door).
    """
    import authz
    import sqlite3 as _s
    pair = _resolve_project_by_slug_for_user(user_id=user_id, project_slug=project_slug)
    if not pair:
        return None
    pid, oid = pair
    if not authz.can(user_id=user_id, action="members.manage", project_id=pid):
        return {"_forbidden": True, "missing": "members.manage"}

    now = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
    conn = _s.connect(USERDATA_DB)
    try:
        urow = conn.execute(
            "SELECT id FROM users WHERE id=? AND deleted_at IS NULL", (target_user_id,),
        ).fetchone()
        if not urow:
            return {"_error": "user not found"}
        # Refuse to drop the last Admin
        admin_count = conn.execute(
            "SELECT COUNT(DISTINCT pmr.user_id) FROM project_member_roles pmr "
            "JOIN roles r ON r.id=pmr.role_id "
            "WHERE pmr.project_id=? AND r.name='Admin' AND pmr.deleted_at IS NULL",
            (pid,),
        ).fetchone()[0]
        target_is_admin = conn.execute(
            "SELECT 1 FROM project_member_roles pmr JOIN roles r ON r.id=pmr.role_id "
            "WHERE pmr.project_id=? AND pmr.user_id=? AND r.name='Admin' "
            "AND pmr.deleted_at IS NULL LIMIT 1",
            (pid, target_user_id),
        ).fetchone()
        if target_is_admin and admin_count <= 1:
            return {"_error": "cannot remove the last Admin of the project"}
        conn.execute(
            "UPDATE project_member_roles SET deleted_at=? "
            "WHERE project_id=? AND user_id=? AND deleted_at IS NULL",
            (now, pid, target_user_id),
        )
        conn.execute(
            "UPDATE project_members SET deleted_at=? "
            "WHERE project_id=? AND user_id=? AND deleted_at IS NULL",
            (now, pid, target_user_id),
        )
        conn.commit()
    finally:
        conn.close()

    authz.audit(
        actor_user_id=user_id, org_id=oid, project_id=pid,
        action="project.member.removed",
        target_type="user", target_id=target_user_id,
    )
    return {"ok": True, "user_id": target_user_id}


def _compute_org_payload(*, user_id: str, org_slug: str) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id, name, slug, created_at, settings_json FROM organizations "
            "WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    org_id = row[0]
    if not authz.can(user_id=user_id, action="org.settings.view", org_id=org_id):
        return {"_forbidden": True, "missing": "org.settings.view"}
    return {
        "id": row[0], "name": row[1], "slug": row[2],
        "created_at": row[3], "settings": json.loads(row[4] or "{}"),
    }


def _patch_org(*, user_id: str, org_slug: str, patch: dict) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id, name, settings_json FROM organizations "
            "WHERE slug=? AND deleted_at IS NULL", (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.settings.manage", org_id=org_id):
            return {"_forbidden": True, "missing": "org.settings.manage"}
        new_name = patch.get("name", row[1])
        existing = json.loads(row[2] or "{}")
        for k, v in (patch.get("settings_patch") or {}).items():
            existing[k] = v
        with conn:
            conn.execute(
                "UPDATE organizations SET name=?, settings_json=? WHERE id=?",
                (new_name, json.dumps(existing, separators=(",", ":")), org_id),
            )
    finally:
        conn.close()
    authz.audit(actor_user_id=user_id, org_id=org_id,
                action="org.settings.updated",
                details={"patch_keys": list(patch.keys())})
    return _compute_org_payload(user_id=user_id, org_slug=org_slug)


def _create_project(*, user_id: str, org_slug: str, body: dict) -> dict | None:
    """Create a new project under an org. Gated by ``org.projects.create``.

    Body shape: ``{"name", "slug", "description"?}``. Returns the new project
    dict, ``{"_forbidden", "missing"}`` on permission denial, ``{"_error":
    "..."}`` on validation failure, or ``None`` if the org doesn't exist.
    """
    import authz
    import authz.models as _models
    import sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    org_id = row[0]
    if not authz.can(user_id=user_id, action="org.projects.create", org_id=org_id):
        return {"_forbidden": True, "missing": "org.projects.create"}

    name = (body.get("name") or "").strip()
    slug = (body.get("slug") or "").strip().lower()
    description = (body.get("description") or "").strip()
    if not name:
        return {"_error": "name is required"}
    if not slug:
        # auto-derive a simple slug from the name
        slug = re.sub(r"[^a-z0-9-]+", "-", name.lower()).strip("-") or "project"
    if not re.match(r"^[a-z0-9][a-z0-9-]{0,62}$", slug):
        return {"_error": "slug must match [a-z0-9][a-z0-9-]{0,62}"}

    try:
        project_id = _models.create_project(
            USERDATA_DB, org_id=org_id, name=name, slug=slug, description=description,
        )
    except _s.IntegrityError as e:
        return {"_error": f"slug already in use: {e}"}

    # Add the creator as a project member with the Admin system role so they
    # can immediately use the project.
    conn = _s.connect(USERDATA_DB)
    try:
        admin_role = conn.execute(
            "SELECT id FROM roles WHERE name='Admin' AND is_system=1 AND org_id IS NULL"
        ).fetchone()
        if admin_role:
            _models.ensure_project_member(
                USERDATA_DB, project_id=project_id, user_id=user_id, added_by=user_id,
            )
            _models.assign_project_role(
                USERDATA_DB, project_id=project_id, user_id=user_id,
                role_id=admin_role[0], assigned_by=user_id,
            )
    finally:
        conn.close()

    authz.audit(actor_user_id=user_id, org_id=org_id, project_id=project_id,
                action="project.created", target_type="project", target_id=project_id,
                details={"name": name, "slug": slug})
    return {
        "id": project_id, "slug": slug, "name": name, "description": description,
        "org_slug": org_slug, "my_roles": ["Admin"],
    }
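
# Example (invented values): a POST body {"name": "Checkout"} with no slug
# derives slug "checkout" and returns
#   {"id": "...", "slug": "checkout", "name": "Checkout", "description": "",
#    "org_slug": "acme", "my_roles": ["Admin"]}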


def _compute_roles_payload(*, user_id: str, org_slug: str) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.roles.view", org_id=org_id):
            return {"_forbidden": True, "missing": "org.roles.view"}
        rows = conn.execute(
            "SELECT id, org_id, name, description, is_system, created_at "
            "FROM roles WHERE deleted_at IS NULL "
            "AND (org_id IS NULL OR org_id=?) ORDER BY is_system DESC, name",
            (org_id,),
        ).fetchall()
        roles: list[dict] = []
        for rid, r_org_id, name, desc, is_system, created_at in rows:
            perms = [p[0] for p in conn.execute(
                "SELECT permission_code FROM role_permissions WHERE role_id=?",
                (rid,),
            ).fetchall()]
            n_members = conn.execute(
                "SELECT COUNT(*) FROM project_member_roles WHERE role_id=? AND deleted_at IS NULL",
                (rid,),
            ).fetchone()[0] + conn.execute(
                "SELECT COUNT(*) FROM org_member_roles WHERE role_id=? AND deleted_at IS NULL",
                (rid,),
            ).fetchone()[0]
            roles.append({
                "id": rid, "name": name, "description": desc,
                "is_system": bool(is_system), "scope": "org" if r_org_id else "system",
                "perms": perms, "member_count": n_members,
            })
    finally:
        conn.close()
    return {"roles": roles}


def _create_role(*, user_id: str, org_slug: str, body: dict) -> dict | None:
    import authz, sqlite3 as _s, uuid
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.roles.manage", org_id=org_id):
            return {"_forbidden": True, "missing": "org.roles.manage"}
        name = (body.get("name") or "").strip()
        if not name:
            return {"_error": "name is required"}
        desc = body.get("description") or ""
        perms = body.get("perms") or []
        rid = str(uuid.uuid4())
        now = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
        with conn:
            conn.execute(
                "INSERT INTO roles(id, org_id, name, description, is_system, created_at) "
                "VALUES (?, ?, ?, ?, 0, ?)",
                (rid, org_id, name, desc, now),
            )
            for code in perms:
                conn.execute(
                    "INSERT OR IGNORE INTO role_permissions(role_id, permission_code) VALUES (?, ?)",
                    (rid, code),
                )
    finally:
        conn.close()
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.role.created",
                target_type="role", target_id=rid, details={"name": name, "perms": perms})
    return {"id": rid, "name": name, "description": desc, "perms": perms,
            "is_system": False, "scope": "org", "member_count": 0}


def _patch_role(*, user_id: str, org_slug: str, role_id: str, body: dict) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.roles.manage", org_id=org_id):
            return {"_forbidden": True, "missing": "org.roles.manage"}
        rrow = conn.execute(
            "SELECT name, description, is_system, org_id FROM roles "
            "WHERE id=? AND deleted_at IS NULL", (role_id,),
        ).fetchone()
        if not rrow or rrow[2] == 1:
            # Cannot patch system roles or unknown roles
            return None
        if rrow[3] != org_id:
            return None  # foreign-org role; don't leak
        new_name = body.get("name", rrow[0])
        new_desc = body.get("description", rrow[1])
        new_perms = body.get("perms")
        with conn:
            conn.execute(
                "UPDATE roles SET name=?, description=? WHERE id=?",
                (new_name, new_desc, role_id),
            )
            if new_perms is not None:
                conn.execute("DELETE FROM role_permissions WHERE role_id=?", (role_id,))
                conn.executemany(
                    "INSERT INTO role_permissions(role_id, permission_code) VALUES (?, ?)",
                    [(role_id, c) for c in new_perms],
                )
    finally:
        conn.close()
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.role.updated",
                target_type="role", target_id=role_id,
                details={"keys": list(body.keys())})
    # Re-read to return the current state
    payload = _compute_roles_payload(user_id=user_id, org_slug=org_slug)
    if not payload or payload.get("_forbidden"):
        return payload
    for r in payload["roles"]:
        if r["id"] == role_id:
            return r
    return None


def _delete_role(*, user_id: str, org_slug: str, role_id: str) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.roles.manage", org_id=org_id):
            return {"_forbidden": True, "missing": "org.roles.manage"}
        rrow = conn.execute(
            "SELECT is_system, org_id FROM roles WHERE id=? AND deleted_at IS NULL",
            (role_id,),
        ).fetchone()
        if not rrow or rrow[0] == 1 or rrow[1] != org_id:
            return None
        now = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
        with conn:
            conn.execute("UPDATE roles SET deleted_at=? WHERE id=?", (now, role_id))
            conn.execute("UPDATE project_member_roles SET deleted_at=? WHERE role_id=? AND deleted_at IS NULL",
                         (now, role_id))
            conn.execute("UPDATE org_member_roles SET deleted_at=? WHERE role_id=? AND deleted_at IS NULL",
                         (now, role_id))
    finally:
        conn.close()
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.role.deleted",
                target_type="role", target_id=role_id)
    return {"deleted": True}


# ── Task 4: IdP config + group_role_mappings CRUD ────────────────────────────

_REDACT_SUFFIXES = ("secret", "password", "key")


def _redact_config(config: dict) -> dict:
    """Return a copy of config with secret-like keys replaced by ***REDACTED***."""
    out = {}
    for k, v in config.items():
        if k.lower().endswith(_REDACT_SUFFIXES):
            out[k] = "***REDACTED***"
        else:
            out[k] = v
    return out
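
# Suffix matching is case-insensitive and applies to any key ending in
# "secret", "password", or "key". Example (values invented):
#   _redact_config({"client_secret": "x", "issuer": "https://idp"})
#   -> {"client_secret": "***REDACTED***", "issuer": "https://idp"}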


def _compute_idp_config_payload(*, user_id: str, org_slug: str) -> dict | None:
    import authz, sqlite3 as _s
    import identity.idp_config as _idp
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    org_id = row[0]
    if not authz.can(user_id=user_id, action="org.sso.view", org_id=org_id):
        return {"_forbidden": True, "missing": "org.sso.view"}
    cfg = _idp.get_idp_config(USERDATA_DB, org_id=org_id)
    if cfg is None:
        return {"kind": None}
    return {
        "kind": cfg["kind"],
        "config": _redact_config(cfg["config"]),
        "enabled": cfg["enabled"],
    }


def _put_idp_config(*, user_id: str, org_slug: str, body: dict) -> dict | None:
    import authz, sqlite3 as _s
    import identity.idp_config as _idp
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    org_id = row[0]
    if not authz.can(user_id=user_id, action="org.sso.manage", org_id=org_id):
        return {"_forbidden": True, "missing": "org.sso.manage"}
    kind = body.get("kind", "oidc")
    config = body.get("config") or {}
    enabled = bool(body.get("enabled", True))
    _idp.set_idp_config(USERDATA_DB, org_id=org_id, kind=kind, config=config, enabled=enabled)
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.idp.configured",
                details={"kind": kind})
    return {"kind": kind, "enabled": enabled}


def _disable_idp_config_helper(*, user_id: str, org_slug: str) -> dict | None:
    import authz, sqlite3 as _s
    import identity.idp_config as _idp
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    org_id = row[0]
    if not authz.can(user_id=user_id, action="org.sso.manage", org_id=org_id):
        return {"_forbidden": True, "missing": "org.sso.manage"}
    _idp.disable_idp_config(USERDATA_DB, org_id=org_id)
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.idp.disabled")
    return {"disabled": True}


def _compute_group_mappings_payload(*, user_id: str, org_slug: str) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.sso.view", org_id=org_id):
            return {"_forbidden": True, "missing": "org.sso.view"}
        rows = conn.execute(
            "SELECT grm.id, grm.external_group_id, grm.project_id, "
            "       p.slug AS project_slug, grm.role_id, r.name AS role_name "
            "FROM group_role_mappings grm "
            "JOIN projects p ON p.id = grm.project_id "
            "JOIN roles r ON r.id = grm.role_id "
            "WHERE grm.org_id=? AND grm.deleted_at IS NULL",
            (org_id,),
        ).fetchall()
    finally:
        conn.close()
    mappings = [
        {
            "id": r[0],
            "external_group_id": r[1],
            "project_id": r[2],
            "project_slug": r[3],
            "role_id": r[4],
            "role_name": r[5],
        }
        for r in rows
    ]
    return {"mappings": mappings}


def _create_group_mapping(*, user_id: str, org_slug: str, body: dict) -> dict | None:
    import authz, sqlite3 as _s, uuid
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.sso.manage", org_id=org_id):
            return {"_forbidden": True, "missing": "org.sso.manage"}
        external_group_id = (body.get("external_group_id") or "").strip()
        project_slug = (body.get("project_slug") or "").strip()
        role_id = body.get("role_id") or ""
        if not external_group_id:
            return {"_error": "external_group_id is required"}
        # Resolve project_slug → project_id
        proj_row = conn.execute(
            "SELECT id FROM projects WHERE slug=? AND org_id=? AND deleted_at IS NULL",
            (project_slug, org_id),
        ).fetchone()
        if not proj_row:
            return {"_error": "project not found"}
        project_id = proj_row[0]
        # Validate role belongs to org or is a system role
        role_row = conn.execute(
            "SELECT id FROM roles WHERE id=? AND deleted_at IS NULL "
            "AND (org_id IS NULL OR org_id=?)",
            (role_id, org_id),
        ).fetchone()
        if not role_row:
            return {"_error": "role not found"}
        mapping_id = str(uuid.uuid4())
        with conn:
            conn.execute(
                "INSERT INTO group_role_mappings(id, org_id, external_group_id, project_id, role_id) "
                "VALUES (?, ?, ?, ?, ?)",
                (mapping_id, org_id, external_group_id, project_id, role_id),
            )
    finally:
        conn.close()
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.group_mapping.added",
                target_type="group_mapping", target_id=mapping_id,
                details={"external_group_id": external_group_id})
    return {
        "id": mapping_id,
        "external_group_id": external_group_id,
        "project_id": project_id,
        "role_id": role_id,
    }


def _delete_group_mapping(*, user_id: str, org_slug: str, mapping_id: str) -> dict | None:
    import authz, sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
        if not row:
            return None
        org_id = row[0]
        if not authz.can(user_id=user_id, action="org.sso.manage", org_id=org_id):
            return {"_forbidden": True, "missing": "org.sso.manage"}
        mrow = conn.execute(
            "SELECT id FROM group_role_mappings WHERE id=? AND org_id=? AND deleted_at IS NULL",
            (mapping_id, org_id),
        ).fetchone()
        if not mrow:
            return None
        now = datetime.datetime.now(datetime.UTC).isoformat().replace("+00:00", "Z")
        with conn:
            conn.execute(
                "UPDATE group_role_mappings SET deleted_at=? WHERE id=?",
                (now, mapping_id),
            )
    finally:
        conn.close()
    authz.audit(actor_user_id=user_id, org_id=org_id, action="org.group_mapping.removed",
                target_type="group_mapping", target_id=mapping_id)
    return {"deleted": True}


# ── Audit query + export helpers ──────────────────────────────────────────────

def _compute_audit_payload(
    *,
    user_id: str,
    org_slug: str,
    action_prefix: str | None = None,
    actor: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 100,
) -> dict | None:
    import authz as _authz, sqlite3 as _s
    from authz.audit import query_audit as _query_audit
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    org_id = row[0]
    if not _authz.can(user_id=user_id, action="org.audit.view", org_id=org_id):
        return {"_forbidden": True, "missing": "org.audit.view"}
    rows = list(_query_audit(
        USERDATA_DB,
        org_id=org_id,
        action_prefix=action_prefix,
        actor_user_id=actor,
        since=since,
        until=until,
        limit=limit,
    ))
    next_cursor = rows[-1]["created_at"] if rows else None
    return {"rows": rows, "next_cursor": next_cursor}


def _export_audit_csv(*, user_id: str, org_slug: str, **filters) -> bytes:
    import authz as _authz, sqlite3 as _s, csv as _csv, io as _io
    from authz.audit import query_audit as _query_audit
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return b""
    org_id = row[0]
    if not _authz.can(user_id=user_id, action="org.audit.export", org_id=org_id):
        return b""
    buf = _io.StringIO()
    writer = _csv.writer(buf)
    _CSV_HEADER = [
        "id", "org_id", "project_id", "actor_user_id", "action",
        "target_type", "target_id", "details_json", "ip", "user_agent", "created_at",
    ]
    writer.writerow(_CSV_HEADER)
    limit = min(int(filters.pop("limit", 100)), 100_000)
    for audit_row in _query_audit(USERDATA_DB, org_id=org_id, limit=limit, **filters):
        writer.writerow([
            audit_row.get("id"), audit_row.get("org_id"), audit_row.get("project_id"),
            audit_row.get("actor_user_id"), audit_row.get("action"),
            audit_row.get("target_type"), audit_row.get("target_id"),
            json.dumps(audit_row.get("details") or {}, separators=(",", ":")),
            audit_row.get("ip"), audit_row.get("user_agent"), audit_row.get("created_at"),
        ])
    return buf.getvalue().encode("utf-8")
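
# First line of any non-empty export (matches _CSV_HEADER above):
#   id,org_id,project_id,actor_user_id,action,target_type,target_id,details_json,ip,user_agent,created_at
# Note: an unknown slug and a permission denial both return b"", so the caller
# cannot distinguish the two cases from the body alone.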


def _export_audit_ndjson(*, user_id: str, org_slug: str, **filters) -> bytes:
    import authz as _authz, sqlite3 as _s
    from authz.audit import query_audit as _query_audit
    conn = _s.connect(USERDATA_DB)
    try:
        row = conn.execute(
            "SELECT id FROM organizations WHERE slug=? AND deleted_at IS NULL",
            (org_slug,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return b""
    org_id = row[0]
    if not _authz.can(user_id=user_id, action="org.audit.export", org_id=org_id):
        return b""
    limit = min(int(filters.pop("limit", 100)), 100_000)
    lines = []
    for audit_row in _query_audit(USERDATA_DB, org_id=org_id, limit=limit, **filters):
        lines.append(json.dumps(audit_row, separators=(",", ":")))
    return ("\n".join(lines) + ("\n" if lines else "")).encode("utf-8")


# ---------------------------------------------------------------------------
# System admin helpers — gated by users.is_system_admin=1 flag (NOT authz.can)
# ---------------------------------------------------------------------------

def _create_org(*, user_id: str, body: dict) -> dict | None:
    """POST /api/system/orgs — create a new organization (system-admin only).

    Body: ``{"name", "slug"?}``. Slug auto-derived from name if absent.
    The creator is added as an org_member with the Admin role so they can
    immediately manage the org without a manual step.

    Returns:
      - the new org dict on success.
      - ``{"_forbidden": True, "missing": "system.orgs.create"}`` if not sysadmin.
      - ``{"_error": "..."}``                                     on validation failure.
    """
    import sqlite3 as _s
    import authz as _authz
    import authz.models as _models
    conn = _s.connect(USERDATA_DB)
    try:
        sysadmin_row = conn.execute(
            "SELECT is_system_admin FROM users WHERE id=? AND deleted_at IS NULL",
            (user_id,),
        ).fetchone()
    finally:
        conn.close()
    if not sysadmin_row or not sysadmin_row[0]:
        return {"_forbidden": True, "missing": "system.orgs.create"}

    name = (body.get("name") or "").strip()
    slug = (body.get("slug") or "").strip().lower()
    if not name:
        return {"_error": "name is required"}
    if not slug:
        slug = re.sub(r"[^a-z0-9-]+", "-", name.lower()).strip("-") or "org"
    if not re.match(r"^[a-z0-9][a-z0-9-]{0,62}$", slug):
        return {"_error": "slug must match [a-z0-9][a-z0-9-]{0,62}"}

    conn = _s.connect(USERDATA_DB)
    try:
        clash = conn.execute(
            "SELECT id FROM organizations WHERE slug=?", (slug,),
        ).fetchone()
    finally:
        conn.close()
    if clash:
        return {"_error": f"slug already in use: {slug}"}

    try:
        org_id = _models.create_org(USERDATA_DB, name=name, slug=slug)
    except _s.IntegrityError as e:
        return {"_error": f"create failed: {e}"}

    # Add the creator as org_member with the Admin role so they can manage it.
    conn = _s.connect(USERDATA_DB)
    try:
        admin_role = conn.execute(
            "SELECT id FROM roles WHERE name='Admin' AND is_system=1 AND org_id IS NULL"
        ).fetchone()
    finally:
        conn.close()
    _models.ensure_org_member(USERDATA_DB, org_id=org_id, user_id=user_id, added_by=user_id)
    if admin_role:
        _models.assign_org_role(
            USERDATA_DB, org_id=org_id, user_id=user_id,
            role_id=admin_role[0], assigned_by=user_id,
        )

    _authz.audit(
        actor_user_id=user_id, org_id=org_id,
        action="system.orgs.created", target_type="organization", target_id=org_id,
        details={"name": name, "slug": slug},
    )
    return {"id": org_id, "slug": slug, "name": name, "member_count": 1, "created_at": None, "deleted_at": None}


def _compute_system_orgs_payload(*, user_id: str) -> dict:
    """GET /api/system/orgs — return every org (active + suspended) with member count.

    Returns ``{_forbidden: True, missing: "system.orgs.view"}`` when the calling
    user is not a system admin.
    """
    import sqlite3 as _s
    conn = _s.connect(USERDATA_DB)
    try:
        sysadmin_row = conn.execute(
            "SELECT is_system_admin FROM users WHERE id=? AND deleted_at IS NULL",
            (user_id,),
        ).fetchone()
        if not sysadmin_row or not sysadmin_row[0]:
            return {"_forbidden": True, "missing": "system.orgs.view"}
        rows = conn.execute(
            "SELECT id, slug, name, created_at, deleted_at FROM organizations "
            "ORDER BY created_at",
        ).fetchall()
        orgs: list[dict] = []
        for org_id, slug, name, created_at, deleted_at in rows:
            member_count = conn.execute(
                "SELECT COUNT(*) FROM org_members "
                "WHERE org_id=? AND deleted_at IS NULL",
                (org_id,),
            ).fetchone()[0]
            orgs.append({
                "id": org_id,
                "slug": slug,
                "name": name,
                "member_count": member_count,
                "created_at": created_at,
                "deleted_at": deleted_at,
            })
    finally:
        conn.close()
    return {"orgs": orgs}


def _suspend_org(*, user_id: str, org_slug: str) -> dict | None:
    """POST /api/system/orgs/<slug>/suspend — soft-delete the org via models.soft_delete_org.

    Returns:
      - ``None``                     if the org slug does not exist.
      - ``{_forbidden: True, …}``    if caller is not a system admin.
      - ``{"ok": True}``             on success.
    """
    import sqlite3 as _s
    import authz as _authz
    import authz.models as _models
    conn = _s.connect(USERDATA_DB)
    try:
        sysadmin_row = conn.execute(
            "SELECT is_system_admin FROM users WHERE id=? AND deleted_at IS NULL",
            (user_id,),
        ).fetchone()
        if not sysadmin_row or not sysadmin_row[0]:
            return {"_forbidden": True, "missing": "system.orgs.suspend"}
        org_row = conn.execute(
            "SELECT id FROM organizations WHERE slug=?",
            (org_slug,),
        ).fetchone()
        if not org_row:
            return None
        org_id = org_row[0]
    finally:
        conn.close()
    _models.soft_delete_org(USERDATA_DB, org_id)
    _authz.audit(
        actor_user_id=user_id,
        org_id=org_id,
        action="system.orgs.suspended",
        details={"slug": org_slug},
    )
    return {"ok": True}


def _restore_org(*, user_id: str, org_slug: str) -> dict | None:
    """POST /api/system/orgs/<slug>/restore — clear deleted_at on the org and cascade rows.

    Cascade-restores: organizations, org_members, org_member_roles, scim_tokens,
    group_role_mappings (all rows whose deleted_at was set by the original
    soft_delete_org are cleared), and re-enables the org's idp_configs rows.

    Returns:
      - ``None``                     if the org slug does not exist.
      - ``{_forbidden: True, …}``    if caller is not a system admin.
      - ``{"ok": True}``             on success.
    """
    import sqlite3 as _s
    import authz as _authz
    conn = _s.connect(USERDATA_DB)
    try:
        sysadmin_row = conn.execute(
            "SELECT is_system_admin FROM users WHERE id=? AND deleted_at IS NULL",
            (user_id,),
        ).fetchone()
        if not sysadmin_row or not sysadmin_row[0]:
            # Restore is gated by the same system permission as suspend.
            return {"_forbidden": True, "missing": "system.orgs.suspend"}
        org_row = conn.execute(
            "SELECT id FROM organizations WHERE slug=?",
            (org_slug,),
        ).fetchone()
        if not org_row:
            return None
        org_id = org_row[0]
        with conn:
            conn.execute(
                "UPDATE organizations SET deleted_at=NULL WHERE id=?",
                (org_id,),
            )
            conn.execute(
                "UPDATE org_members SET deleted_at=NULL WHERE org_id=?",
                (org_id,),
            )
            conn.execute(
                "UPDATE org_member_roles SET deleted_at=NULL WHERE org_id=?",
                (org_id,),
            )
            conn.execute(
                "UPDATE scim_tokens SET deleted_at=NULL WHERE org_id=?",
                (org_id,),
            )
            conn.execute(
                "UPDATE group_role_mappings SET deleted_at=NULL WHERE org_id=?",
                (org_id,),
            )
            conn.execute(
                "UPDATE idp_configs SET enabled=1 WHERE org_id=?",
                (org_id,),
            )
    finally:
        conn.close()
    _authz.audit(
        actor_user_id=user_id,
        org_id=org_id,
        action="system.orgs.restored",
        details={"slug": org_slug},
    )
    return {"ok": True}


def _send_file_response(handler, body: bytes, content_type: str, filename: str) -> None:
    """Send raw bytes with Content-Type and Content-Disposition: attachment headers."""
    try:
        handler.send_response(200)
        handler._cors_headers()
        handler.send_header("Content-Type", content_type)
        handler.send_header("Content-Disposition", f"attachment; filename={filename}")
        handler.send_header("Content-Length", str(len(body)))
        handler.end_headers()
        handler.wfile.write(body)
    except BrokenPipeError:
        pass  # client disconnected — safe to ignore


class QACopilotHandler(http.server.SimpleHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        self._req_start = 0.0
        self._req_status = 200
        self._req_user = None
        super().__init__(*args, directory=str(SCRIPT_DIR), **kwargs)

    def log_message(self, format, *args):
        if args and "/api/" in str(args[0]):
            print(f"  → API {args[0]}")

    def send_response(self, code, message=None):
        self._req_status = code
        super().send_response(code, message)

    def _auto_log(self):
        """Log the current request to monitoring. Called after each do_* method."""
        if not hasattr(self, '_req_start') or self._req_start == 0:
            return
        if self.path.startswith("/api/monitoring"):
            return
        duration_ms = (time.time() - self._req_start) * 1000
        try:
            monitoring.insert_request_log(
                method=self.command, path=self.path.split("?")[0],
                status_code=self._req_status, duration_ms=duration_ms,
                user_id=self._req_user.get("id") if self._req_user else None,
                username=self._req_user.get("username") if self._req_user else None,
            )
        except Exception:
            pass

    def _handle_scim_request(self, method: str) -> bool:
        """Dispatch /scim/v2/* requests. Returns True if the request was handled
        (so the outer router knows not to fall through). Returns False if the
        path doesn't match SCIM routes.
        """
        if not self.path.startswith("/scim/v2/"):
            return False
        if not MULTI_PROJECT_ENABLED:
            _scim_json_response(self, {
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
                "status": "404", "detail": "Multi-project mode disabled"},
                status=404)
            return True

        # Authenticate via Bearer token
        from scim.auth import verify_scim_token
        from scim import handlers as scim_handlers
        auth_header = self.headers.get("Authorization") or ""
        if not auth_header.lower().startswith("bearer "):
            _scim_json_response(self, {
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
                "status": "401", "detail": "Missing Authorization: Bearer <token>"},
                status=401)
            return True
        token = auth_header.split(" ", 1)[1].strip()
        token_info = verify_scim_token(USERDATA_DB, raw_token=token)
        if not token_info:
            _scim_json_response(self, {
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
                "status": "401", "detail": "Invalid or revoked token"},
                status=401)
            return True
        org_id = token_info["org_id"]

        # Route within /scim/v2/...
        path = self.path.split("?", 1)[0]
        from urllib.parse import urlparse, parse_qs
        qs = parse_qs(urlparse(self.path).query)
        try:
            if method == "GET" and path == "/scim/v2/ServiceProviderConfig":
                _scim_json_response(self, scim_handlers.handle_service_provider_config())
            elif method == "GET" and path == "/scim/v2/Schemas":
                _scim_json_response(self, scim_handlers.handle_schemas())
            elif method == "GET" and path == "/scim/v2/ResourceTypes":
                _scim_json_response(self, scim_handlers.handle_resource_types())
            elif method == "GET" and path == "/scim/v2/Users":
                payload = scim_handlers.handle_users_get(
                    USERDATA_DB, org_id=org_id,
                    filter_=(qs.get("filter") or [None])[0],
                    start_index=int((qs.get("startIndex") or ["1"])[0]),
                    count=int((qs.get("count") or ["100"])[0]),
                )
                _scim_json_response(self, payload)
            elif method == "POST" and path == "/scim/v2/Users":
                body = self._read_json_body()
                payload = scim_handlers.handle_users_post(
                    USERDATA_DB, org_id=org_id, body=body, actor_user_id=None,
                )
                _scim_json_response(self, payload, status=201)
            elif (m := re.match(r"^/scim/v2/Users/(?P<id>[^/]+)$", path)):
                user_id = m.group("id")
                if method == "GET":
                    payload = scim_handlers.handle_users_get_one(
                        USERDATA_DB, org_id=org_id, user_id=user_id)
                    if payload is None:
                        _scim_json_response(self, {
                            "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
                            "status": "404", "detail": "User not found"}, status=404)
                    else:
                        _scim_json_response(self, payload)
                elif method == "PATCH":
                    body = self._read_json_body()
                    payload = scim_handlers.handle_users_patch(
                        USERDATA_DB, org_id=org_id, user_id=user_id,
                        body=body, actor_user_id=None,
                    )
                    _scim_json_response(self, payload)
                elif method == "DELETE":
                    scim_handlers.handle_users_delete(
                        USERDATA_DB, org_id=org_id, user_id=user_id, actor_user_id=None,
                    )
                    self.send_response(204)
                    self.end_headers()
                else:
                    self.send_response(405)
                    self.end_headers()
            elif method == "GET" and path == "/scim/v2/Groups":
                payload = scim_handlers.handle_groups_get(
                    USERDATA_DB, org_id=org_id,
                    filter_=(qs.get("filter") or [None])[0],
                    start_index=int((qs.get("startIndex") or ["1"])[0]),
                    count=int((qs.get("count") or ["100"])[0]),
                )
                _scim_json_response(self, payload)
            elif method == "POST" and path == "/scim/v2/Groups":
                body = self._read_json_body()
                payload = scim_handlers.handle_groups_post(
                    USERDATA_DB, org_id=org_id, body=body, actor_user_id=None,
                )
                _scim_json_response(self, payload, status=201)
            elif (m := re.match(r"^/scim/v2/Groups/(?P<id>[^/]+)$", path)):
                group_id = m.group("id")
                if method == "GET":
                    payload = scim_handlers.handle_groups_get_one(
                        USERDATA_DB, org_id=org_id, group_id=group_id)
                    if payload is None:
                        _scim_json_response(self, {
                            "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
                            "status": "404", "detail": "Group not found"}, status=404)
                    else:
                        _scim_json_response(self, payload)
                elif method == "PATCH":
                    body = self._read_json_body()
                    payload = scim_handlers.handle_groups_patch(
                        USERDATA_DB, org_id=org_id, group_id=group_id,
                        body=body, actor_user_id=None,
                    )
                    _scim_json_response(self, payload)
                elif method == "DELETE":
                    scim_handlers.handle_groups_delete(
                        USERDATA_DB, org_id=org_id, group_id=group_id, actor_user_id=None,
                    )
                    self.send_response(204)
                    self.end_headers()
                else:
                    self.send_response(405)
                    self.end_headers()
            else:
                _scim_json_response(self, {
                    "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
                    "status": "404", "detail": "Unknown SCIM endpoint"}, status=404)
        except scim_handlers.SCIMError as e:
            _scim_json_response(self, e.to_resource(), status=e.status)
        return True

    # ── Routing ──────────────────────────────────────────────────────────

    def do_OPTIONS(self):
        with authz.request_scope():
            self._do_options_inner()

    def _do_options_inner(self):
        self.send_response(200)
        self._cors_headers()
        self.end_headers()

    def do_POST(self):
        with authz.request_scope():
            self._do_post_inner()

    def _do_post_inner(self):
        self._req_start = time.time()
        if self._handle_scim_request("POST"):
            return
        # Public auth endpoints — no token required
        if self.path == "/api/auth/login":
            self._auth_login()
        elif re.match(r"^/auth/[^/]+/login$", self.path):
            self._handle_org_login()
        elif re.match(r"^/auth/[^/]+/saml/acs$", self.path):
            self._handle_saml_acs()
        elif self.path == "/auth/discover":
            self._handle_sso_discover()
        elif self.path == "/auth/accept-invite":
            self._handle_accept_invite()
        elif self.path == "/api/auth/register":
            self._auth_register()
        # ── Control plane gated endpoints ──────────────────────────────────
        elif self.path == "/api/chat":
            if not self._check_cp_jwt():
                return
            if self._block_reader(): return
            self._proxy_ollama()
        elif self.path == "/api/stream":
            if not self._check_cp_jwt():
                return
            if self._block_reader(): return
            self._proxy_ollama_stream()
        elif self.path == "/api/json":
            if not self._check_cp_jwt():
                return
            if self._block_reader(): return
            self._proxy_json()
        elif self.path == "/api/xray-push":
            if self._block_reader(): return
            self._xray_push()
        # Standard authenticated endpoints
        elif self.path == "/api/rag-context":
            if self._require_auth() is None: return
            self._rag_context()
        elif self.path == "/api/rag-codebase":
            if self._require_auth() is None: return
            self._rag_codebase()
        elif self.path == "/api/jira-expand":
            if self._require_auth() is None: return
            self._jira_expand()
        elif self.path == "/api/config":
            if self._require_auth() is None: return
            self._config_update()
        elif self.path == "/api/sources" or re.match(r"^/api/sources/[^/]+/filters$", self.path):
            if self._require_auth() is None: return
            self._proxy_rag(self.path)
        elif self.path == "/api/rag-sync":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/jira")
        elif self.path == "/api/swagger-sync":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/swagger")
        elif self.path == "/api/swagger-sources/save":
            if self._require_auth() is None: return
            self._proxy_rag("/api/swagger-sources")
        elif self.path == "/api/web-sources/save":
            if self._require_auth() is None: return
            self._proxy_rag("/api/web-sources")
        elif self.path == "/api/web-sync":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/web")
        elif self.path == "/api/repo-sources":
            if self._require_auth() is None: return
            self._proxy_rag("/api/repo-sources")
        elif self.path == "/api/repo-sync":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/repos")
        elif self.path == "/api/codebase-sync":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/codebase")
        # ── History / stats persistence ──────────────────────────────────
        elif self.path == "/api/history":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                entry = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            _history_save(user["id"], entry)
            self._json_response({"ok": True})
        elif self.path == "/api/stats":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            _stats_save(user["id"], body)
            self._json_response({"ok": True})
        elif self.path == "/api/auth/apikey":
            self._auth_generate_apikey()
        elif self.path == "/api/webhook/generate":
            self._webhook_generate()
        elif self.path == "/api/export-repo":
            if self._require_auth() is None: return
            self._proxy_rag("/api/export-repo")
        # ── Fine-tuning routes ───────────────────────────────────
        elif self.path == "/api/finetune/collect":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_collect()
        elif self.path == "/api/finetune/filter":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_filter()
        elif self.path == "/api/finetune/assemble":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_assemble()
        elif re.match(r"^/api/finetune/pairs/\d+/review$", self.path):
            if self._require_auth(
                required_roles=["admin", "trainer"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_review_pair()
        elif self.path == "/api/finetune/train":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_train()
        elif self.path == "/api/finetune/train/stop":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_train_stop()
        elif self.path == "/api/finetune/cloud/terminate":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_cloud_terminate()
        elif self.path == "/api/finetune/deploy":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_deploy()
        elif self.path == "/api/finetune/import":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_import()
        elif self.path == "/api/webhook/xray-results":
            self._webhook_xray_results()
        elif self.path == "/api/agent/run":
            self._agent_run_v2()
        elif re.match(r"^/api/agent/[^/]+/approve$", self.path):
            job_id = self.path.split("/")[3]
            self._agent_approve(job_id)
        elif re.match(r"^/api/agent/[^/]+/override$", self.path):
            job_id = self.path.split("/")[3]
            self._agent_override(job_id)
        elif self.path == "/api/contact":
            self._contact_submit()
        elif self.path == "/api/auth/forgot-password":
            self._auth_forgot_password()
        elif self.path == "/api/auth/reset-password":
            self._auth_reset_password()
        elif self.path == "/api/billing/checkout":
            self._billing_checkout()
        # ── New module routes ──────────────────────────────────────────
        elif self.path == "/api/contract/generate":
            if self._block_reader(): return
            import contract_gen
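            # Illustrative request body — field names match the reads below,
            # provider/model values are hypothetical:
            #   {"spec": {...}, "framework": "restassured",
            #    "provider": "ollama", "model": "mistral"}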
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            spec = body.get("spec")
            if not spec or not isinstance(spec, dict):
                self._json_response({"error": "OpenAPI spec required in 'spec' field"}, status=400); return
            framework = body.get("framework", "restassured")
            provider = body.get("provider", get_default_provider())
            model = body.get("model", DEFAULT_MODEL)
            endpoints = contract_gen.extract_endpoints(spec)
            if not endpoints:
                self._json_response({"error": "No endpoints found in spec"}, status=400); return
            result = contract_gen.generate_contract_tests(endpoints, provider=provider, model=model, framework=framework)
            self._json_response(result)
        elif self.path == "/api/onboarding/complete-step":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            state = onboarding.complete_step(user["id"], body.get("step", 0))
            self._json_response(state)
        elif self.path == "/api/onboarding/skip":
            user = self._require_auth()
            if user is None: return
            state = onboarding.skip_onboarding(user["id"])
            self._json_response(state)
        elif self.path == "/api/test-runs":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            run_id = test_runs.record_run(
                project=body.get("project", "default"),
                suite=body.get("suite", ""),
                results=body.get("results", []),
            )
            self._json_response({"run_id": run_id})
        elif self.path == "/api/workspace/share":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            gen_id = workspace.share_generation(
                user_id=user["id"], story_key=body.get("story_key", ""),
                generated_json=json.dumps(body.get("generated", {})),
                module=body.get("module", ""),
            )
            self._json_response({"gen_id": gen_id})
        elif self.path == "/api/workspace/review":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            review_id = workspace.submit_review(
                gen_id=body.get("gen_id", ""), reviewer_id=user["id"],
                verdict=body.get("verdict", ""), comment=body.get("comment", ""),
            )
            self._json_response({"review_id": review_id})
        elif self.path == "/api/workspace/assign":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            workspace.assign_story(body.get("story_key", ""), body.get("user_id", ""))
            self._json_response({"ok": True})
        elif self.path == "/api/perf-results":
            user = self._require_auth()
            if user is None: return
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            run_id = perf_results.ingest_result(
                tool=body.get("tool", "unknown"),
                scenario=body.get("scenario", ""),
                metrics=body.get("metrics", {}),
            )
            self._json_response({"run_id": run_id})
        elif self.path == "/api/visual-regression/compare":
            if self._block_reader(): return
            import visual_regression
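            # Illustrative request body (paths hypothetical):
            #   {"name": "login-page", "screenshot_path": "/tmp/login.png",
            #    "threshold": 0.5}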
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            name = body.get("name", "")
            screenshot_path = body.get("screenshot_path", "")
            threshold = body.get("threshold", 0.5)
            if not name or not screenshot_path:
                self._json_response({"error": "name and screenshot_path required"}, status=400); return
            result = visual_regression.compare_files(name, screenshot_path, threshold=threshold)
            self._json_response(result)
        elif self.path == "/api/visual-regression/baseline":
            if self._block_reader(): return
            import visual_regression
            from PIL import Image
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            name = body.get("name", "")
            screenshot_path = body.get("screenshot_path", "")
            if not name or not screenshot_path:
                self._json_response({"error": "name and screenshot_path required"}, status=400); return
            try:
                img = Image.open(screenshot_path)
            except OSError:
                self._json_response({"error": "cannot open screenshot_path"}, status=400); return
            path = visual_regression.save_baseline(name, img)
            self._json_response({"baseline_path": path})
        elif self.path == "/api/mobile/generate":
            if self._block_reader(): return
            import mobile_recorder
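            # Illustrative request body — the action schema is an assumption
            # (whatever mobile_recorder expects); other fields match the reads below:
            #   {"actions": [...], "app_name": "MyApp", "platform": "android"}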
            length = int(self.headers.get("Content-Length", 0))
            try:
                body = json.loads(self.rfile.read(length))
            except Exception:
                self._json_response({"error": "Invalid JSON"}, status=400); return
            actions = body.get("actions", [])
            if not actions:
                self._json_response({"error": "actions array required"}, status=400); return
            result = mobile_recorder.generate_from_actions(
                actions=actions, app_name=body.get("app_name", ""),
                platform=body.get("platform", "android"),
                provider=body.get("provider", get_default_provider()),
                model=body.get("model", DEFAULT_MODEL),
            )
            self._json_response(result)
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/roles$", self.path)):
            self._handle_create_role(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/projects$", self.path)):
            self._handle_create_project(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/group-mappings$", self.path)):
            self._handle_create_group_mapping(m.group("slug"))
        elif re.match(r"^/api/orgs/[^/]+/scim/tokens$", self.path):
            self._handle_scim_token_mint()
        elif self.path == "/api/system/orgs":
            self._handle_create_org()
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/members$", self.path)):
            self._handle_invite_project_member(m.group("slug"))
        elif (m := re.match(r"^/api/system/orgs/(?P<slug>[^/]+)/suspend$", self.path)):
            self._handle_suspend_org(m.group("slug"))
        elif (m := re.match(r"^/api/system/orgs/(?P<slug>[^/]+)/restore$", self.path)):
            self._handle_restore_org(m.group("slug"))
        elif self.path == "/api/auth/logout":
            self._handle_auth_logout()
        else:
            self.send_error(404)
        self._auto_log()

    def _billing_checkout(self):
        """POST /api/billing/checkout — create Stripe checkout session."""
        user = self._require_auth()
        if user is None:
            return
        import billing
        if not billing.is_configured():
            self._json_response({"error": "Billing not configured"}, status=503)
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        price_id = body.get("price_id", os.environ.get("QA_STRIPE_PRICE_ID_PRO", ""))
        success_url = body.get("success_url", "http://localhost:5173/billing/success")
        cancel_url = body.get("cancel_url", "http://localhost:5173/billing/cancel")
        if not price_id:
            self._json_response({"error": "price_id required"}, status=400)
            return
        try:
            result = billing.create_checkout_session(
                price_id=price_id,
                customer_email=user.get("email", ""),
                success_url=success_url,
                cancel_url=cancel_url,
            )
            self._json_response(result)
        except RuntimeError as e:
            self._json_response({"error": str(e)}, status=502)

    def do_DELETE(self):
        with authz.request_scope():
            self._do_delete_inner()

    def _do_delete_inner(self):
        self._req_start = time.time()
        if self._handle_scim_request("DELETE"):
            return
        m_hist = re.match(r"^/api/history/(\d+)$", self.path)
        m = re.match(r"^/api/auth/users/([^/]+)$", self.path)
        if m_hist:
            user = self._require_auth()
            if user is None: return
            deleted = _history_delete(user["id"], int(m_hist.group(1)))
            self._json_response({"ok": deleted}, status=200 if deleted else 404)
        elif m:
            self._auth_delete_user(m.group(1))
        elif re.match(r"^/api/repo-sources/[^/]+$", self.path):
            if self._require_auth() is None: return
            rag_path = self.path.replace("/api/repo-sources", "/repo-sources")
            self._proxy_rag(rag_path, method="DELETE")
        elif re.match(r"^/api/sources/[^/]+(|/filters/[^/]+)$", self.path):
            if self._require_auth() is None: return
            self._proxy_rag(self.path, method="DELETE")
        elif re.match(r"^/api/swagger-sources/[^/]+$", self.path):
            if self._require_auth() is None: return
            rag_path = self.path.replace("/api/swagger-sources", "/swagger-sources")
            self._proxy_rag(rag_path, method="DELETE")
        elif re.match(r"^/api/web-sources/[^/]+$", self.path):
            if self._require_auth() is None: return
            self._proxy_rag(self.path, method="DELETE")
        elif self.path == "/api/user/data":
            user = self._require_auth()
            if user is None:
                return
            result = _gdpr_delete_data(user)
            self._json_response(result)
            return
        elif re.match(r"^/api/orgs/[^/]+/scim/tokens/[^/]+$", self.path):
            self._handle_scim_token_revoke()
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/roles/(?P<id>[^/]+)$", self.path)):
            self._handle_delete_role(m.group("slug"), m.group("id"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/idp-config$", self.path)):
            self._handle_delete_idp_config(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/group-mappings/(?P<id>[^/]+)$", self.path)):
            self._handle_delete_group_mapping(m.group("slug"), m.group("id"))
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/members/(?P<uid>[^/]+)$", self.path)):
            self._handle_remove_project_member(m.group("slug"), m.group("uid"))
        else:
            self.send_error(404)
        self._auto_log()

    def do_PUT(self):
        with authz.request_scope():
            self._do_put_inner()

    def _do_put_inner(self):
        self._req_start = time.time()
        if (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/idp-config$", self.path)):
            self._handle_put_idp_config(m.group("slug"))
        else:
            self.send_error(404)
        self._auto_log()

    def do_PATCH(self):
        with authz.request_scope():
            self._do_patch_inner()

    def _do_patch_inner(self):
        self._req_start = time.time()
        if self._handle_scim_request("PATCH"):
            return
        m = re.match(r"^/api/auth/users/([^/]+)$", self.path)
        if m:
            self._auth_update_user(m.group(1))
        elif re.match(r"^/api/repo-sources/[^/]+$", self.path):
            if self._require_auth() is None: return
            rag_path = self.path.replace("/api/repo-sources", "/repo-sources")
            self._proxy_rag(rag_path, method="PATCH")
        elif re.match(r"^/api/sources/[^/]+/filters/[^/]+$", self.path):
            if self._require_auth() is None: return
            self._proxy_rag(self.path, method="PATCH")
        elif self.path == "/api/monitoring/config":
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_config_update()
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)$", self.path)):
            self._handle_patch_org(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/roles/(?P<id>[^/]+)$", self.path)):
            self._handle_patch_role(m.group("slug"), m.group("id"))
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/members/(?P<uid>[^/]+)$", self.path)):
            self._handle_set_project_member_roles(m.group("slug"), m.group("uid"))
        else:
            self.send_error(404)
        self._auto_log()

    def do_GET(self):
        with authz.request_scope():
            self._do_get_inner()

    def _do_get_inner(self):
        self._req_start = time.time()
        if self._handle_scim_request("GET"):
            return
        # Auth endpoints
        if self.path == "/api/auth/me":
            self._auth_me()
        elif self.path == "/api/me":
            self._handle_get_me()
        elif self.path == "/api/projects":
            self._handle_get_projects()
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)$", self.path)):
            self._handle_get_project_detail(m.group("slug"))
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/metrics(?:\?.*)?$", self.path)):
            self._handle_get_project_metrics(m.group("slug"))
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/recent-failures(?:\?.*)?$", self.path)):
            self._handle_get_project_recent_failures(m.group("slug"))
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/members$", self.path)):
            self._handle_get_project_members(m.group("slug"))
        elif (m := re.match(r"^/api/projects/(?P<slug>[^/]+)/assignable-roles$", self.path)):
            self._handle_get_assignable_roles(m.group("slug"))
        elif self.path == "/api/auth/users":
            self._auth_list_users()
        # Public
        elif self.path == "/api/health":
            self._health_check()
        # ── History / stats persistence ──────────────────────────────────
        elif self.path == "/api/history":
            user = self._require_auth()
            if user is None: return
            self._json_response({"history": _history_list(user["id"])})
        elif self.path == "/api/stats":
            user = self._require_auth()
            if user is None: return
            self._json_response(_stats_get(user["id"]))
        # Authenticated GET endpoints
        elif self.path == "/api/models":
            if self._require_auth() is None: return
            self._list_models()
        elif self.path == "/api/providers":
            if self._require_auth() is None: return
            self._list_providers()
        elif self.path == "/api/xray-meta":
            if self._require_auth() is None: return
            self._xray_meta()
        elif self.path == "/api/config":
            if self._require_auth() is None: return
            self._config_get()
        elif self.path == "/api/sources":
            if self._require_auth() is None: return
            self._proxy_rag(self.path)
        elif self.path == "/api/repo-sources":
            if self._require_auth() is None: return
            self._proxy_rag("/api/repo-sources")
        elif self.path == "/api/rag-sync/status":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/jira/status")
        elif self.path == "/api/swagger-sync/status":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/swagger/status")
        elif self.path == "/api/swagger-sources":
            if self._require_auth() is None: return
            self._proxy_rag("/api/swagger-sources")
        elif self.path == "/api/web-sources":
            if self._require_auth() is None: return
            self._proxy_rag("/api/web-sources")
        elif self.path == "/api/web-sync/status":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/web/status")
        elif self.path == "/api/repo-sync/status":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/repos/status")
        elif self.path == "/api/codebase-sync/status":
            if self._require_auth() is None: return
            self._proxy_rag("/api/ingest/codebase/status")
        elif self.path == "/api/schedule/status":
            if self._require_auth() is None: return
            self._proxy_rag("/schedule/status")
        elif self.path == "/api/auth/apikey":
            self._auth_get_apikey()
        elif re.match(r"^/api/webhook/jobs/[^/]+$", self.path):
            if self._require_auth() is None: return
            self._webhook_job_status(self.path.split("/")[-1])
        # ── Monitoring (admin-only) ──────────────────────────────────────
        elif self.path.startswith("/api/feedback/scores"):
            if self._require_auth() is None: return
            self._feedback_scores()
        elif self.path == "/api/monitoring/stats":
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_stats()
        elif self.path == "/api/monitoring/health":
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_health()
        elif self.path.startswith("/api/monitoring/health/history"):
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_health_history()
        elif self.path.startswith("/api/monitoring/usage"):
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_usage()
        elif self.path.startswith("/api/monitoring/users"):
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_users()
        elif self.path.startswith("/api/monitoring/errors"):
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_errors()
        elif self.path.startswith("/api/monitoring/logs/stream"):
            self._monitoring_logs_stream()  # handles own auth
        elif self.path.startswith("/api/monitoring/logs"):
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_logs()
        elif self.path == "/api/monitoring/config":
            if self._require_auth(
                required_roles=["admin"],
                permission="system.diag.access",
            ) is None: return
            self._monitoring_config_get()
        # ── Fine-tuning GET routes ───────────────────────────────
        elif self.path == "/api/finetune/stats":
            if self._require_auth(
                required_roles=["admin", "trainer"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._json_response(fine_tuning.get_pair_stats())
        elif self.path.startswith("/api/finetune/pairs"):
            if self._require_auth(
                required_roles=["admin", "trainer"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_pairs_list()
        elif self.path == "/api/finetune/jobs":
            if self._require_auth(
                required_roles=["admin", "trainer"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._json_response(fine_tuning.list_jobs())
        elif self.path == "/api/finetune/cloud/status":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._json_response(fine_tuning.get_cloud_status() or {})
        elif self.path == "/api/finetune/export":
            if self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            ) is None: return
            self._ft_export()
        elif self.path.startswith("/api/finetune/train/status"):
            self._ft_train_sse()  # Auth handled inside via ?token= query param
        elif self.path.startswith("/api/finetune/collect/status"):
            self._ft_train_sse()
        elif self.path.startswith("/api/finetune/deploy/status"):
            self._ft_train_sse()
        elif re.match(r"^/api/agent/[^/]+/trace(\?|$)", self.path):
            job_id = self.path.split("/")[3]
            self._agent_trace(job_id)
        elif re.match(r"^/api/agent/[^/]+/stream(\?|$)", self.path):
            job_id = self.path.split("/")[3]
            self._agent_stream(job_id)
        elif re.match(r"^/api/agent/jobs/[^/]+$", self.path):
            job_id = self.path.split("/")[-1]
            self._agent_job_status(job_id)
        elif re.match(r"^/api/legal/[a-z-]+$", self.path):
            self._serve_legal(self.path.split("/")[-1])
        elif self.path == "/api/docs":
            self._serve_openapi()
        elif self.path == "/api/user/data/export":
            user = self._require_auth()
            if user is None:
                return
            result = _gdpr_export_data(user)
            self._json_response(result)
            return
        # ── New module GET routes ──────────────────────────────────────
        elif self.path == "/api/onboarding":
            user = self._require_auth()
            if user is None: return
            self._json_response(onboarding.get_state(user["id"]))
        elif self.path.startswith("/api/test-runs"):
            user = self._require_auth()
            if user is None: return
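            # Illustrative queries (params read below):
            #   GET /api/test-runs?project=default&action=trend
            #   GET /api/test-runs?action=flaky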
            params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
            project = params.get("project", [None])[0]
            action = params.get("action", ["list"])[0]
            if action == "trend":
                self._json_response(test_runs.get_trend(project or "default"))
            elif action == "flaky":
                self._json_response(test_runs.get_flaky_tests(project or "default"))
            elif action == "healing":
                self._json_response(test_runs.get_healing_rate(project or "default"))
            else:
                self._json_response(test_runs.get_runs(project=project))
        elif self.path == "/api/workspace/history":
            user = self._require_auth()
            if user is None: return
            self._json_response(workspace.get_shared_history())
        elif self.path == "/api/workspace/pending-reviews":
            user = self._require_auth()
            if user is None: return
            self._json_response(workspace.get_pending_reviews())
        elif self.path.startswith("/api/perf-results"):
            user = self._require_auth()
            if user is None: return
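            # Illustrative query (scenario value hypothetical):
            #   GET /api/perf-results?action=trend&scenario=checkout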
            params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
            action = params.get("action", ["list"])[0]
            scenario = params.get("scenario", [None])[0]
            tool = params.get("tool", [None])[0]
            if action == "trend" and scenario:
                self._json_response(perf_results.get_trend(scenario))
            else:
                self._json_response(perf_results.get_results(tool=tool, scenario=scenario))
        elif re.match(r"^/auth/[^/]+/login$", self.path.split("?", 1)[0]):
            self._handle_sso_login_get()
        elif re.match(r"^/auth/[^/]+/oidc/callback$", self.path.split("?", 1)[0]):
            self._handle_oidc_callback()
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)$", self.path)):
            self._handle_get_org(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/roles$", self.path)):
            self._handle_list_roles(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/idp-config$", self.path)):
            self._handle_get_idp_config(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/group-mappings$", self.path)):
            self._handle_list_group_mappings(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/audit/export\.csv$", self.path.split("?", 1)[0])):
            self._handle_export_audit_csv(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/audit/export\.ndjson$", self.path.split("?", 1)[0])):
            self._handle_export_audit_ndjson(m.group("slug"))
        elif (m := re.match(r"^/api/orgs/(?P<slug>[^/]+)/audit$", self.path.split("?", 1)[0])):
            self._handle_query_audit(m.group("slug"))
        elif re.match(r"^/api/orgs/[^/]+/scim/tokens$", self.path):
            self._handle_scim_token_list()
        elif self.path == "/api/system/orgs":
            self._handle_list_system_orgs()
        else:
            # Serve static files with no-cache so the browser always gets
            # the latest HTML/JS after a server restart.
            file_path = self.translate_path(self.path)
            if os.path.isfile(file_path) and file_path.endswith((".html", ".js", ".css")):
                try:
                    with open(file_path, "rb") as f:
                        content = f.read()
                    ext = file_path.rsplit(".", 1)[-1]
                    ctype = {"html": "text/html; charset=utf-8",
                             "js":   "application/javascript",
                             "css":  "text/css"}.get(ext, "application/octet-stream")
                    self.send_response(200)
                    self.send_header("Content-Type", ctype)
                    self.send_header("Content-Length", str(len(content)))
                    self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
                    self.send_header("Pragma", "no-cache")
                    self.end_headers()
                    self.wfile.write(content)
                except BrokenPipeError:
                    pass  # client disconnected — safe to ignore
                except OSError:
                    self.send_error(404)
            else:
                try:
                    super().do_GET()
                except BrokenPipeError:
                    pass  # client (e.g. Chrome DevTools probe) disconnected
        self._auto_log()

    # ── CORS ─────────────────────────────────────────────────────────────

    def _cors_headers(self):
        origin = self.headers.get("Origin", "")
        if "*" in _CORS_ORIGINS:
            self.send_header("Access-Control-Allow-Origin", "*")
        elif origin and origin in _CORS_ORIGINS:
            self.send_header("Access-Control-Allow-Origin", origin)
            self.send_header("Vary", "Origin")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PATCH, DELETE, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Api-Key")

    # ── Auth middleware ───────────────────────────────────────────────────────

    def _authenticate(self) -> dict | None:
        """Verify the request credential.

        Three paths, checked in order:
          1. ``qac_session`` cookie (set by the org-aware /auth/<slug>/login
             route from Plan 2/3). Looks up the session via
             ``identity.session_for_token`` and resolves the user from the
             ``users`` table. This is the path the new dashboards + admin UI
             rely on.
          2. ``X-Api-Key`` header (legacy programmatic clients).
          3. ``Authorization: Bearer <jwt>`` (legacy JWT clients).

        Returns the user dict on success, or writes a 401 and returns None.
        Sets ``self._req_user`` as a side-effect on success.
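
        Illustrative credential shapes (all values hypothetical):
            Cookie:        qac_session=3f9ac2...
            X-Api-Key:     <api-key>
            Authorization: Bearer <jwt>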
        """
        # ── Session cookie path (qac_session=...) ─────────────────────────
        cookie_header = self.headers.get("Cookie") or ""
        session_token: str | None = None
        for piece in cookie_header.split(";"):
            piece = piece.strip()
            if piece.startswith("qac_session="):
                session_token = piece[len("qac_session="):]
                break
        if session_token:
            try:
                sess = identity.session_for_token(token=session_token)
            except RuntimeError:
                # identity.set_db_path() not called yet — fall through
                sess = None
            if sess:
                with _USERS_LOCK:
                    users = _load_users()
                user = next((u for u in users if u["id"] == sess["user_id"]), None)
                if user and user.get("active"):
                    self._req_user = user
                    return user
                # Cookie present but user not found / inactive — try other paths
                # before giving up (don't 401 yet).
        # ── API key path (X-Api-Key header) ───────────────────────────────
        api_key_header = self.headers.get("X-Api-Key", "").strip()
        if api_key_header:
            user = _require_api_key(api_key_header)
            if not user:
                self._json_response({"error": "Unauthorized"}, status=401)
                return None
            self._req_user = user
            return user
        # ── JWT path (Authorization: Bearer <token>) ───────────────────────
        auth_header = self.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            self._json_response({"error": "Unauthorized"}, status=401)
            return None
        token = auth_header[7:]
        payload = _jwt_decode(token)
        if not payload:
            self._json_response({"error": "Unauthorized"}, status=401)
            return None
        with _USERS_LOCK:
            users = _load_users()
        user = next((u for u in users if u["id"] == payload.get("sub")), None)
        if not user or not user.get("active"):
            self._json_response({"error": "Unauthorized"}, status=401)
            return None
        self._req_user = user
        return user

    def _current_project_id(self) -> str | None:
        """Resolve the project for this request: query param, body field, or default."""
        from urllib.parse import urlparse, parse_qs
        qs = parse_qs(urlparse(self.path).query)
        pid = qs.get("project_id", [None])[0]
        if pid:
            return pid
        # Fallback to the Default Project's id (until per-project routing lands in Plan 5).
        return "default-project"

    def _current_org_id(self) -> str:
        """Resolve the org for this request: query param org_id, or default-org fallback."""
        from urllib.parse import urlparse, parse_qs
        qs = parse_qs(urlparse(self.path).query)
        oid = qs.get("org_id", [None])[0]
        return oid or "default-org"

    def _require_auth(
        self,
        required_roles: list | None = None,
        *,
        permission: str | None = None,
        project_id: str | None = None,
        org_id: str | None = None,
    ) -> dict | None:
        """Authenticate + authorize the request.

        Returns the ``user`` dict on success, or None (and writes a 401/403)
        on failure.

        - When MULTI_PROJECT_ENABLED is off: behaves exactly as before —
          checks the user's flat ``role`` column against ``required_roles``.
        - When MULTI_PROJECT_ENABLED is on: if ``permission`` is provided,
          consults ``authz.require`` against the project_id/org_id scope.
          If only ``required_roles`` is provided, falls back to the legacy
          path so unmigrated call sites keep working during the cutover.
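
        Illustrative call (mirrors this file's fine-tuning routes)::

            user = self._require_auth(
                required_roles=["admin"],
                permission="fine_tuning.use",
                project_id=self._current_project_id(),
            )
            if user is None:
                return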
        """
        # --- 1. Authenticate (unchanged) -----------------------------------
        user = self._authenticate()
        if user is None:
            return None

        # --- 2. Authorize --------------------------------------------------
        if MULTI_PROJECT_ENABLED and permission is not None:
            try:
                authz.require(
                    user_id=user["id"],
                    action=permission,
                    project_id=project_id,
                    org_id=org_id,
                )
            except authz.PermissionDenied as e:
                self._json_response(
                    {"error": "Forbidden", "code": "permission_denied",
                     "missing": e.action, "project_id": e.project_id,
                     "org_id": e.org_id},
                    status=403,
                )
                return None
            return user

        # Legacy path (flag off OR call site not yet migrated)
        if required_roles and user["role"] not in required_roles:
            self._json_response({"error": "Forbidden"}, status=403)
            return None
        return user

    def _block_reader(self) -> bool:
        """Guard for write/generate actions — readers are not allowed. Returns True if blocked."""
        user = self._require_auth()
        if user is None:
            return True
        if user["role"] == "reader":
            self._json_response({"error": "Forbidden — reader role cannot perform this action"}, status=403)
            return True
        return False

    # ── Monitoring route handlers ────────────────────────────────────────────

    def _feedback_scores(self):
        """Return feedback boost data for given chunk IDs."""
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        chunk_ids = params.get("ids", [""])[0].split(",")
        chunk_ids = [c.strip() for c in chunk_ids if c.strip()]
        if not chunk_ids:
            self._json_response({"boosts": {}})
            return

        now = datetime.datetime.now(datetime.UTC)
        # isoformat() on an aware datetime already ends in "+00:00"; appending
        # "Z" on top produced a malformed "+00:00Z" stamp, so normalise to a
        # single trailing "Z" (the form the comparison below assumes).
        cutoff_90 = (now - datetime.timedelta(days=90)).isoformat().replace("+00:00", "Z")
        conn = sqlite3.connect(str(COVERAGE_DB))
        conn.execute("PRAGMA journal_mode=WAL")
        try:
            boosts = {}
            placeholders = ",".join("?" for _ in chunk_ids)
            rows = conn.execute(
                f"SELECT chunk_id, MAX(timestamp) as latest FROM feedback "
                f"WHERE chunk_id IN ({placeholders}) GROUP BY chunk_id",
                chunk_ids,
            ).fetchall()
            for cid, latest in rows:
                # Feedback seen within the last 90 days earns the larger boost.
                boosts[cid] = 0.05 if latest >= cutoff_90 else 0.02
        finally:
            conn.close()
        self._json_response({"boosts": boosts})

    def _monitoring_stats(self):
        self._json_response(monitoring.get_stats())

    def _monitoring_health(self):
        snap = monitoring.get_latest_snapshot()
        self._json_response(snap or {"error": "no snapshots yet"})

    def _monitoring_health_history(self):
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        hours = int(params.get("hours", ["24"])[0])
        self._json_response(monitoring.get_snapshot_history(hours=hours))

    def _monitoring_usage(self):
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        days = int(params.get("days", ["7"])[0])
        self._json_response(monitoring.get_usage_breakdown(days=days))

    def _monitoring_users(self):
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        days = int(params.get("days", ["7"])[0])
        self._json_response(monitoring.get_user_activity(days=days))

    def _monitoring_errors(self):
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        self._json_response(monitoring.query_error_logs(
            page=int(params.get("page", ["1"])[0]),
            limit=int(params.get("limit", ["50"])[0]),
            level_filter=params.get("level", [None])[0],
            source_filter=params.get("source", [None])[0],
            date_from=params.get("from", [None])[0],
            date_to=params.get("to", [None])[0],
        ))

    def _monitoring_logs(self):
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        self._json_response(monitoring.query_request_logs(
            page=int(params.get("page", ["1"])[0]),
            limit=int(params.get("limit", ["100"])[0]),
            path_filter=params.get("path", [None])[0],
            user_filter=params.get("user", [None])[0],
            status_min=int(params["status_min"][0]) if "status_min" in params else None,
            status_max=int(params["status_max"][0]) if "status_max" in params else None,
            date_from=params.get("from", [None])[0],
            date_to=params.get("to", [None])[0],
        ))

    def _monitoring_logs_stream(self):
        """SSE endpoint for real-time log tailing. Accepts ?token= for EventSource auth."""
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        token_param = params.get("token", [None])[0]
        if token_param:
            payload = _jwt_decode(token_param)
            if not payload:
                self._json_response({"error": "Unauthorized"}, status=401)
                return
            with _USERS_LOCK:
                users = _load_users()
            user = next((u for u in users if u["id"] == payload.get("sub")), None)
            if not user or user.get("role") != "admin":
                self._json_response({"error": "Forbidden"}, status=403)
                return
        else:
            if self._require_auth(required_roles=["admin"], permission="org.members.manage", org_id=self._current_org_id()) is None:
                return
        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "text/event-stream; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()
        monitoring.add_log_subscriber(self.wfile)
        try:
            while True:
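                # Lines starting with ':' are SSE comment frames; EventSource
                # clients ignore them, but they keep idle proxies from dropping
                # the socket between real events.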
                time.sleep(30)
                self.wfile.write(b": keepalive\n\n")
                self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass
        finally:
            monitoring.remove_log_subscriber(self.wfile)

    def _monitoring_config_get(self):
        self._json_response({"retention_days": monitoring.get_retention_days()})

    def _monitoring_config_update(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        days = body.get("retention_days")
        if not isinstance(days, int) or isinstance(days, bool) or days < 1:
            self._json_response({"error": "retention_days must be a positive integer"}, status=400)
            return
        monitoring.set_retention_days(days)
        self._json_response({"retention_days": days})

    # ── Fine-tuning handlers ─────────────────────────────────────

    def _ft_collect(self):
        body = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))))
        tracks = body.get("tracks", ["istqb"])
        jql = body.get("jql", "")
        limit = body.get("limit", 1000)
        config = body.get("config", {})
        try:
            job_id = fine_tuning.collect_dataset(tracks=tracks, jql=jql, limit=limit, config=config)
            self._json_response({"job_id": job_id})
        except Exception as e:
            self._json_response({"error": str(e)}, status=500)

    def _ft_filter(self):
        body = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))))
        min_chars = body.get("min_chars", 100)
        result = fine_tuning.filter_pairs(min_chars=min_chars)
        self._json_response(result)

    def _ft_assemble(self):
        result = fine_tuning.assemble_final()
        self._json_response(result)

    def _ft_review_pair(self):
        m = re.match(r"/api/finetune/pairs/(\d+)/review", self.path)
        if not m:
            self._json_response({"error": "Invalid pair ID"}, status=400)
            return
        pair_id = int(m.group(1))
        body = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))))
        action = body.get("action")
        if action == "approve":
            fine_tuning.update_pair(pair_id=pair_id, status="approved", reviewer=self._req_user.get("username", ""))
        elif action == "reject":
            fine_tuning.update_pair(pair_id=pair_id, status="rejected", reviewer=self._req_user.get("username", ""), notes=body.get("notes", ""))
        elif action == "edit":
            fine_tuning.update_pair(pair_id=pair_id, status="approved", reviewer=self._req_user.get("username", ""), messages=body.get("messages"))
        else:
            self._json_response({"error": "Invalid action"}, status=400)
            return
        self._json_response({"ok": True})

    def _ft_pairs_list(self):
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        status = qs.get("status", [None])[0]
        track = qs.get("track", [None])[0]
        page = int(qs.get("page", [1])[0])
        limit = int(qs.get("limit", [50])[0])
        result = fine_tuning.query_pairs(status=status, track=track, page=page, limit=limit)
        self._json_response(result)

    def _ft_train(self):
        body = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))))
        backend = body.get("backend", "mlx")
        params = body.get("params", {})
        try:
            if backend in ("mlx", "qlora"):
                job_id = fine_tuning.start_local_training(backend=backend, params=params)
            elif backend == "cloud":
                job_id = fine_tuning.start_cloud_training(
                    provider=params.get("provider", "runpod"), api_key=params.get("api_key", ""),
                    gpu_tier=params.get("gpu_tier", "NVIDIA RTX 4090"), params=params,
                    max_budget=params.get("max_budget", 10.0), idle_timeout=params.get("idle_timeout", 1800))
            else:
                self._json_response({"error": f"Unknown backend: {backend}"}, status=400)
                return
            self._json_response({"job_id": job_id})
        except RuntimeError as e:
            self._json_response({"error": str(e)}, status=409)

    def _ft_train_stop(self):
        fine_tuning.cancel_training()
        self._json_response({"ok": True})

    def _ft_cloud_terminate(self):
        body = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))))
        fine_tuning.terminate_cloud_instance(provider=body["provider"], api_key=body["api_key"], instance_id=body["instance_id"])
        self._json_response({"ok": True})

    def _ft_deploy(self):
        body = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))))
        model_name = body.get("model_name", "qa-copilot:v1")
        model_path = body.get("model_path")
        job_id = fine_tuning.deploy_to_ollama(model_name=model_name, model_path=model_path)
        self._json_response({"job_id": job_id})

    def _ft_import(self):
        result = fine_tuning.import_jsonl(input_dir=fine_tuning._DATASET_DIR)
        self._json_response(result)

    def _webhook_xray_results(self):
        """POST /api/webhook/xray-results — receive Xray execution results."""
        user = self._require_auth()
        if user is None:
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return

        story_key = (body.get("story_key") or "").strip()
        chunk_ids = body.get("chunk_ids") or []
        results   = body.get("results") or []

        if not story_key or not chunk_ids:
            self._json_response({"error": "story_key and chunk_ids required"}, status=400)
            return

        pass_count = sum(1 for r in results if r.get("status") == "pass")
        fail_count = sum(1 for r in results if r.get("status") == "fail")
        outcome = "pass" if pass_count >= fail_count else "fail"

        _db_record_execution(story_key, chunk_ids, outcome)
        print(f"  📊 Xray feedback: {story_key} — {pass_count} pass, {fail_count} fail → {outcome}")
        self._json_response({"ok": True, "outcome": outcome, "chunks_recorded": len(chunk_ids)})

    def _ft_export(self):
        result = fine_tuning.export_jsonl(output_dir=fine_tuning._DATASET_DIR)
        self._json_response(result)

    def _ft_train_sse(self):
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        token = qs.get("token", [None])[0]
        if token:
            payload = _jwt_decode(token)
            if not payload:
                self._json_response({"error": "Unauthorized"}, status=401)
                return
            with _USERS_LOCK:
                users = _load_users()
            user = next((u for u in users if u["id"] == payload.get("sub")), None)
            if not user or user.get("role") not in ("admin", "trainer"):
                self._json_response({"error": "Forbidden"}, status=403)
                return
        elif self._require_auth(required_roles=["admin", "trainer"], permission="fine_tuning.use", project_id=self._current_project_id()) is None:
            return
        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "text/event-stream; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()
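        # Subscribers receive JSON events shaped
        # {"type": "log" | "done" | "error", "job_id": ..., ...}; replaying the
        # active job's log first means late subscribers see earlier output.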
        # Replay existing log from the most recent job
        active = fine_tuning.get_active_job_log()
        if active and active.get("log"):
            for line in active["log"].strip().split("\n"):
                if line:
                    sse_line = "data: " + json.dumps({"type": "log", "job_id": active["id"], "line": line}) + "\n\n"
                    self.wfile.write(sse_line.encode())
            self.wfile.flush()
        # If the job already finished, send done/error event
        if active and active.get("status") == "completed":
            self.wfile.write(("data: " + json.dumps({"type": "done", "job_id": active["id"]}) + "\n\n").encode())
            self.wfile.flush()
        elif active and active.get("status") == "failed":
            self.wfile.write(("data: " + json.dumps({"type": "error", "job_id": active["id"], "message": active.get("error", "Unknown error")}) + "\n\n").encode())
            self.wfile.flush()

        fine_tuning.add_subscriber(self.wfile)
        try:
            while True:
                time.sleep(30)
                self.wfile.write(b": keepalive\n\n")
                self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass
        finally:
            fine_tuning.remove_subscriber(self.wfile)

    # ── Auth route handlers ───────────────────────────────────────────────────

    def _auth_login(self):
        client_ip = self.client_address[0] if self.client_address else "unknown"
        if not rate_limiter.check_rate_limit(f"auth:{client_ip}", rate_limiter.RATE_LIMIT_AUTH, 60):
            retry = rate_limiter.get_retry_after(f"auth:{client_ip}", 60)
            self._json_response({"error": "Rate limit exceeded", "retry_after": retry}, status=429)
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        val      = (body.get("username_or_email") or "").strip()
        password = body.get("password") or ""
        if not val or not password:
            self._json_response({"error": "Username/email and password are required"}, status=400)
            return
        with _USERS_LOCK:
            users = _load_users()
        user = next(
            (u for u in users if u["username"] == val or u["email"].lower() == val.lower()),
            None
        )
        if not user or not user.get("active") or not _verify_password(password, user["password_hash"]):
            self._json_response({"error": "Invalid credentials"}, status=401)
            return
        self._json_response({"token": _make_token(user), "user": _safe_user(user)})

    def _handle_org_login(self) -> None:
        """Org-aware login: POST /auth/<org_slug>/login with username+password."""
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        m = re.match(r"^/auth/(?P<slug>[^/]+)/login$", self.path)
        if m is None:
            self._json_response({"error": "Bad request"}, status=400)
            return
        org_slug = m.group("slug")  # currently unused except for logging; the
                                    # provider matches user → external_identities
                                    # in Plan 3 (OIDC/SAML); local provider just
                                    # validates username+password against users table.
        body = self._read_json_body()
        username = body.get("username", "").strip()
        password = body.get("password", "")
        if not username or not password:
            self._json_response({"error": "Missing credentials"}, status=400)
            return
        try:
            token, result = identity.authenticate_local(
                username=username,
                password=password,
                ip=self.client_address[0] if self.client_address else None,
                user_agent=self.headers.get("User-Agent") or None,
            )
        except identity.InvalidCredentials:
            self._json_response({"error": "Invalid credentials"}, status=401)
            return
        # Set HttpOnly cookie + return user (sans hash).
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header(
            "Set-Cookie",
            f"qac_session={token}; HttpOnly; Path=/; SameSite=Lax",
        )
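        # (Assumption: `Secure` is omitted so plain-HTTP local development
        # keeps working; a TLS deployment should append `; Secure`.)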
        self.end_headers()
        body_out = {
            "id": result["external_subject"],
            "username": result["display_name"],
            "email": result["email"],
            "org_slug": org_slug,
        }
        self.wfile.write(json.dumps(body_out).encode())

    def _handle_accept_invite(self) -> None:
        """POST /auth/accept-invite — set password from a single-use token.

        Body: ``{"token": str, "password": str}``. On success sets a
        ``qac_session`` cookie and returns ``{user}`` so the caller can route
        to /dashboard immediately.
        """
        body = self._read_json_body()
        out = _accept_invite(
            raw_token=(body.get("token") or "").strip(),
            password=body.get("password") or "",
            ip=self.client_address[0] if self.client_address else None,
            user_agent=self.headers.get("User-Agent") or None,
        )
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        token = out["session_token"]
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header(
            "Set-Cookie",
            f"qac_session={token}; HttpOnly; Path=/; SameSite=Lax",
        )
        self.end_headers()
        self.wfile.write(json.dumps({"user": out["user"]}).encode())

    def _handle_sso_login_get(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        m = re.match(r"^/auth/(?P<slug>[^/]+)/login$", self.path.split("?", 1)[0])
        if m is None:
            self._json_response({"error": "Bad request"}, status=400)
            return
        org_slug = m.group("slug")
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        return_to = qs.get("return_to", ["/"])[0]
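        # return_to is persisted with the SSO state and later used verbatim as
        # the post-login redirect; a hardened deployment would restrict it to
        # same-origin paths.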

        login = identity.start_sso_login(org_slug=org_slug, return_to=return_to)

        if login["kind"] == "local":
            # Local mode — return JSON pointer to the form.
            self._json_response({"kind": "local", "url": login["url"]})
            return

        if login["kind"] == "oidc":
            # Persist state + verifier so the callback can match.
            from identity import sso_state
            org_id = identity._resolve_org_id_by_slug(org_slug)
            sso_state.save(
                USERDATA_DB,
                state=login["state"],
                org_id=org_id,
                code_verifier=login["code_verifier"],
                nonce=None,  # nonce optional in our impl; could derive from state
                return_to=return_to,
            )
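            # The state is single-use: _handle_oidc_callback consumes it via
            # sso_state.consume(), which also returns the PKCE code_verifier
            # needed for the token exchange.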

        # 302 → IdP authorization endpoint
        self.send_response(302)
        self.send_header("Location", login["url"])
        self.end_headers()

    def _handle_oidc_callback(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        m = re.match(r"^/auth/(?P<slug>[^/]+)/oidc/callback$",
                     self.path.split("?", 1)[0])
        if m is None:
            self._json_response({"error": "Bad request"}, status=400)
            return
        org_slug = m.group("slug")
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        code = qs.get("code", [None])[0]
        state = qs.get("state", [None])[0]
        if not code or not state:
            self._json_response({"error": "Missing code or state"}, status=400)
            return

        from identity import sso_state
        rec = sso_state.consume(USERDATA_DB, state=state)
        if rec is None:
            self._json_response({"error": "Invalid or expired state"}, status=400)
            return
        try:
            token, result, user_id = identity.complete_oidc_callback(
                org_slug=org_slug,
                code=code,
                code_verifier=rec["code_verifier"],
                expected_nonce=rec["nonce"],
                ip=self.client_address[0] if self.client_address else None,
                user_agent=self.headers.get("User-Agent") or None,
            )
        except Exception as e:
            self._json_response(
                {"error": "SSO callback failed", "detail": str(e)}, status=400,
            )
            return

        # 302 to return_to with the session cookie set
        self.send_response(302)
        self.send_header(
            "Set-Cookie",
            f"qac_session={token}; HttpOnly; Path=/; SameSite=Lax",
        )
        self.send_header("Location", rec["return_to"] or "/")
        self.end_headers()

    def _handle_saml_acs(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        m = re.match(r"^/auth/(?P<slug>[^/]+)/saml/acs$", self.path)
        if m is None:
            self._json_response({"error": "Bad request"}, status=400)
            return
        org_slug = m.group("slug")
        # SAML ACS arrives as application/x-www-form-urlencoded with SAMLResponse=...
        length = int(self.headers.get("Content-Length") or "0")
        raw = self.rfile.read(length).decode() if length else ""
        form = urllib.parse.parse_qs(raw)
        saml_b64 = (form.get("SAMLResponse") or [None])[0]
        relay_state = (form.get("RelayState") or ["/"])[0]
        if not saml_b64:
            self._json_response({"error": "Missing SAMLResponse"}, status=400)
            return
        try:
            token, result, user_id = identity.complete_saml_acs(
                org_slug=org_slug,
                saml_response_b64=saml_b64,
                ip=self.client_address[0] if self.client_address else None,
                user_agent=self.headers.get("User-Agent") or None,
            )
        except Exception as e:
            self._json_response(
                {"error": "SAML ACS failed", "detail": str(e)}, status=400,
            )
            return

        self.send_response(302)
        self.send_header(
            "Set-Cookie",
            f"qac_session={token}; HttpOnly; Path=/; SameSite=Lax",
        )
        self.send_header("Location", relay_state or "/")
        self.end_headers()

    def _handle_sso_discover(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        body = self._read_json_body()
        email = (body.get("email") or "").strip()
        if not email:
            self._json_response({"error": "Missing email"}, status=400)
            return
        self._json_response(_sso_discover_for_email(email))

    def _scim_token_org_id_from_path(self) -> tuple[str | None, str | None]:
        """Parse /api/orgs/<slug>/... and resolve to org_id. Returns (org_id, slug)."""
        m = re.match(r"^/api/orgs/(?P<slug>[^/]+)/", self.path)
        if not m:
            return None, None
        slug = m.group("slug")
        oid = identity._resolve_org_id_by_slug(slug)
        return oid, slug

    def _handle_scim_token_mint(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        org_id, slug = self._scim_token_org_id_from_path()
        if not org_id:
            self._json_response({"error": "Unknown org"}, status=404)
            return
        user = self._require_auth(
            permission="org.sso.manage", org_id=org_id,
        )
        if user is None:
            return
        from scim.auth import mint_scim_token
        body = self._read_json_body()
        scope = (body.get("scope") or "scim").strip()
        raw, token_id = mint_scim_token(USERDATA_DB, org_id=org_id, scope=scope)
        authz.audit(actor_user_id=user["id"], org_id=org_id,
                    action="org.scim.token_minted",
                    target_type="scim_token", target_id=token_id,
                    details={"scope": scope})
        self._json_response({"id": token_id, "token": raw, "scope": scope}, status=201)

    def _handle_scim_token_list(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        org_id, slug = self._scim_token_org_id_from_path()
        if not org_id:
            self._json_response({"error": "Unknown org"}, status=404)
            return
        user = self._require_auth(
            permission="org.sso.manage", org_id=org_id,
        )
        if user is None:
            return
        from scim.auth import list_scim_tokens
        self._json_response({"tokens": list_scim_tokens(USERDATA_DB, org_id=org_id)})

    def _handle_scim_token_revoke(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        org_id, slug = self._scim_token_org_id_from_path()
        if not org_id:
            self._json_response({"error": "Unknown org"}, status=404)
            return
        user = self._require_auth(
            permission="org.sso.manage", org_id=org_id,
        )
        if user is None:
            return
        m = re.match(r"^/api/orgs/[^/]+/scim/tokens/(?P<id>[^/]+)$", self.path)
        token_id = m.group("id") if m else ""
        from scim.auth import revoke_scim_token
        revoke_scim_token(USERDATA_DB, token_id=token_id)
        authz.audit(actor_user_id=user["id"], org_id=org_id,
                    action="org.scim.token_revoked",
                    target_type="scim_token", target_id=token_id)
        self._json_response({"id": token_id, "revoked": True})

    def _auth_register(self):
        client_ip = self.client_address[0] if self.client_address else "unknown"
        if not rate_limiter.check_rate_limit(f"auth:{client_ip}", rate_limiter.RATE_LIMIT_AUTH, 60):
            retry = rate_limiter.get_retry_after(f"auth:{client_ip}", 60)
            self._json_response({"error": "Rate limit exceeded", "retry_after": retry}, status=429)
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        username = (body.get("username") or "").strip()
        email    = (body.get("email") or "").strip().lower()
        password = body.get("password") or ""
        if not username or not email or not password:
            self._json_response({"error": "username, email and password are required"}, status=400)
            return
        if "@" not in email:
            self._json_response({"error": "Invalid email address"}, status=400)
            return
        if len(password) < 8:
            self._json_response({"error": "Password must be at least 8 characters"}, status=400)
            return
        with _USERS_LOCK:
            users = _load_users()
            if any(u["username"] == username for u in users):
                self._json_response({"error": "Username already taken"}, status=409)
                return
            if any(u["email"].lower() == email for u in users):
                self._json_response({"error": "Email already registered"}, status=409)
                return
            new_user = {
                "id":            str(uuid.uuid4()),
                "username":      username,
                "email":         email,
                "password_hash": _hash_password(password),
                "role":          "user",
                "active":        True,
                "created_at":    datetime.datetime.now(datetime.timezone.utc).isoformat() + "Z",
            }
            users.append(new_user)
            _save_users(users)
        self._json_response({"token": _make_token(new_user), "user": _safe_user(new_user)})

    def _auth_me(self):
        user = self._require_auth()
        if user is None:
            return
        self._json_response({"user": _safe_user(user)})

    def _handle_get_me(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return  # _authenticate already wrote 401
        self._json_response(_compute_me_payload(user_id=user["id"]))

    def _handle_get_projects(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        self._json_response(_compute_projects_payload(user_id=user["id"]))

    def _handle_get_project_detail(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_project_detail_payload(user_id=user["id"], project_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        self._json_response(out)

    def _handle_get_project_metrics(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        qs = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        range_str = qs.get("range", ["30d"])[0]
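        # Accepts "<N>d" (e.g. "7d", "90d"); anything unparsable falls back to
        # the 30-day default.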
        days = 30
        if range_str.endswith("d"):
            try:
                days = int(range_str[:-1])
            except ValueError:
                pass
        out = _compute_project_metrics_payload(user_id=user["id"],
                                               project_slug=slug, range_days=days)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden",
                                 "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_get_project_recent_failures(self, slug: str) -> None:
        # Same gating as metrics; shape is { "failures": [...] }.
        # Plan 5 stub: returns empty list (Plan 5 doesn't yet have the failures table
        # joined to projects; future plan will populate).
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        pair = _resolve_project_by_slug_for_user(user_id=user["id"], project_slug=slug)
        if not pair:
            self._json_response({"error": "Not found"}, status=404)
            return
        if not authz.can(user_id=user["id"], action="tests.view", project_id=pair[0]):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": "tests.view"}, status=403)
            return
        self._json_response({"failures": []})

    def _handle_get_project_members(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_project_members_payload(user_id=user["id"], project_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_get_assignable_roles(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_assignable_roles_payload(user_id=user["id"], project_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_invite_project_member(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _invite_project_member(user_id=user["id"], project_slug=slug, body=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out, status=201)

    def _handle_set_project_member_roles(self, slug: str, target_uid: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _set_project_member_roles(
            user_id=user["id"], project_slug=slug,
            target_user_id=target_uid, body=body,
        )
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out)

    def _handle_remove_project_member(self, slug: str, target_uid: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _remove_project_member(
            user_id=user["id"], project_slug=slug, target_user_id=target_uid,
        )
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out)

    def _handle_get_org(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_org_payload(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_patch_org(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _patch_org(user_id=user["id"], org_slug=slug, patch=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_list_roles(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_roles_payload(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_create_role(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _create_role(user_id=user["id"], org_slug=slug, body=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out, status=201)

    def _handle_create_project(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _create_project(user_id=user["id"], org_slug=slug, body=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out, status=201)

    def _handle_patch_role(self, slug: str, role_id: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _patch_role(user_id=user["id"], org_slug=slug, role_id=role_id, body=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_delete_role(self, slug: str, role_id: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _delete_role(user_id=user["id"], org_slug=slug, role_id=role_id)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_get_idp_config(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_idp_config_payload(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_put_idp_config(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _put_idp_config(user_id=user["id"], org_slug=slug, body=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_delete_idp_config(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _disable_idp_config_helper(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_list_group_mappings(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_group_mappings_payload(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_create_group_mapping(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _create_group_mapping(user_id=user["id"], org_slug=slug, body=body)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out, status=201)

    def _handle_delete_group_mapping(self, slug: str, mapping_id: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _delete_group_mapping(user_id=user["id"], org_slug=slug, mapping_id=mapping_id)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_query_audit(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        action_prefix = params.get("action_prefix", [None])[0]
        actor = params.get("actor", [None])[0]
        since = params.get("since", [None])[0]
        until = params.get("until", [None])[0]
        limit = int(params.get("limit", ["100"])[0])
        out = _compute_audit_payload(
            user_id=user["id"], org_slug=slug,
            action_prefix=action_prefix, actor=actor,
            since=since, until=until, limit=limit,
        )
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response({"error": "Forbidden", "code": "permission_denied",
                                 "missing": out["missing"]}, status=403)
            return
        self._json_response(out)

    def _handle_export_audit_csv(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        filters: dict = {}
        if params.get("action_prefix"):
            filters["action_prefix"] = params["action_prefix"][0]
        if params.get("actor"):
            filters["actor_user_id"] = params["actor"][0]
        if params.get("since"):
            filters["since"] = params["since"][0]
        if params.get("until"):
            filters["until"] = params["until"][0]
        if params.get("limit"):
            filters["limit"] = int(params["limit"][0])
        body = _export_audit_csv(user_id=user["id"], org_slug=slug, **filters)
        _send_file_response(self, body, "text/csv; charset=utf-8", "audit.csv")

    def _handle_export_audit_ndjson(self, slug: str) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        filters: dict = {}
        if params.get("action_prefix"):
            filters["action_prefix"] = params["action_prefix"][0]
        if params.get("actor"):
            filters["actor_user_id"] = params["actor"][0]
        if params.get("since"):
            filters["since"] = params["since"][0]
        if params.get("until"):
            filters["until"] = params["until"][0]
        if params.get("limit"):
            filters["limit"] = int(params["limit"][0])
        body = _export_audit_ndjson(user_id=user["id"], org_slug=slug, **filters)
        _send_file_response(self, body, "application/x-ndjson; charset=utf-8", "audit.ndjson")

    # ── System admin — orgs list / suspend / restore ─────────────────────────

    def _handle_list_system_orgs(self) -> None:
        """GET /api/system/orgs — system admin only."""
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _compute_system_orgs_payload(user_id=user["id"])
        if out.get("_forbidden"):
            self._json_response(
                {"error": "Forbidden", "code": "permission_denied",
                 "missing": out["missing"]},
                status=403,
            )
            return
        self._json_response(out)

    def _handle_create_org(self) -> None:
        """POST /api/system/orgs — system admin only."""
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        body = self._read_json_body()
        out = _create_org(user_id=user["id"], body=body)
        if out.get("_forbidden"):
            self._json_response(
                {"error": "Forbidden", "code": "permission_denied",
                 "missing": out["missing"]},
                status=403,
            )
            return
        if out.get("_error"):
            self._json_response({"error": out["_error"]}, status=400)
            return
        self._json_response(out, status=201)

    def _handle_suspend_org(self, slug: str) -> None:
        """POST /api/system/orgs/<slug>/suspend — system admin only."""
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _suspend_org(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response(
                {"error": "Forbidden", "code": "permission_denied",
                 "missing": out["missing"]},
                status=403,
            )
            return
        self._json_response(out)

    def _handle_restore_org(self, slug: str) -> None:
        """POST /api/system/orgs/<slug>/restore — system admin only."""
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        user = self._authenticate()
        if user is None:
            return
        out = _restore_org(user_id=user["id"], org_slug=slug)
        if out is None:
            self._json_response({"error": "Not found"}, status=404)
            return
        if out.get("_forbidden"):
            self._json_response(
                {"error": "Forbidden", "code": "permission_denied",
                 "missing": out["missing"]},
                status=403,
            )
            return
        self._json_response(out)

    def _handle_auth_logout(self) -> None:
        if not MULTI_PROJECT_ENABLED:
            self._json_response({"error": "Multi-project mode disabled"}, status=404)
            return
        # Read the cookie from the request header
        cookie_header = self.headers.get("Cookie") or ""
        token: str | None = None
        for piece in cookie_header.split(";"):
            piece = piece.strip()
            if piece.startswith("qac_session="):
                token = piece[len("qac_session="):]
                break
        if token:
            identity.sign_out(token=token)
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header(
            "Set-Cookie",
            "qac_session=; HttpOnly; Path=/; SameSite=Lax; Max-Age=0",
        )
        self.end_headers()
        self.wfile.write(b"{}")

    def _auth_list_users(self):
        if self._require_auth(required_roles=["admin"], permission="org.members.manage", org_id=self._current_org_id()) is None:
            return
        with _USERS_LOCK:
            users = _load_users()
        self._json_response({"users": [_safe_user(u) for u in users]})

    def _auth_update_user(self, user_id: str):
        current = self._require_auth(required_roles=["admin"], permission="org.members.manage", org_id=self._current_org_id())
        if current is None:
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        with _USERS_LOCK:
            users = _load_users()
            target = next((u for u in users if u["id"] == user_id), None)
            if not target:
                self._json_response({"error": "User not found"}, status=404)
                return
            if "role" in body:
                if body["role"] not in ("admin", "user", "reader", "trainer"):
                    self._json_response({"error": "Invalid role"}, status=400)
                    return
                target["role"] = body["role"]
            if "active" in body:
                if user_id == current["id"] and not body["active"]:
                    self._json_response({"error": "Cannot deactivate your own account"}, status=400)
                    return
                target["active"] = bool(body["active"])
            _save_users(users)
        self._json_response({"user": _safe_user(target)})

    def _auth_delete_user(self, user_id: str):
        current = self._require_auth(required_roles=["admin"], permission="org.members.manage", org_id=self._current_org_id())
        if current is None:
            return
        if user_id == current["id"]:
            self._json_response({"error": "Cannot delete your own account"}, status=400)
            return
        with _USERS_LOCK:
            users = _load_users()
            if not any(u["id"] == user_id for u in users):
                self._json_response({"error": "User not found"}, status=404)
                return
            conn = sqlite3.connect(str(USERDATA_DB))
            conn.execute("PRAGMA journal_mode=WAL")
            try:
                conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
                conn.commit()
            finally:
                conn.close()
        self._json_response({"ok": True})

    # ── API key management ────────────────────────────────────────────────

    def _auth_generate_apikey(self):
        """POST /api/auth/apikey — generate (or regenerate) an API key for the current user."""
        user = self._require_auth()
        if user is None:
            return
        new_key = _generate_api_key()
        with _USERS_LOCK:
            users = _load_users()
            target = next((u for u in users if u["id"] == user["id"]), None)
            if not target:
                self._json_response({"error": "User not found"}, status=404)
                return
            target["api_key"] = new_key
            _save_users(users)
        print(f"  🔑 API key generated for {user['username']}")
        self._json_response({"api_key": new_key})

    def _auth_get_apikey(self):
        """GET /api/auth/apikey — return the current user's API key (masked)."""
        user = self._require_auth()
        if user is None:
            return
        with _USERS_LOCK:
            users = _load_users()
            target = next((u for u in users if u["id"] == user["id"]), None)
        key = (target or {}).get("api_key", "")
        if not key:
            self._json_response({"api_key": None})
        else:
            # Return last 8 chars visible so user can identify which key is active
            self._json_response({"api_key": "••••••••" + key[-8:]})

    # ── CI/CD webhook ─────────────────────────────────────────────────────

    def _webhook_generate(self):
        """
        POST /api/webhook/generate
        Trigger async test generation from a CI/CD pipeline.

        Auth: X-Api-Key header (API key) OR Authorization: Bearer <jwt>

        Body (JSON):
          story_key   str   required  e.g. "PROJ-123"
          story_text  str   required  User story / feature description
          model       str   optional  Ollama model override
          framework   str   optional  selenium|playwright|cypress|appium|tosca (default: selenium)
          style       str   optional  standard|bdd|atd|tdd (default: standard)
          modules     list  optional  ["cas","code","gherkin","risques"] (default: ["cas"])
          max_tokens  int   optional  Override max tokens cap

        Returns: {"job_id": "...", "status": "queued"}
        Poll:    GET /api/webhook/jobs/{job_id}
        """
        user = self._require_auth()
        if user is None:
            return
        if user["role"] == "reader":
            self._json_response({"error": "Forbidden — reader role cannot trigger generation"}, status=403)
            return
        # Rate-limit per API key; fall back to the user id so all JWT-only
        # callers do not share a single "unknown" bucket.
        rl_id = (self.headers.get("X-Api-Key") or user["id"])[:8]
        if not rate_limiter.check_rate_limit(f"webhook:{rl_id}", rate_limiter.RATE_LIMIT_WEBHOOK, 60):
            retry = rate_limiter.get_retry_after(f"webhook:{rl_id}", 60)
            self._json_response({"error": "Rate limit exceeded", "retry_after": retry}, status=429)
            return

        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return

        story_key  = (body.get("story_key") or "").strip()
        story_text = (body.get("story_text") or "").strip()
        if not story_text:
            self._json_response({"error": "story_text is required"}, status=400)
            return

        model      = body.get("model") or DEFAULT_MODEL
        framework  = body.get("framework", "selenium")
        style      = body.get("style", "standard")
        modules    = body.get("modules") or ["cas"]
        max_tokens = min(int(body.get("max_tokens", MAX_TOKENS_CAP)), MAX_TOKENS_CAP)

        job_id = str(uuid.uuid4())
        now    = time.time()
        with _JOBS_LOCK:
            _JOBS[job_id] = {
                "status":     "queued",
                "created_at": now,
                "story_key":  story_key,
                "result":     None,
                "error":      None,
            }
            # Prune expired jobs while we hold the lock
            expired = [jid for jid, j in _JOBS.items()
                       if now - j["created_at"] > _JOB_TTL_SECONDS and jid != job_id]
            for jid in expired:
                del _JOBS[jid]
        _job_save(job_id, "webhook", "queued", story_key=story_key)

        print(f"  🚀 Webhook job {job_id[:8]}… queued — story={story_key or '(none)'} modules={modules}")

        def _run():
            with _JOBS_LOCK:
                _JOBS[job_id]["status"] = "running"
            try:
                system_prompt   = _build_webhook_system(style, framework, modules)
                user_content    = f"[{story_key}] {story_text}" if story_key else story_text
                messages        = [{"role": "user", "content": user_content}]
                enriched_system = _enrich_system(system_prompt, user_content)
                job_provider    = body.get("provider") or get_default_provider()

                gen_result = generation.generate(
                    messages=messages, system=enriched_system, model=model,
                    provider=job_provider, module=modules[0] if modules else "cas",
                    user_text=user_content, framework=framework,
                    temperature=0.1, max_tokens=max_tokens,
                )
                raw = json.dumps(gen_result["result"], ensure_ascii=False)
                with _JOBS_LOCK:
                    _JOBS[job_id]["status"] = "done"
                    _JOBS[job_id]["result"] = raw
                    if "confidence" in gen_result:
                        _JOBS[job_id]["confidence"] = gen_result["confidence"]
                _job_save(job_id, "webhook", "done", result=raw)
                print(f"  ✓  Webhook job {job_id[:8]}… done ({len(raw)} chars)")
            except LLMError as exc:
                with _JOBS_LOCK:
                    _JOBS[job_id]["status"] = "error"
                    _JOBS[job_id]["error"]  = str(exc)
                _job_save(job_id, "webhook", "error", error=str(exc))
                print(f"  ✗  Webhook job {job_id[:8]}… error (provider): {exc}")
            except Exception as exc:
                with _JOBS_LOCK:
                    _JOBS[job_id]["status"] = "error"
                    _JOBS[job_id]["error"]  = str(exc)
                _job_save(job_id, "webhook", "error", error=str(exc))
                print(f"  ✗  Webhook job {job_id[:8]}… error: {exc}")
                _log.error(str(exc), path="/api/webhook/generate", job_id=job_id[:8])

        threading.Thread(target=_run, daemon=True).start()
        self._json_response({"job_id": job_id, "status": "queued"})

    def _webhook_job_status(self, job_id: str):
        """GET /api/webhook/jobs/{job_id} — poll the status of a webhook generation job."""
        with _JOBS_LOCK:
            job = _JOBS.get(job_id)
        if not job:
            job = _job_get(job_id)  # fallback to persistent store
        if not job:
            self._json_response({"error": "Job not found"}, status=404)
            return
        resp = {
            "job_id":     job_id,
            "status":     job["status"],
            "story_key":  job.get("story_key"),
            "created_at": job["created_at"],
        }
        if job["status"] == "done":
            resp["result"] = job["result"]
        elif job["status"] == "error":
            resp["error"] = job["error"]
        self._json_response(resp)
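
    # Illustrative CI client for the two webhook endpoints above. A sketch,
    # assuming the server listens on localhost:8000 and QA_API_KEY holds a key
    # from /api/auth/apikey (both assumptions; adjust to your deployment):
    #
    #   import json, os, time, urllib.request
    #
    #   def trigger_and_wait(story_key, story_text, base="http://localhost:8000"):
    #       req = urllib.request.Request(
    #           f"{base}/api/webhook/generate",
    #           data=json.dumps({"story_key": story_key,
    #                            "story_text": story_text}).encode("utf-8"),
    #           headers={"X-Api-Key": os.environ["QA_API_KEY"],
    #                    "Content-Type": "application/json"},
    #           method="POST")
    #       with urllib.request.urlopen(req) as r:
    #           job_id = json.load(r)["job_id"]
    #       while True:                                    # poll until terminal
    #           with urllib.request.urlopen(f"{base}/api/webhook/jobs/{job_id}") as r:
    #               job = json.load(r)
    #           if job["status"] in ("done", "error"):
    #               return job
    #           time.sleep(2)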

    # ── Contact & password reset ──────────────────────────────────────

    def _contact_submit(self):
        """POST /api/contact — public contact form submission."""
        client_ip = self.client_address[0] if self.client_address else "unknown"
        if not rate_limiter.check_rate_limit(f"contact:{client_ip}", rate_limiter.RATE_LIMIT_CONTACT, 60):
            retry = rate_limiter.get_retry_after(f"contact:{client_ip}", 60)
            self._json_response({"error": "Rate limit exceeded", "retry_after": retry}, status=429)
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        name = (body.get("name") or "").strip()
        email = (body.get("email") or "").strip()
        message = (body.get("message") or "").strip()
        if not name or not email or not message:
            self._json_response({"error": "name, email, and message are required"}, status=400)
            return
        if "@" not in email or "." not in email:
            self._json_response({"error": "Invalid email format"}, status=400)
            return
        if len(message) < 10 or len(message) > 5000:
            self._json_response({"error": "Message must be 10-5000 characters"}, status=400)
            return
        # Store in DB
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        try:
            conn = sqlite3.connect(str(USERDATA_DB))
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute(
                "INSERT INTO contact_messages (name, email, message, ip, created_at) VALUES (?, ?, ?, ?, ?)",
                (name, email, message, client_ip, now),
            )
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"  ⚠  Contact DB save failed: {e}")
        # Send email if configured
        try:
            import email_service
            if email_service.is_configured():
                email_service.send_email(
                    os.environ.get("QA_CONTACT_EMAIL", "support@qacopilot.fr"),
                    f"QA Copilot Contact: {html.escape(name)}",
                    f"<p><strong>From:</strong> {html.escape(name)} ({html.escape(email)})</p><p>{html.escape(message)}</p>",
                )
        except Exception:
            pass
        self._json_response({"ok": True})

    def _auth_forgot_password(self):
        """POST /api/auth/forgot-password — send reset email."""
        client_ip = self.client_address[0] if self.client_address else "unknown"
        if not rate_limiter.check_rate_limit(f"reset:{client_ip}", 3, 60):
            retry = rate_limiter.get_retry_after(f"reset:{client_ip}", 60)
            self._json_response({"error": "Rate limit exceeded", "retry_after": retry}, status=429)
            return
        # Purge expired tokens
        try:
            now = datetime.datetime.now(datetime.timezone.utc).isoformat()
            conn = sqlite3.connect(str(USERDATA_DB))
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("DELETE FROM password_resets WHERE expires_at < ?", (now,))
            conn.commit()
            conn.close()
        except Exception:
            pass
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        email = (body.get("email") or "").strip().lower()
        if not email:
            self._json_response({"ok": True})  # Don't reveal if email exists
            return
        # Find user by email
        with _USERS_LOCK:
            users = _load_users()
        user = next((u for u in users if u.get("email", "").lower() == email), None)
        if not user:
            self._json_response({"ok": True})  # Don't reveal
            return
        # Generate token
        token = secrets.token_urlsafe(32)
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        expires_at = (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)).isoformat()
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        conn = sqlite3.connect(str(USERDATA_DB))
        conn.execute("PRAGMA journal_mode=WAL")
        try:
            # Invalidate previous tokens for this user
            conn.execute("DELETE FROM password_resets WHERE user_id = ?", (user["id"],))
            conn.execute(
                "INSERT INTO password_resets (user_id, token_hash, expires_at, created_at) VALUES (?, ?, ?, ?)",
                (user["id"], token_hash, expires_at, now),
            )
            conn.commit()
        finally:
            conn.close()
        # Send email
        try:
            import email_service
            reset_url = os.environ.get("QA_RESET_URL", "http://localhost:5173/reset-password")
            email_service.send_password_reset(email, token, reset_url)
        except Exception as e:
            print(f"  ⚠  Reset email failed: {e}")
        self._json_response({"ok": True})

    def _auth_reset_password(self):
        """POST /api/auth/reset-password — reset password with token."""
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        token = (body.get("token") or "").strip()
        new_password = (body.get("new_password") or "").strip()
        if not token or not new_password:
            self._json_response({"error": "token and new_password required"}, status=400)
            return
        if len(new_password) < 8:
            self._json_response({"error": "Password must be at least 8 characters"}, status=400)
            return
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        conn = sqlite3.connect(str(USERDATA_DB))
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        try:
            row = conn.execute(
                "SELECT * FROM password_resets WHERE token_hash = ? AND expires_at > ?",
                (token_hash, now),
            ).fetchone()
            if not row:
                self._json_response({"error": "Invalid or expired token"}, status=401)
                return
            user_id = row["user_id"]
            conn.execute("DELETE FROM password_resets WHERE token_hash = ?", (token_hash,))
            conn.commit()
        finally:
            conn.close()
        # Update password (hold the users lock across the read-modify-write)
        with _USERS_LOCK:
            users = _load_users()
            for u in users:
                if u["id"] == user_id:
                    u["password_hash"] = _hash_password(new_password)
                    break
            _save_users(users)
        self._json_response({"ok": True})

    # ── Agent orchestrator ─────────────────────────────────────────────

    def _agent_run_v2(self):
        """POST /api/agent/run — start a v2 ReAct agent job."""
        user = self._require_auth()
        if user is None: return
        if user["role"] == "reader":
            self._json_response({"error": "Forbidden"}, status=403); return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400); return

        story_key = (body.get("story_key") or "").strip()
        story_text = (body.get("story_text") or "").strip()
        modules = body.get("modules") or ["cas"]
        config = body.get("config") or {}
        goal = body.get("goal") or f"Generate {', '.join(modules)} tests for {story_key}"

        if not story_text:
            self._json_response({"error": "story_text required"}, status=400); return

        import agent as agent_mod
        from agent import run_agent_v2

        job_id = str(uuid.uuid4())
        # isoformat() on an aware UTC datetime already carries the +00:00 offset
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()

        with agent_mod._AGENT_LOCK:
            agent_mod._AGENT_JOBS[job_id] = {"status": "queued", "created_at": now, "result": None, "log": []}

        # Create event queue NOW (before thread starts) so SSE can connect immediately
        import queue as _q
        from agent import _AGENT_EVENT_QUEUES
        eq = _q.Queue()
        _AGENT_EVENT_QUEUES[job_id] = eq

        def _run_v2():
            with agent_mod._AGENT_LOCK:
                agent_mod._AGENT_JOBS[job_id]["status"] = "running"
            try:
                outcome = run_agent_v2(
                    goal=goal, story_key=story_key, story_text=story_text,
                    modules=modules, config=config, job_id=job_id,
                )
                with agent_mod._AGENT_LOCK:
                    agent_mod._AGENT_JOBS[job_id]["status"] = outcome.get("status", "done")
                    agent_mod._AGENT_JOBS[job_id]["result"] = outcome
            except Exception as exc:
                with agent_mod._AGENT_LOCK:
                    agent_mod._AGENT_JOBS[job_id]["status"] = "error"
                    agent_mod._AGENT_JOBS[job_id]["result"] = {"error": str(exc)}
                # Signal SSE that job errored
                eq.put({"type": "error", "error": str(exc)})
                eq.put(None)  # sentinel

        threading.Thread(target=_run_v2, daemon=True).start()
        self._json_response({"job_id": job_id, "status": "queued"}, status=202)

    def _serve_legal(self, doc_name: str):
        """GET /api/legal/{doc} — serve a legal document as JSON."""
        legal_dir = SCRIPT_DIR.parent / "docs" / "legal"
        doc_path = legal_dir / f"{doc_name}.md"
        if not doc_path.exists():
            self._json_response({"error": "Document not found"}, status=404)
            return
        content = doc_path.read_text(encoding="utf-8")
        title = content.split("\n", 1)[0].lstrip("# ").strip()
        self._json_response({"title": title, "content": content})

    def _serve_openapi(self):
        """GET /api/docs — serve OpenAPI spec."""
        spec_path = SCRIPT_DIR / "openapi.yaml"
        if not spec_path.exists():
            self._json_response({"error": "OpenAPI spec not found"}, status=404)
            return
        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "text/yaml; charset=utf-8")
        self.end_headers()
        self.wfile.write(spec_path.read_bytes())

    def _agent_job_status(self, job_id: str):
        """GET /api/agent/jobs/{job_id} — poll agent job status."""
        import agent as agent_mod
        with agent_mod._AGENT_LOCK:
            job = agent_mod._AGENT_JOBS.get(job_id)
        if not job:
            self._json_response({"error": "Job not found"}, status=404); return
        resp = {"job_id": job_id, "status": job["status"], "log": job.get("log", [])}
        if job.get("result"):
            resp["result"] = job["result"]
        if job.get("confidence"):
            resp["confidence"] = job["confidence"]
        self._json_response(resp)

    def _agent_trace(self, job_id: str):
        """GET /api/agent/{job_id}/trace — return trace for an agent job."""
        user = self._require_auth()
        if user is None:
            return
        import agent as agent_mod
        with agent_mod._AGENT_LOCK:
            job = agent_mod._AGENT_JOBS.get(job_id)
        if not job:
            self._json_response({"error": "Job not found"}, status=404)
            return
        result = job.get("result") or {}
        trace = result.get("trace", [])
        self._json_response({"job_id": job_id, "status": job["status"], "trace": trace})

    def _agent_stream(self, job_id: str):
        """GET /api/agent/{job_id}/stream — SSE stream of agent reasoning events."""
        # Auth via ?token= query param (EventSource can't set headers)
        parsed = urllib.parse.urlparse(self.path)
        qs = urllib.parse.parse_qs(parsed.query)
        token_param = qs.get("token", [None])[0]
        if token_param:
            payload = _jwt_decode(token_param)
            if not payload:
                self._json_response({"error": "Unauthorized"}, status=401)
                return
        else:
            user = self._require_auth()
            if user is None:
                return
        import agent as agent_mod

        # Wait briefly for event queue to be created (race with background thread)
        eq = None
        for _ in range(20):  # wait up to 2 seconds
            eq = agent_mod.get_event_queue(job_id)
            if eq is not None:
                break
            time.sleep(0.1)

        if eq is None:
            # Fallback: check if job exists but already completed
            with agent_mod._AGENT_LOCK:
                job = agent_mod._AGENT_JOBS.get(job_id)
            if not job:
                self._json_response({"error": "Job not found"}, status=404)
                return
            # Job exists but no queue (already done or v1 job)
            self._json_response({"error": "Stream not available for this job"}, status=404)
            return

        # Start SSE
        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "text/event-stream; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()

        import queue
        try:
            while True:
                try:
                    event = eq.get(timeout=30)
                    if event is None:  # Sentinel: job done
                        self.wfile.write(b"data: {\"done\": true}\n\n")
                        self.wfile.flush()
                        break
                    data = json.dumps(event)
                    self.wfile.write(f"data: {data}\n\n".encode())
                    self.wfile.flush()
                except queue.Empty:
                    # Keepalive
                    self.wfile.write(b": keepalive\n\n")
                    self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass
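
    # Illustrative consumer for the stream above. A sketch assuming
    # localhost:8000, a job_id from POST /api/agent/run, and a JWT in `token`
    # (query-param auth, since EventSource cannot set headers):
    #
    #   import json, urllib.request
    #
    #   url = f"http://localhost:8000/api/agent/{job_id}/stream?token={token}"
    #   with urllib.request.urlopen(url) as resp:
    #       for raw in resp:                        # HTTPResponse iterates lines
    #           line = raw.decode("utf-8").strip()
    #           if not line.startswith("data: "):   # skips ": keepalive" pings
    #               continue
    #           event = json.loads(line[len("data: "):])
    #           if event.get("done"):
    #               break
    #           print(event)                        # one agent reasoning event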

    def _agent_approve(self, job_id: str):
        """POST /api/agent/{job_id}/approve — resolve an approval request."""
        user = self._require_auth()
        if user is None:
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        approval_id = body.get("approval_id")
        decision = body.get("decision", "approve")
        category = body.get("category")
        import agent
        mw = agent.get_approval_middleware(job_id)
        if mw is None:
            self._json_response({"error": "Job not found or no active approval"}, status=404)
            return
        if decision == "approve_all_category" and category:
            mw.approve_all_category(category)
            self._json_response({"status": "ok", "action": "approved_all", "category": category})
        elif approval_id:
            mw.resolve(approval_id, decision)
            self._json_response({"status": "ok", "approval_id": approval_id, "decision": decision})
        else:
            self._json_response({"error": "approval_id or category required"}, status=400)

    def _agent_override(self, job_id: str):
        """POST /api/agent/{job_id}/override — override approval levels."""
        user = self._require_auth()
        if user is None:
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        import agent
        mw = agent.get_approval_middleware(job_id)
        if mw is None:
            self._json_response({"error": "Job not found"}, status=404)
            return
        levels = body.get("approval_levels", {})
        for category, level in levels.items():
            mw.override_level(category, level)
        self._json_response({"status": "ok", "updated_levels": levels})

    # ── RAG / integration proxies ────────────────────────────────────────

    def _jira_expand(self):
        """Fetch full stored content for a Jira/Confluence key and return it as plain text."""
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        key = (body.get("key") or "").strip().upper()
        if not key or not RAG_URL:
            self._json_response({"text": "", "key": key})
            return
        try:
            req = urllib.request.Request(f"{RAG_URL}/rag/lookup/{key}", method="GET")
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = json.loads(resp.read())
            print(f"  [rag/lookup] jira-expand/{key}: {json.dumps(data, indent=2)}")
            chunks = data.get("chunks", [])
            if not chunks:
                self._json_response({"text": "", "key": key, "found": False})
                return
            full_text = "\n\n".join(
                c.get("payload", {}).get("chunk", "") for c in chunks
            )
            title = chunks[0].get("payload", {}).get("title", key)
            self._json_response({"text": full_text, "key": key, "title": title, "found": True})
        except Exception as e:
            print(f"  ⚠  jira-expand/{key} failed: {e}")
            self._json_response({"text": "", "key": key, "found": False})

    def _rag_context(self):
        """Return grouped RAG hits for a given text — used by the frontend to show related context chips."""
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        text = (body.get("text") or "").strip()
        if not text or not RAG_URL:
            self._json_response({"groups": {}, "rag_enabled": bool(RAG_URL)})
            return

        # ── Fast path: direct Jira key lookups (no embedding needed) ─────────
        jira_direct: dict = {}  # key → slim hit (score=1.0)
        for key in _JIRA_KEY_RE.findall(text.upper()):
            try:
                req = urllib.request.Request(f"{RAG_URL}/rag/lookup/{key}", method="GET")
                with urllib.request.urlopen(req, timeout=4) as resp:
                    data = json.loads(resp.read())
                print(f"  [rag/lookup] rag-context/{key}: {len(data.get('chunks', []))} chunks")
                for chunk in data.get("chunks", []):
                    p = chunk.get("payload", {})
                    k = p.get("key") or p.get("externalId") or key
                    if k not in jira_direct:
                        jira_direct[k] = {
                            "score":  1.0,
                            "source": "jira",
                            "key":    k,
                            "title":  p.get("title", key),
                            "url":    p.get("url", ""),
                        }
            except Exception as e:
                print(f"  ⚠  RAG lookup/{key} failed: {e}")

        # ── Slow path: cross-source grouped search ────────────────────────────
        raw_groups = _query_rag_groups(text)

        # ── Build slim hits per group, dedup by key within each group ─────────
        slim_groups: dict = {}

        # Process each group from the vector search
        for group_name, hits in raw_groups.items():
            seen_keys: dict = {}
            for h in hits:
                p = h.get("payload") or {}
                k = p.get("key") or p.get("externalId") or p.get("title") or ""
                score = h.get("rerank_score", h.get("score", 0))
                if k not in seen_keys or score > seen_keys[k]["score"]:
                    seen_keys[k] = {
                        "score":  round(score, 3),
                        "source": p.get("source", group_name),
                        "key":    k,
                        "title":  p.get("title", ""),
                        "url":    p.get("url", ""),
                    }
            if seen_keys:
                slim_groups[group_name] = list(seen_keys.values())

        # Merge direct Jira lookups into the jira group (deduplicated)
        if jira_direct:
            existing_jira_keys = {h["key"] for h in slim_groups.get("jira", [])}
            new_jira = [h for k, h in jira_direct.items() if k not in existing_jira_keys]
            if new_jira:
                slim_groups.setdefault("jira", [])
                slim_groups["jira"] = new_jira + slim_groups["jira"]

        self._json_response({"groups": slim_groups, "rag_enabled": True})

    def _rag_codebase(self):
        """
        Return RAG hits from java_codebase source only.
        Used by the frontend to inject real framework code examples into the
        code-generation system prompt so the LLM follows your actual conventions.
        """
        length = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(length))
        except Exception:
            self._json_response({"error": "Invalid JSON"}, status=400)
            return
        text = (body.get("text") or "").strip()
        top_k = int(body.get("top_k", 5))
        if not text or not RAG_URL:
            self._json_response({"chunks": [], "rag_enabled": bool(RAG_URL)})
            return

        hits = _query_rag_codebase(text, top_k=top_k)
        chunks = []
        for h in hits:
            p = h.get("payload") or {}
            chunk_text = p.get("chunk", "")
            if not chunk_text:
                continue
            chunks.append({
                "file":      p.get("filePath", ""),
                "className": p.get("className", ""),
                "fileType":  p.get("fileType", ""),
                "code":      chunk_text,
                "score":     round(h.get("score", 0), 3),
            })
        print(f"  📦 RAG codebase: {len(chunks)} chunks returned for code generation")
        self._json_response({"chunks": chunks, "rag_enabled": True})

    def _xray_meta(self):
        """Proxy GET /xray/jira-meta — returns available link types and priorities."""
        if not RAG_URL:
            self._json_response({"error": "RAG server not configured"}, status=503)
            return
        try:
            req = urllib.request.Request(f"{RAG_URL}/xray/jira-meta", method="GET")
            with urllib.request.urlopen(req, timeout=15) as resp:
                self._json_response(json.loads(resp.read()))
        except Exception as e:
            self._json_response({"error": str(e)}, status=500)

    def _config_get(self):
        """Proxy GET /config to RAG server — returns current connector settings (masked)."""
        if not RAG_URL:
            self._json_response({"error": "RAG server not configured"}, status=503)
            return
        try:
            req = urllib.request.Request(f"{RAG_URL}/config", method="GET")
            with urllib.request.urlopen(req, timeout=10) as resp:
                self._json_response(json.loads(resp.read()))
        except Exception as e:
            self._json_response({"error": str(e)}, status=500)

    def _config_update(self):
        """Proxy POST /config to RAG server — persists connector settings to .env."""
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        if not RAG_URL:
            self._json_response({"error": "RAG server not configured"}, status=503)
            return
        try:
            req = urllib.request.Request(
                f"{RAG_URL}/config",
                data=body,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=10) as resp:
                data = json.loads(resp.read())
            print(f"  ⚙  Config saved: {data.get('updated', [])}")
            self._json_response(data)
        except urllib.error.HTTPError as e:
            detail = e.read().decode("utf-8", errors="replace")
            self._json_response({"error": f"Config error {e.code}", "detail": detail[:400]}, status=502)
        except Exception as e:
            self._json_response({"error": str(e)}, status=500)

    def _proxy_rag(self, api_path: str, method: str = None):
        """
        Generic proxy: forwards any /api/* request to the RAG server,
        translating /api/sources/... → /sources/...
        """
        if not RAG_URL:
            self._json_response({"error": "RAG server not configured"}, status=503)
            return
        rag_path = api_path.removeprefix("/api")
        # Preserve query string (e.g. ?enabled=true for PATCH)
        qs = ""
        if "?" in rag_path:
            rag_path, qs = rag_path.split("?", 1)
            qs = "?" + qs
        target_url = f"{RAG_URL}{rag_path}{qs}"
        http_method = method or self.command  # use explicit override or HTTP verb
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length) if length else None
        try:
            req = urllib.request.Request(
                target_url,
                data=body,
                headers={"Content-Type": "application/json"} if body else {},
                method=http_method,
            )
            with urllib.request.urlopen(req, timeout=15) as resp:
                self._json_response(json.loads(resp.read()))
        except urllib.error.HTTPError as e:
            detail = e.read().decode("utf-8", errors="replace")
            self._json_response({"error": f"RAG {e.code}", "detail": detail[:400]}, status=e.code)
        except (urllib.error.URLError, OSError) as e:
            self._json_response({"error": f"RAG server unreachable ({RAG_URL}) — is it running? Detail: {e.reason if hasattr(e, 'reason') else e}"}, status=503)
        except Exception as e:
            self._json_response({"error": str(e)}, status=500)

    def _proxy_review(self, path: str, method: str = "GET", body: bytes | None = None):
        """Proxy a request to the review_server running on REVIEW_SERVER_URL."""
        url = f"{REVIEW_SERVER_URL}{path}"
        try:
            req = urllib.request.Request(
                url, data=body,
                headers={"Content-Type": "application/json"} if body else {},
                method=method,
            )
            with urllib.request.urlopen(req, timeout=10) as resp:
                self._json_response(json.loads(resp.read()))
        except urllib.error.HTTPError as e:
            detail = e.read().decode("utf-8", errors="replace")
            self._json_response({"error": f"review_server {e.code}", "detail": detail[:400]}, status=e.code)
        except (urllib.error.URLError, OSError) as e:
            self._json_response({"error": f"review_server unreachable ({REVIEW_SERVER_URL}): {e}"}, status=502)
        except Exception as e:
            self._json_response({"error": str(e)}, status=500)

    def _ft_train_stream(self):
        """SSE endpoint streaming the fine-tuning log. Accepts ?token= for EventSource auth."""
        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        token_param = params.get("token", [None])[0]
        if token_param:
            payload = _jwt_decode(token_param)
            if not payload:
                self._json_response({"error": "Unauthorized"}, status=401)
                return
            with _USERS_LOCK:
                users = _load_users()
            user = next((u for u in users if u["id"] == payload.get("sub")), None)
            if not user or user.get("role") != "admin":
                self._json_response({"error": "Forbidden"}, status=403)
                return
        else:
            if self._require_auth(required_roles=["admin"], permission="fine_tuning.use", project_id=self._current_project_id()) is None:
                return

        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "text/event-stream; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()

        sent = 0
        try:
            while True:
                with _ft["lock"]:
                    lines = list(_ft["log"])
                    status = _ft["status"]
                if sent < len(lines):
                    for line in lines[sent:]:
                        msg = json.dumps({"line": line})
                        self.wfile.write(f"data: {msg}\n\n".encode())
                    sent = len(lines)
                    self.wfile.flush()
                if status not in ("assembling", "training"):
                    done_msg = json.dumps({"done": True, "status": status})
                    self.wfile.write(f"data: {done_msg}\n\n".encode())
                    self.wfile.flush()
                    break
                self.wfile.write(b": keepalive\n\n")
                self.wfile.flush()
                time.sleep(1)
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass

    def _xray_push(self):
        """
        Proxy to RAG server POST /xray/create-tests.
        Creates Xray Test issues in Jira and links them to the given user story.
        Body: {tests:[{title,type,priority,preconditions,steps[],expectedResult}], story_key:"PROJ-123"}
        """
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        if not RAG_URL:
            self._json_response({"error": "RAG server not configured (set RAG_URL)"}, status=503)
            return
        # ── Debug: log received payload ───────────────────────────────────
        try:
            _debug = json.loads(body)
            print(f"  🔗 Xray push received: story={_debug.get('story_key')} tests={len(_debug.get('tests', []))}")
            if _debug.get('tests'):
                print(f"     first test title: {_debug['tests'][0].get('title','?')}")
            else:
                print(f"  ⚠  Xray push: tests array is EMPTY — nothing to create")
        except Exception:
            pass
        try:
            req = urllib.request.Request(
                f"{RAG_URL}/xray/create-tests",
                data=body,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=60) as resp:
                data = json.loads(resp.read())
            n = len(data.get("created", []))
            print(f"  🔗 Xray: {n} test(s) created → {data.get('story_key', '?')}")
            self._json_response(data)
            # Record positive feedback for RAG chunks used in this generation
            try:
                req_body = json.loads(body)
                chunk_ids = req_body.get("chunk_ids", [])
                story_key = req_body.get("story_key", "")
                if chunk_ids and story_key:
                    _db_insert_feedback(story_key, chunk_ids)
                    print(f"  📊 Feedback: {len(chunk_ids)} chunks recorded for {story_key}")
            except Exception:
                pass  # Feedback recording should never break the push
        except urllib.error.HTTPError as e:
            detail = e.read().decode("utf-8", errors="replace")
            print(f"  ✗  Xray push failed: {e.code} {detail[:200]}")
            self._json_response({"error": f"Xray error {e.code}", "detail": detail[:500]}, status=502)
        except Exception as e:
            print(f"  ✗  Xray push error: {e}")
            self._json_response({"error": str(e)}, status=500)

    # ── Health / Models ──────────────────────────────────────────────────

    def _health_check(self):
        ollama_ok, models, ollama_error = False, [], ""
        try:
            with urllib.request.urlopen(f"{OLLAMA_URL}/api/tags", timeout=5) as resp:
                data = json.loads(resp.read())
            models = [m["name"] for m in data.get("models", [])]
            ollama_ok = True
        except Exception as e:
            ollama_error = str(e)

        provider_status = {
            "ollama":  {"ok": ollama_ok, **({"error": ollama_error} if not ollama_ok else {})},
            "claude":  {"ok": bool(os.environ.get("ANTHROPIC_API_KEY"))},
            "openai":  {"ok": bool(os.environ.get("OPENAI_API_KEY"))},
            "openai-compat": {
                "ok": bool(os.environ.get("OPENAI_COMPAT_BASE_URL") and
                           os.environ.get("OPENAI_COMPAT_API_KEY")),
                **({"base_url": os.environ.get("OPENAI_COMPAT_BASE_URL")}
                   if os.environ.get("OPENAI_COMPAT_BASE_URL") else {}),
            },
        }
        self._json_response({
            "status": "ok",
            "active_provider": get_default_provider(),
            "ollama": ollama_ok,           # backward compat
            "models": models,              # backward compat
            "default_model": DEFAULT_MODEL,
            "providers": provider_status,
        })

    def _list_providers(self):
        self._json_response(get_providers_info())

    def _list_models(self):
        try:
            with urllib.request.urlopen(f"{OLLAMA_URL}/api/tags", timeout=5) as resp:
                data = json.loads(resp.read())
            models = [m["name"] for m in data.get("models", [])]
            self._json_response({"models": models, "default": DEFAULT_MODEL})
        except Exception as e:
            self._json_response({"models": [], "default": DEFAULT_MODEL, "error": str(e)})

    # ── Main LLM proxy ───────────────────────────────────────────────────

    def _proxy_ollama(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except Exception:
            self._json_response({"error": "Invalid JSON body"}, status=400)
            return

        provider   = payload.get("provider") or get_default_provider()
        model      = payload.get("model") or DEFAULT_MODEL
        max_tokens = min(int(payload.get("max_tokens", 2000)), MAX_TOKENS_CAP)

        t0 = time.time()
        print(f"  ⚙  provider={provider}  model={model}  max_tokens={max_tokens}")

        if "messages" in payload:
            messages = list(payload["messages"])
            user_text = next(
                (m.get("content", "") for m in reversed(messages) if m.get("role") == "user"), "")
        else:
            user_text = payload.get("prompt", "")
            messages = [{"role": "user", "content": user_text}]

        system = _enrich_system(payload.get("system", ""), user_text)

        try:
            response_text = llm_chat(
                messages, system, model, provider,
                temperature=float(payload.get("temperature", 0.3)),
                max_tokens=max_tokens,
                format_json=True,
            )
            # Multi-pass self-review
            response_text = generation.parse_and_review(response_text, user_text, provider, model)
            elapsed = round(time.time() - t0, 1)
            print(f"  ✓  done in {elapsed}s  ~{len(response_text.split())} words")
            self._json_response({"content": response_text, "done": True})
        except LLMError as e:
            self._json_response(
                {"error": "Provider error", "provider": provider, "detail": str(e)},
                status=e.status_code)
        except Exception as e:
            print(f"  ✗  Error: {e}")
            _log.error(str(e), path=self.path)
            self._json_response({"error": str(e)}, status=500)

    # ── /api/json — non-streaming structured JSON call ────────────────────
    # Uses llm_json() internally so each provider returns a clean parsed dict:
    #   Ollama  → format:"json" grammar constraint
    #   Claude  → tool_use (body arrives as a dict, never needs parsing)
    #   OpenAI  → response_format json_schema / json_object
    # No repairJSON, no markdown-stripping needed.
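    #
    # Illustrative request body (field names match the parsing below; the
    # model name and story text are placeholders):
    #   POST /api/json
    #   {"provider": "ollama", "model": "<model-name>", "module": "cas",
    #    "temperature": 0.1, "max_tokens": 6000,
    #    "messages": [{"role": "user", "content": "[PROJ-123] As a user ..."}]}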

    def _proxy_json(self):
        user_key = f"gen:{self._req_user.get('id', 'anon') if self._req_user else 'anon'}"
        if not rate_limiter.check_rate_limit(user_key, rate_limiter.RATE_LIMIT_GENERATE, 60):
            retry = rate_limiter.get_retry_after(user_key, 60)
            self._json_response({"error": "Rate limit exceeded", "retry_after": retry}, status=429)
            return
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except Exception:
            self._json_response({"error": "Invalid JSON body"}, status=400)
            return

        provider   = payload.get("provider") or get_default_provider()
        model      = payload.get("model") or DEFAULT_MODEL
        max_tokens = min(int(payload.get("max_tokens", 6000)), MAX_TOKENS_CAP)

        messages = list(payload.get("messages", []))
        if not messages:
            user_text = payload.get("prompt", "")
            messages  = [{"role": "user", "content": user_text}]
        else:
            user_text = next(
                (m.get("content", "") for m in reversed(messages) if m.get("role") == "user"), "")

        module      = payload.get("module")   # "code" | "perf" | None
        # For code/perf modules, skip RAG/few-shot enrichment — the frontend
        # already injects codebase examples and the Jira story is in messages.
        # RAG context (Jira stories in CAS format) confuses the LLM into
        # generating test cases instead of code.
        if module in ("code", "perf"):
            system = payload.get("system", "")
        else:
            system = _enrich_system(payload.get("system", ""), user_text)
        temperature = float(payload.get("temperature", 0.1))
        schema      = _get_module_schema(module)

        # CoT and self-review toggles from frontend.
        # NOTE: these are process-global env vars, so concurrent requests on
        # this threading server can overwrite each other's toggles.
        if "chain_of_thought" in payload:
            os.environ["QA_CHAIN_OF_THOUGHT"] = "1" if payload["chain_of_thought"] else "0"
        if "self_review" in payload:
            os.environ["QA_SELF_REVIEW"] = "1" if payload["self_review"] else "0"

        t0 = time.time()
        print(f"  ⚙  [json] provider={provider}  model={model}  module={module}  schema={'YES' if schema else 'NO'}  max_tokens={max_tokens}  cot={os.environ.get('QA_CHAIN_OF_THOUGHT','1')}  review={os.environ.get('QA_SELF_REVIEW','1')}")
        try:
            # Generation pipeline (chain-of-thought, coverage, templates, scoring)
            gen_result = generation.generate(
                messages=messages, system=system, model=model, provider=provider,
                module=module, schema=schema, user_text=user_text,
                framework=payload.get("framework", ""),
                temperature=temperature, max_tokens=max_tokens,
            )
            elapsed = round(time.time() - t0, 1)
            print(f"  ✓  [json/pipeline] done in {elapsed}s")
            response = {"result": gen_result["result"]}
            if "confidence" in gen_result:
                response["confidence"] = gen_result["confidence"]
            self._json_response(response)
            # Fire notification if configured
            story_key = _JIRA_KEY_RE.search(user_text)
            test_count = len(gen_result["result"]) if isinstance(gen_result["result"], list) else 1
            notifications.notify_generation_complete(
                story_key=story_key.group(1) if story_key else "unknown",
                test_count=test_count,
                username=self._req_user.get("username", "") if self._req_user else "",
            )
        except (ValueError, json.JSONDecodeError) as e:
            print(f"  ✗  [json] parse error: {e!r}")
            self._json_response({"error": f"JSON parse error: {e}"}, status=422)
        except LLMError as e:
            self._json_response(
                {"error": "Provider error", "provider": provider, "detail": str(e)},
                status=e.status_code)
        except Exception as e:
            print(f"  ✗  [json] error: {e}")
            _log.error(str(e), path=self.path)
            self._json_response({"error": str(e)}, status=500)

    # ── /api/stream — SSE endpoint: sends tokens to browser as they arrive ─
    # Browser reads with ReadableStream, renders sections incrementally.
    # Each SSE event: data: {"t":"<token>"}   Final: data: {"done":true}

    def _proxy_ollama_stream(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except Exception:
            self.send_error(400)
            return

        provider      = payload.get("provider") or get_default_provider()
        model         = payload.get("model") or DEFAULT_MODEL
        max_tokens    = min(int(payload.get("max_tokens", 2000)), MAX_TOKENS_CAP)
        use_json_fmt  = payload.get("format_json", True)

        if "messages" in payload:
            messages = list(payload["messages"])
            user_text = next(
                (m.get("content", "") for m in reversed(messages) if m.get("role") == "user"), "")
        else:
            user_text = payload.get("prompt", "")
            messages = [{"role": "user", "content": user_text}]

        system = _enrich_system(payload.get("system", ""), user_text)

        _auth   = (self.headers or {}).get("Authorization", "")
        _bearer = _auth[len("Bearer "):] if _auth.startswith("Bearer ") else ""

        def _send(line: str) -> None:
            self.wfile.write((line + "\n\n").encode("utf-8"))
            self.wfile.flush()

        self.send_response(200)
        self._cors_headers()
        self.send_header("Content-Type", "text/event-stream; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()
        monitoring.inc_connections()
        _accumulated: list[str] = []

        try:
            for token in llm_stream(
                messages, system, model, provider,
                temperature=float(payload.get("temperature", 0.3)),
                max_tokens=max_tokens,
                format_json=use_json_fmt,
            ):
                _accumulated.append(token)
                _send("data: " + json.dumps({"t": token}))
            _send('data: {"done":true}')
            if _bearer:
                report_usage(_bearer, len("".join(_accumulated)) // 4)
        except LLMError as e:
            try:
                _send("data: " + json.dumps({"error": str(e), "provider": provider}))
            except Exception:
                pass
        except BrokenPipeError:
            pass
        except Exception as e:
            print(f"  ✗  Stream error: {e}")
            try:
                _send("data: " + json.dumps({"error": str(e)}))
            except Exception:
                pass
        finally:
            monitoring.dec_connections()
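
    # Minimal streaming client for the handler above. A sketch assuming it is
    # routed as POST /api/stream (route name per the section comment; verify
    # against the dispatcher) on localhost:8000:
    #
    #   import json, urllib.request
    #
    #   req = urllib.request.Request(
    #       "http://localhost:8000/api/stream",
    #       data=json.dumps({"prompt": "Generate 3 login test cases",
    #                        "max_tokens": 500, "format_json": False}).encode("utf-8"),
    #       headers={"Content-Type": "application/json"}, method="POST")
    #   with urllib.request.urlopen(req) as resp:
    #       for raw in resp:
    #           line = raw.decode("utf-8").strip()
    #           if line.startswith("data: "):
    #               evt = json.loads(line[len("data: "):])
    #               if evt.get("done"):
    #                   break
    #               print(evt.get("t", ""), end="", flush=True)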

    # ── HTTP helpers ─────────────────────────────────────────────────────

    def _read_json_body(self) -> dict:
        """Read and parse the JSON request body; returns {} on error."""
        length = int(self.headers.get("Content-Length", 0))
        try:
            return json.loads(self.rfile.read(length)) if length else {}
        except Exception:
            return {}

    def _json_response(self, data, status=200):
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        try:
            self.send_response(status)
            self._cors_headers()
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except BrokenPipeError:
            pass  # client disconnected — safe to ignore


# Bind the module-level function to the class so self._check_cp_jwt() works
QACopilotHandler._check_cp_jwt = _check_cp_jwt  # type: ignore[attr-defined]


# ── Browser auto-open ─────────────────────────────────────────────────────

def open_browser():
    import webbrowser
    time.sleep(1.2)
    webbrowser.open(f"http://localhost:{PORT}/qa-copilot-local.html")


# ── Entry point ───────────────────────────────────────────────────────────


def _bootstrap_authz() -> None:
    """Run authz schema migrations and initialize the authz/identity packages.

    Called once at process start. Idempotent — running migrations whose
    version is already stored is a no-op.
    """
    import authz
    import authz.migrations as _authz_migrations
    import identity

    _authz_migrations.run_pending(USERDATA_DB)
    authz.set_db_path(USERDATA_DB)
    identity.set_db_path(USERDATA_DB)


def main():
    _init_coverage_db()
    _init_userdata_db()
    _bootstrap_admin()
    monitoring.init_metrics_db()
    monitoring.start_background_threads(OLLAMA_URL, RAG_URL)
    fine_tuning.init_db()
    fine_tuning.mark_orphaned_jobs()
    onboarding._init_onboarding_table()
    test_runs.init_db()
    workspace.init_db()
    _bootstrap_authz()
    perf_results.init_db()
    orphaned = fine_tuning.check_orphaned_cloud()
    if orphaned:
        print(f"  ⚠  Fine-tuning: {len(orphaned)} orphaned cloud instance(s) detected")
    rag_status = f"{RAG_URL} (top_k={RAG_TOP_K})" if RAG_URL else "disabled (set RAG_URL to enable)"
    _log.info("Server started", port=PORT, model=DEFAULT_MODEL, ollama=OLLAMA_URL)
    print(f"\n  Server        : http://localhost:{PORT}")
    print(f"  Ollama        : {OLLAMA_URL}")
    print(f"  Default model : {DEFAULT_MODEL}")
    print(f"  Max tokens    : {MAX_TOKENS_CAP}  (override: QA_MAX_TOKENS=N)")
    print(f"  RAG server    : {rag_status}")
    print(f"\n  Pages:")
    print(f"    http://localhost:{PORT}/qa-copilot-local.html")
    print(f"    http://localhost:{PORT}/qa-copilot-v2.html")
    print(f"    http://localhost:{PORT}/qa-intelligence-platform.html")
    print(f"\n  Press Ctrl+C to stop\n")

    threading.Thread(target=open_browser, daemon=True).start()

    class _Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
        allow_reuse_address = True
        daemon_threads = True   # threads die when main process exits

        def get_request(self):
            conn, addr = super().get_request()
            # Disable Nagle — sends each SSE token immediately instead of
            # waiting for a full 8 KB buffer before flushing to the browser.
            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            return conn, addr

        def handle_error(self, request, client_address):
            # Silently drop BrokenPipe / ConnectionReset — browser closed the
            # connection before the response finished (health-check polling,
            # Chrome DevTools probes, tab reloads, etc.).  Everything else
            # gets the default traceback.
            # Use traceback string rather than sys.exc_info() because Python
            # 3.14 chained exceptions can obscure the active exception type.
            import traceback
            tb = traceback.format_exc()
            if "BrokenPipeError" in tb or "ConnectionResetError" in tb:
                return
            super().handle_error(request, client_address)

    with _Server(("", PORT), QACopilotHandler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n\n  Server stopped.\n")


if __name__ == "__main__":
    _validate_secrets()
    _validate_admin_password()
    _validate_license()
    main()
