#!/usr/bin/env bash
#
# verify_host_protocol_matrix.sh
#
# Probes each target listed in PROTOCOL_MATRIX_TARGETS_JSON against three
# endpoints (GET /models, POST /chat/completions, POST /responses), stores the
# raw response headers/bodies under ARTIFACT_DIR/targets/, and writes a graded
# summary to ARTIFACT_DIR/protocol-matrix-summary.json.
#
# With DRY_RUN=1 no network call is made; only a scaffold summary is written.
#
# Exit status: 0 when the matrix was produced (individual targets may still be
# graded "failed" inside the summary), 1 on missing/invalid configuration.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
TIMESTAMP="$(date +%Y%m%d_%H%M%S)"
ARTIFACT_DIR="${ARTIFACT_DIR:-$ROOT_DIR/artifacts/host-capability/$TIMESTAMP}"
DRY_RUN="${DRY_RUN:-0}"

# Print the usage text to stdout.
usage() {
  cat <<'EOF'
Usage:
  verify_host_protocol_matrix.sh

Required env:
  PROTOCOL_MATRIX_TARGETS_JSON   JSON array of probe targets

Optional env:
  ARTIFACT_DIR                   output directory
  DRY_RUN=1                      emit scaffold summary without network calls

Example:
  DRY_RUN=1 \
  PROTOCOL_MATRIX_TARGETS_JSON='[{"provider_id":"kimi-a7m","base_url":"https://kimi.example.com/v1","api_key_env":"KIMI_API_KEY","models":["kimi-k2.6"]}]' \
  bash ./scripts/acceptance/verify_host_protocol_matrix.sh
EOF
}

# Abort with status 1 unless the environment variable named by $1 is set and
# non-empty.
require_var() {
  local name="$1"
  if [[ -z "${!name:-}" ]]; then
    echo "missing required env: $name" >&2
    exit 1
  fi
}

# Abort with status 1 unless the command named by $1 is available on PATH.
require_cmd() {
  local name="$1"
  if ! command -v "$name" >/dev/null 2>&1; then
    echo "missing required command: $name" >&2
    exit 1
  fi
}

if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then
  usage
  exit 0
fi

require_var PROTOCOL_MATRIX_TARGETS_JSON
require_cmd python3

mkdir -p "$ARTIFACT_DIR"

# The embedded Python programs read their configuration from the environment.
export ROOT_DIR ARTIFACT_DIR DRY_RUN PROTOCOL_MATRIX_TARGETS_JSON

if [[ "$DRY_RUN" == "1" ]]; then
  # Python writes the summary itself (instead of a shell redirect) so that an
  # invalid PROTOCOL_MATRIX_TARGETS_JSON does not leave an empty summary file.
  python3 - <<'PY'
import json
import os
import pathlib
import sys

try:
    targets = json.loads(os.environ["PROTOCOL_MATRIX_TARGETS_JSON"])
except ValueError as exc:
    print(f"PROTOCOL_MATRIX_TARGETS_JSON is not valid JSON: {exc}", file=sys.stderr)
    sys.exit(1)
if not isinstance(targets, list) or not all(isinstance(t, dict) for t in targets):
    print("PROTOCOL_MATRIX_TARGETS_JSON must be a JSON array of objects", file=sys.stderr)
    sys.exit(1)

summary = {"mode": "dry_run", "targets": []}
for target in targets:
    summary["targets"].append({
        "provider_id": str(target.get("provider_id", "")).strip(),
        "base_url": str(target.get("base_url", "")).strip(),
        "models": target.get("models", []),
        "probe_layer": str(target.get("probe_layer", "upstream")).strip() or "upstream",
        "support_level": "dry_run",
    })

summary_path = pathlib.Path(os.environ["ARTIFACT_DIR"]) / "protocol-matrix-summary.json"
summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
PY
  echo "protocol matrix summary: $ARTIFACT_DIR/protocol-matrix-summary.json"
  exit 0
fi

require_cmd curl

python3 - <<'PY'
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
import time

artifact_dir = pathlib.Path(os.environ["ARTIFACT_DIR"])
script_dir = artifact_dir / "targets"
script_dir.mkdir(parents=True, exist_ok=True)

