182 lines
5.0 KiB
Go
182 lines
5.0 KiB
Go
package main
|
|
|
|
import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"time"

	_ "github.com/lib/pq"
)
|
|
|
|
func main() {
|
|
supplyURL := os.Getenv("SUPPLY_URL")
|
|
if supplyURL == "" {
|
|
supplyURL = "http://127.0.0.1:8081"
|
|
}
|
|
consumer := os.Getenv("CONSUMER")
|
|
if consumer == "" {
|
|
consumer = "sub2api-bridge"
|
|
}
|
|
dbConn := os.Getenv("SUB2API_DB")
|
|
if dbConn == "" {
|
|
dbConn = "postgres://sub2api:***@localhost:5432/sub2api?sslmode=disable"
|
|
}
|
|
|
|
db, err := sql.Open("postgres", dbConn)
|
|
if err != nil {
|
|
log.Fatalf("open db: %v", err)
|
|
}
|
|
defer db.Close()
|
|
if err := db.Ping(); err != nil {
|
|
log.Fatalf("ping db: %v", err)
|
|
}
|
|
log.Println("connected to sub2api db")
|
|
if err := ensureBridgeTable(db); err != nil {
|
|
log.Fatalf("ensure table: %v", err)
|
|
}
|
|
|
|
cursor := ""
|
|
for {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
events, nextCursor, err := fetchPackageChanges(ctx, supplyURL, cursor)
|
|
cancel()
|
|
if err != nil {
|
|
log.Printf("fetch error: %v", err)
|
|
time.Sleep(10 * time.Second)
|
|
continue
|
|
}
|
|
for _, evt := range events {
|
|
if evt.GatewaySyncStatus != "pending" {
|
|
log.Printf("skip non-pending event: %s status=%s", evt.EventID, evt.GatewaySyncStatus)
|
|
continue
|
|
}
|
|
log.Printf("bridge event: %s package=%d model=%s", evt.EventID, evt.PackageID, evt.Model)
|
|
if err := bridgeToSub2API(db, evt); err != nil {
|
|
log.Printf("bridge error: %v", err)
|
|
continue
|
|
}
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
|
|
ackErr := ackPackageChange(ctx2, supplyURL, evt.EventID, consumer, "applied", "synced to sub2api")
|
|
cancel2()
|
|
if ackErr != nil {
|
|
log.Printf("ack error for %s: %v", evt.EventID, ackErr)
|
|
continue
|
|
}
|
|
log.Printf("acked event: %s", evt.EventID)
|
|
}
|
|
if nextCursor == "" {
|
|
log.Println("no more events, sleeping 10s")
|
|
time.Sleep(10 * time.Second)
|
|
} else {
|
|
cursor = nextCursor
|
|
}
|
|
}
|
|
}
|
|
|
|
// PackageChangeEvent is one entry of the "items" array returned by the
// supply service's package-changes endpoint. Field meanings beyond the JSON
// key names are defined by that service and are not visible in this file.
type PackageChangeEvent struct {
	// EventID identifies the event; it is used in the ack URL and as the
	// unique key of the supply_bridge_log table.
	EventID   string `json:"event_id"`
	AccountID int64  `json:"account_id"`
	EventType string `json:"event_type"`

	// PackageID, Platform and Model are copied into the bridge log row.
	PackageID int64  `json:"package_id"`
	Platform  string `json:"platform"`
	Model     string `json:"model"`

	// OccurredAt is kept as the raw string sent by the service; it is not
	// parsed into a time.Time anywhere in this file.
	OccurredAt string `json:"occurred_at"`
	Version    int    `json:"version"`

	// GatewaySyncStatus gates processing: main only bridges events whose
	// value is "pending".
	GatewaySyncStatus string `json:"gateway_sync_status"`
	RetryCount        int    `json:"retry_count"`

	// The remaining fields are optional in the JSON representation.
	NextRetryAt         string `json:"next_retry_at,omitempty"`
	LastFailureCategory string `json:"last_failure_category,omitempty"`
}
|
|
|
|
func fetchPackageChanges(ctx context.Context, baseURL, cursor string) ([]PackageChangeEvent, string, error) {
|
|
url := fmt.Sprintf("%s/internal/supply-intelligence/gateway/package-changes", baseURL)
|
|
if cursor != "" {
|
|
url += "?cursor=" + cursor
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, "", fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
var result struct {
|
|
Items []PackageChangeEvent `json:"items"`
|
|
NextCursor string `json:"next_cursor"`
|
|
}
|
|
if err := json.Unmarshal(body, &result); err != nil {
|
|
return nil, "", err
|
|
}
|
|
return result.Items, result.NextCursor, nil
|
|
}
|
|
|
|
// ackPackageChange reports the outcome of processing one event back to the
// supply service at baseURL.
//
// It POSTs a JSON object with the keys "consumer", "result" and "detail" to
// the event's ack endpoint. eventID is escaped as a single path segment, so
// an ID containing '/', '?' or '#' cannot alter the request target.
//
// Only 204 No Content is treated as success; any other status is returned as
// an error carrying the status code and the response body. ctx bounds the
// whole HTTP exchange.
func ackPackageChange(ctx context.Context, baseURL, eventID, consumer, result, detail string) error {
	endpoint := fmt.Sprintf(
		"%s/internal/supply-intelligence/gateway/package-changes/%s/ack",
		baseURL, url.PathEscape(eventID),
	)

	payload, err := json.Marshal(map[string]string{
		"consumer": consumer,
		"result":   result,
		"detail":   detail,
	})
	if err != nil {
		return fmt.Errorf("encode ack payload: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("build ack request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNoContent {
		return nil
	}

	// Quote the body in the error; if it cannot be read, say so rather than
	// reporting an empty message.
	respBody, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return fmt.Errorf("HTTP %d (response body unreadable: %v)", resp.StatusCode, readErr)
	}
	return fmt.Errorf("HTTP %d: %s", resp.StatusCode, respBody)
}
|
|
|
|
// ensureBridgeTable creates the supply_bridge_log table if it does not
// already exist. The statement uses CREATE TABLE IF NOT EXISTS, so it leaves
// an existing table untouched. It returns whatever error the database
// reports.
func ensureBridgeTable(db *sql.DB) error {
	// One row per event: event_id is UNIQUE, which the upsert in
	// bridgeToSub2API relies on for its ON CONFLICT clause.
	const createBridgeLog = `CREATE TABLE IF NOT EXISTS supply_bridge_log (
		id SERIAL PRIMARY KEY,
		event_id TEXT NOT NULL UNIQUE,
		package_id BIGINT,
		platform TEXT,
		model TEXT,
		status TEXT,
		result TEXT,
		detail TEXT,
		created_at TIMESTAMPTZ DEFAULT NOW()
	)`

	if _, execErr := db.Exec(createBridgeLog); execErr != nil {
		return execErr
	}
	return nil
}
|
|
|
|
func bridgeToSub2API(db *sql.DB, evt PackageChangeEvent) error {
|
|
_, err := db.Exec(
|
|
`INSERT INTO supply_bridge_log (event_id, package_id, platform, model, status, result, detail)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
ON CONFLICT (event_id) DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
result = EXCLUDED.result,
|
|
detail = EXCLUDED.detail,
|
|
created_at = NOW()`,
|
|
evt.EventID, evt.PackageID, evt.Platform, evt.Model, evt.GatewaySyncStatus, "applied", "synced to sub2api",
|
|
)
|
|
return err
|
|
}
|