package main import ( "bytes" "context" "database/sql" "encoding/json" "fmt" "io" "log" "net/http" "os" "time" _ "github.com/lib/pq" ) func main() { supplyURL := os.Getenv("SUPPLY_URL") if supplyURL == "" { supplyURL = "http://127.0.0.1:8081" } consumer := os.Getenv("CONSUMER") if consumer == "" { consumer = "sub2api-bridge" } dbConn := os.Getenv("SUB2API_DB") if dbConn == "" { dbConn = "postgres://sub2api:***@localhost:5432/sub2api?sslmode=disable" } db, err := sql.Open("postgres", dbConn) if err != nil { log.Fatalf("open db: %v", err) } defer db.Close() if err := db.Ping(); err != nil { log.Fatalf("ping db: %v", err) } log.Println("connected to sub2api db") if err := ensureBridgeTable(db); err != nil { log.Fatalf("ensure table: %v", err) } cursor := "" for { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) events, nextCursor, err := fetchPackageChanges(ctx, supplyURL, cursor) cancel() if err != nil { log.Printf("fetch error: %v", err) time.Sleep(10 * time.Second) continue } for _, evt := range events { if evt.GatewaySyncStatus != "pending" { log.Printf("skip non-pending event: %s status=%s", evt.EventID, evt.GatewaySyncStatus) continue } log.Printf("bridge event: %s package=%d model=%s", evt.EventID, evt.PackageID, evt.Model) if err := bridgeToSub2API(db, evt); err != nil { log.Printf("bridge error: %v", err) continue } ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second) ackErr := ackPackageChange(ctx2, supplyURL, evt.EventID, consumer, "applied", "synced to sub2api") cancel2() if ackErr != nil { log.Printf("ack error for %s: %v", evt.EventID, ackErr) continue } log.Printf("acked event: %s", evt.EventID) } if nextCursor == "" { log.Println("no more events, sleeping 10s") time.Sleep(10 * time.Second) } else { cursor = nextCursor } } } type PackageChangeEvent struct { EventID string `json:"event_id"` AccountID int64 `json:"account_id"` EventType string `json:"event_type"` PackageID int64 `json:"package_id"` Platform string `json:"platform"` Model string `json:"model"` OccurredAt string `json:"occurred_at"` Version int `json:"version"` GatewaySyncStatus string `json:"gateway_sync_status"` RetryCount int `json:"retry_count"` NextRetryAt string `json:"next_retry_at,omitempty"` LastFailureCategory string `json:"last_failure_category,omitempty"` } func fetchPackageChanges(ctx context.Context, baseURL, cursor string) ([]PackageChangeEvent, string, error) { url := fmt.Sprintf("%s/internal/supply-intelligence/gateway/package-changes", baseURL) if cursor != "" { url += "?cursor=" + cursor } req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, "", err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, "", err } if resp.StatusCode != http.StatusOK { return nil, "", fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) } var result struct { Items []PackageChangeEvent `json:"items"` NextCursor string `json:"next_cursor"` } if err := json.Unmarshal(body, &result); err != nil { return nil, "", err } return result.Items, result.NextCursor, nil } func ackPackageChange(ctx context.Context, baseURL, eventID, consumer, result, detail string) error { url := fmt.Sprintf("%s/internal/supply-intelligence/gateway/package-changes/%s/ack", baseURL, eventID) payload := map[string]string{ "consumer": consumer, "result": result, "detail": detail, } body, _ := json.Marshal(payload) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusNoContent { respBody, _ := io.ReadAll(resp.Body) return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(respBody)) } return nil } func ensureBridgeTable(db *sql.DB) error { _, err := db.Exec(`CREATE TABLE IF NOT EXISTS supply_bridge_log ( id SERIAL PRIMARY KEY, event_id TEXT NOT NULL UNIQUE, package_id BIGINT, platform TEXT, model TEXT, status TEXT, result TEXT, detail TEXT, created_at TIMESTAMPTZ DEFAULT NOW() )`) return err } func bridgeToSub2API(db *sql.DB, evt PackageChangeEvent) error { _, err := db.Exec( `INSERT INTO supply_bridge_log (event_id, package_id, platform, model, status, result, detail) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (event_id) DO UPDATE SET status = EXCLUDED.status, result = EXCLUDED.result, detail = EXCLUDED.detail, created_at = NOW()`, evt.EventID, evt.PackageID, evt.Platform, evt.Model, evt.GatewaySyncStatus, "applied", "synced to sub2api", ) return err }