# curl timing / retry settings applied to every probe request.
CONNECT_TIMEOUT = 10
MAX_TIME = 30
RETRY = 1
RETRY_DELAY = 2


def load_targets():
    """Parse PROTOCOL_MATRIX_TARGETS_JSON; exit 1 unless it is a JSON array of objects."""
    try:
        parsed = json.loads(os.environ["PROTOCOL_MATRIX_TARGETS_JSON"])
    except ValueError as exc:
        print(f"PROTOCOL_MATRIX_TARGETS_JSON is not valid JSON: {exc}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(parsed, list) or not all(isinstance(t, dict) for t in parsed):
        print("PROTOCOL_MATRIX_TARGETS_JSON must be a JSON array of objects", file=sys.stderr)
        sys.exit(1)
    return parsed


def normalize_models(raw) -> list:
    """Return the non-empty, stripped model names from a target's "models" value.

    A bare string is treated as a single model name rather than being iterated
    character by character; any other non-list value yields an empty list.
    """
    if isinstance(raw, str):
        raw = [raw]
    if not isinstance(raw, list):
        return []
    return [str(m).strip() for m in raw if str(m).strip()]


def safe_dir_name(name: str) -> str:
    """Replace every character outside [A-Za-z0-9._-] so the name is a single path component."""
    return re.sub(r"[^A-Za-z0-9._-]", "_", name)


def sanitize_header_value(value: str) -> str:
    """Redact an Authorization header line; return any other header line unchanged."""
    if value.lower().startswith("authorization:"):
        return "Authorization: Bearer ***"
    return value


def curl_config_quote(value: str) -> str:
    """Quote a value for a curl config file (double quotes with backslash escapes)."""
    escaped = (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


def read_status(headers_path: pathlib.Path) -> int:
    """Return the HTTP status of the final response in a curl header dump, or 0.

    A dump can hold several header blocks (proxy CONNECT reply, 1xx interim
    responses); the last status line belongs to the response whose body was
    saved, so that is the one reported.
    """
    if not headers_path.exists():
        return 0
    status = 0
    for line in headers_path.read_text(encoding="utf-8", errors="replace").splitlines():
        line = line.strip()
        if line.startswith("HTTP/"):
            parts = line.split()
            if len(parts) >= 2 and parts[1].isdigit():
                status = int(parts[1])
    return status


def read_content_type(headers_path: pathlib.Path) -> str:
    """Return the Content-Type of the final header block in a curl header dump, or ""."""
    if not headers_path.exists():
        return ""
    content_type = ""
    for line in headers_path.read_text(encoding="utf-8", errors="replace").splitlines():
        if line.strip().startswith("HTTP/"):
            # A new status line starts a new header block; forget earlier blocks.
            content_type = ""
            continue
        if ":" not in line:
            continue
        k, v = line.split(":", 1)
        if k.strip().lower() == "content-type" and not content_type:
            content_type = v.strip()
    return content_type


def body_json(path: pathlib.Path):
    """Return the parsed JSON content of path, or None if it cannot be read or parsed."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None


def body_text(path: pathlib.Path) -> str:
    """Return the file content as text ("" when the file does not exist)."""
    if not path.exists():
        return ""
    return path.read_text(encoding="utf-8", errors="replace")


def has_smoke_model(path: pathlib.Path, model: str) -> bool:
    """Return True if the saved /models body lists an entry whose "id" equals model.

    Any body that is not an object with a list-valued "data" field, and any
    non-object entry inside that list, counts as "not listed" instead of raising.
    """
    obj = body_json(path)
    if not isinstance(obj, dict):
        return False
    data = obj.get("data")
    if not isinstance(data, list):
        return False
    return any(
        isinstance(item, dict) and str(item.get("id", "")).strip() == model
        for item in data
    )


def classify_endpoint(status: int, body: str, endpoint: str, probe_layer: str) -> str:
    """Map an HTTP status and response body to an error code string.

    Checks are evaluated in order and the first match wins. Body matching is
    a case-insensitive substring search.
    """
    text = (body or "").lower()
    if 200 <= status < 300:
        # Every endpoint reports "chat_ok" on success.
        return "chat_ok"
    if status == 429:
        return "rate_limited"
    if status in (401, 403) and ("auth" in text or "invalid" in text or "unauthorized" in text):
        return "auth_failed"
    if status == 403 and "region" in text:
        return "region_blocked"
    if "1010" in text or "cloudflare" in text:
        return "cloudflare_blocked"
    if endpoint == "chat" and probe_layer == "user-key" and ("group" in text or "binding" in text or "assigned" in text):
        return "user_key_binding_failed"
    if endpoint == "chat" and status and status not in (401, 403, 429):
        return "host_protocol_mismatch"
    return "unknown_error"


def run_capture(url: str, api_key: str, method: str, request_headers_path: pathlib.Path, response_headers_path: pathlib.Path, response_body_path: pathlib.Path, payload=None):
    """Run one curl request and save its response headers and body to disk.

    The request-headers artifact stores the Authorization header in redacted
    form. The real bearer token is handed to curl through a config read from
    stdin so it does not appear in the process argument list.

    Returns a dict with curl's "exit_code", "stderr" and "stdout".
    """
    sent_headers = [f"Authorization: Bearer {api_key}"]
    if method == "POST":
        sent_headers.append("Content-Type: application/json")
    request_headers_path.write_text(
        "".join(sanitize_header_value(h) + "\n" for h in sent_headers),
        encoding="utf-8",
    )
    response_headers_path.parent.mkdir(parents=True, exist_ok=True)
    # Start from empty files so a failed request never leaves stale content.
    response_headers_path.write_text("", encoding="utf-8")
    response_body_path.write_text("", encoding="utf-8")

    curl_config = "header = " + curl_config_quote(f"Authorization: Bearer {api_key}") + "\n"
    cmd = [
        "curl",
        "-sS",
        "--config", "-",
        "-D", str(response_headers_path),
        "-o", str(response_body_path),
        "--connect-timeout", str(CONNECT_TIMEOUT),
        "--max-time", str(MAX_TIME),
        "--retry", str(RETRY),
        "--retry-delay", str(RETRY_DELAY),
        # NOTE(review): this sends a local filesystem path to the remote host;
        # kept as-is, presumably consumed by a debugging proxy - confirm.
        "-H", f"X-Hermes-Debug-Request-Headers: {request_headers_path}",
    ]
    if method == "POST":
        cmd += ["-H", "Content-Type: application/json", url, "-d", json.dumps(payload, ensure_ascii=False)]
    else:
        cmd += [url]
    proc = subprocess.run(cmd, input=curl_config, capture_output=True, text=True)
    return {
        "exit_code": proc.returncode,
        "stderr": proc.stderr or "",
        "stdout": proc.stdout or "",
    }


targets = load_targets()
summary = {"mode": "live_probe", "targets": []}
script_error = False

for index, target in enumerate(targets, start=1):
    provider_id = str(target.get("provider_id", "")).strip()
    base_url = str(target.get("base_url", "")).rstrip("/")
    api_key_env = str(target.get("api_key_env", "")).strip()
    probe_layer = str(target.get("probe_layer", "upstream")).strip() or "upstream"
    models = normalize_models(target.get("models", []))

    # A configuration error stops the run; the summary collected so far is
    # still written below before exiting with status 1.
    if not provider_id:
        print("provider_id is required in PROTOCOL_MATRIX_TARGETS_JSON", file=sys.stderr)
        script_error = True
        break
    if not base_url:
        print(f"base_url is required for {provider_id}", file=sys.stderr)
        script_error = True
        break
    if not api_key_env:
        print(f"api_key_env is required for {provider_id}", file=sys.stderr)
        script_error = True
        break
    api_key = os.environ.get(api_key_env, "").strip()
    if not api_key:
        print(f"missing required env from target.api_key_env: {api_key_env}", file=sys.stderr)
        script_error = True
        break

    smoke_model = models[0] if models else "ping"
    target_dir = script_dir / f"{index:02d}-{safe_dir_name(provider_id)}"
    target_dir.mkdir(parents=True, exist_ok=True)

    endpoints = [
        ("models", "GET", f"{base_url}/models", None, "01-models"),
        ("chat", "POST", f"{base_url}/chat/completions", {"model": smoke_model, "messages": [{"role": "user", "content": "ping"}], "max_tokens": 8, "temperature": 0}, "02-chat"),
        ("responses", "POST", f"{base_url}/responses", {"model": smoke_model, "input": "ping"}, "03-responses"),
    ]

    endpoint_results = {}
    target_failed = False
    target_error_code = ""
    for endpoint_name, method, url, payload, prefix in endpoints:
        request_headers_path = target_dir / f"{prefix}.request_headers.txt"
        response_headers_path = target_dir / f"{prefix}.response_headers.txt"
        response_body_path = target_dir / f"{prefix}.response_body.json"
        result = run_capture(url, api_key, method, request_headers_path, response_headers_path, response_body_path, payload)
        status = read_status(response_headers_path)
        body = body_text(response_body_path)
        error_code = ""
        if result["exit_code"] == 28:
            # curl exit code 28 is reported as a network timeout.
            error_code = "network_timeout"
            target_failed = True
        elif result["exit_code"] != 0:
            error_code = "unknown_error"
            target_failed = True
        elif not (200 <= status < 300):
            error_code = classify_endpoint(status, body, endpoint_name, probe_layer)
            if endpoint_name == "models":
                target_failed = True
            elif endpoint_name == "chat" and error_code not in ("responses_unsupported",):
                # NOTE(review): classify_endpoint never returns
                # "responses_unsupported", so any non-2xx chat response fails
                # the target.
                target_failed = True
        endpoint_results[endpoint_name] = {
            "status": status,
            "content_type": read_content_type(response_headers_path),
            "body": body,
            "error_code": error_code,
            "exit_code": result["exit_code"],
            "path_headers": str(response_headers_path),
            "path_body": str(response_body_path),
        }
        if result["exit_code"] == 28 and not target_error_code:
            target_error_code = "network_timeout"

    models_status = endpoint_results["models"]["status"]
    chat_status = endpoint_results["chat"]["status"]
    responses_status = endpoint_results["responses"]["status"]
    chat_ok = 200 <= chat_status < 300
    responses_ok = 200 <= responses_status < 300
    models_ok = 200 <= models_status < 300
    models_body_path = target_dir / "01-models.response_body.json"

    advisories = []
    status = "ok"
    support_level = "unsupported-by-host"
    summary_error_code = target_error_code
    if target_failed:
        status = "failed"
        if not summary_error_code:
            summary_error_code = endpoint_results["chat"]["error_code"] or endpoint_results["models"]["error_code"] or endpoint_results["responses"]["error_code"] or "unknown_error"
    else:
        # NOTE(review): reaching this branch implies models and chat both
        # returned 2xx, so the last two arms below cannot currently trigger;
        # they are kept unchanged pending a decision on the intended grading.
        if chat_ok and responses_ok:
            support_level = "supported-direct"
            summary_error_code = "chat_ok"
        elif chat_ok and not responses_ok:
            advisories.append("responses_unsupported_but_chat_ok")
            support_level = "supported-with-plugin-adapter"
            summary_error_code = "responses_unsupported"
        elif models_ok and not chat_ok:
            support_level = "upstream-unhealthy"
            summary_error_code = endpoint_results["chat"]["error_code"] or "models_only"
        else:
            support_level = "unsupported-by-host"
            summary_error_code = endpoint_results["chat"]["error_code"] or endpoint_results["responses"]["error_code"] or "unknown_error"
            status = "failed"

    summary["targets"].append({
        "provider_id": provider_id,
        "base_url": base_url,
        "probe_layer": probe_layer,
        "models": models,
        "smoke_model": smoke_model,
        "status": status,
        "error_code": summary_error_code,
        "models_status": models_status,
        "chat_status": chat_status,
        "responses_status": responses_status,
        "models_has_smoke_model": has_smoke_model(models_body_path, smoke_model),
        "chat_content_type": endpoint_results["chat"]["content_type"],
        "responses_content_type": endpoint_results["responses"]["content_type"],
        "support_level": support_level,
        "known_advisories": advisories,
        "artifact_dir": str(target_dir),
    })

(artifact_dir / "protocol-matrix-summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(summary, ensure_ascii=False, indent=2))
if script_error:
    sys.exit(1)
PY

echo "protocol matrix summary: $ARTIFACT_DIR/protocol-matrix-summary.json"