Files
supply-intelligence/cmd/sub2api-bridge/main.go
2026-05-12 18:49:52 +08:00

182 lines
5.0 KiB
Go

package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
_ "github.com/lib/pq"
)
func main() {
supplyURL := os.Getenv("SUPPLY_URL")
if supplyURL == "" {
supplyURL = "http://127.0.0.1:8081"
}
consumer := os.Getenv("CONSUMER")
if consumer == "" {
consumer = "sub2api-bridge"
}
dbConn := os.Getenv("SUB2API_DB")
if dbConn == "" {
dbConn = "postgres://sub2api:***@localhost:5432/sub2api?sslmode=disable"
}
db, err := sql.Open("postgres", dbConn)
if err != nil {
log.Fatalf("open db: %v", err)
}
defer db.Close()
if err := db.Ping(); err != nil {
log.Fatalf("ping db: %v", err)
}
log.Println("connected to sub2api db")
if err := ensureBridgeTable(db); err != nil {
log.Fatalf("ensure table: %v", err)
}
cursor := ""
for {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
events, nextCursor, err := fetchPackageChanges(ctx, supplyURL, cursor)
cancel()
if err != nil {
log.Printf("fetch error: %v", err)
time.Sleep(10 * time.Second)
continue
}
for _, evt := range events {
if evt.GatewaySyncStatus != "pending" {
log.Printf("skip non-pending event: %s status=%s", evt.EventID, evt.GatewaySyncStatus)
continue
}
log.Printf("bridge event: %s package=%d model=%s", evt.EventID, evt.PackageID, evt.Model)
if err := bridgeToSub2API(db, evt); err != nil {
log.Printf("bridge error: %v", err)
continue
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
ackErr := ackPackageChange(ctx2, supplyURL, evt.EventID, consumer, "applied", "synced to sub2api")
cancel2()
if ackErr != nil {
log.Printf("ack error for %s: %v", evt.EventID, ackErr)
continue
}
log.Printf("acked event: %s", evt.EventID)
}
if nextCursor == "" {
log.Println("no more events, sleeping 10s")
time.Sleep(10 * time.Second)
} else {
cursor = nextCursor
}
}
}
type PackageChangeEvent struct {
EventID string `json:"event_id"`
AccountID int64 `json:"account_id"`
EventType string `json:"event_type"`
PackageID int64 `json:"package_id"`
Platform string `json:"platform"`
Model string `json:"model"`
OccurredAt string `json:"occurred_at"`
Version int `json:"version"`
GatewaySyncStatus string `json:"gateway_sync_status"`
RetryCount int `json:"retry_count"`
NextRetryAt string `json:"next_retry_at,omitempty"`
LastFailureCategory string `json:"last_failure_category,omitempty"`
}
func fetchPackageChanges(ctx context.Context, baseURL, cursor string) ([]PackageChangeEvent, string, error) {
url := fmt.Sprintf("%s/internal/supply-intelligence/gateway/package-changes", baseURL)
if cursor != "" {
url += "?cursor=" + cursor
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, "", err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, "", err
}
if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
var result struct {
Items []PackageChangeEvent `json:"items"`
NextCursor string `json:"next_cursor"`
}
if err := json.Unmarshal(body, &result); err != nil {
return nil, "", err
}
return result.Items, result.NextCursor, nil
}
func ackPackageChange(ctx context.Context, baseURL, eventID, consumer, result, detail string) error {
url := fmt.Sprintf("%s/internal/supply-intelligence/gateway/package-changes/%s/ack", baseURL, eventID)
payload := map[string]string{
"consumer": consumer,
"result": result,
"detail": detail,
}
body, _ := json.Marshal(payload)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(respBody))
}
return nil
}
func ensureBridgeTable(db *sql.DB) error {
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS supply_bridge_log (
id SERIAL PRIMARY KEY,
event_id TEXT NOT NULL UNIQUE,
package_id BIGINT,
platform TEXT,
model TEXT,
status TEXT,
result TEXT,
detail TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
)`)
return err
}
func bridgeToSub2API(db *sql.DB, evt PackageChangeEvent) error {
_, err := db.Exec(
`INSERT INTO supply_bridge_log (event_id, package_id, platform, model, status, result, detail)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (event_id) DO UPDATE SET
status = EXCLUDED.status,
result = EXCLUDED.result,
detail = EXCLUDED.detail,
created_at = NOW()`,
evt.EventID, evt.PackageID, evt.Platform, evt.Model, evt.GatewaySyncStatus, "applied", "synced to sub2api",
)
return err
}