// Source: llm-intelligence/scripts/import_plan_catalog.go (Go, 553 lines, 16 KiB)

//go:build llm_script
package main
import (
"database/sql"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"sort"
"strings"
"time"
_ "github.com/lib/pq"
)
// defaultPlanCatalogSeedPaths is the comma-separated list of seed JSON files
// that main uses as the default value of the -seed flag.
const defaultPlanCatalogSeedPaths = "seeds/plan_catalog_inventory_seed.json,seeds/plan_catalog_inventory_seed_cn_vendors_top20.json,seeds/plan_catalog_inventory_seed_cn_relays_top20plus.json,seeds/plan_catalog_inventory_seed_web_research.json"
// importPlanCatalogConfig holds the options for one import run.
type importPlanCatalogConfig struct {
// SeedPaths is a comma-separated list of seed JSON file paths.
SeedPaths string
// DryRun, when true, validates the seeds and prints a summary without
// writing to the database.
DryRun bool
}
// planCatalogSeedEnvelope is the top-level JSON document of a seed file.
type planCatalogSeedEnvelope struct {
// CheckedAt is parsed as an RFC 3339 timestamp by buildPlanCatalogRows.
CheckedAt string `json:"checkedAt"`
// Items are the catalog entries contained in the seed.
Items []planCatalogSeedItem `json:"items"`
}
// planCatalogSeedItem is one catalog entry as written in a seed JSON file.
// The validation and defaulting rules noted on the fields below are the ones
// applied by buildPlanCatalogRows.
type planCatalogSeedItem struct {
// CatalogCode is required. It is the key used to merge entries across seed
// files and the conflict key of the database upsert.
CatalogCode string `json:"catalogCode"`
// ProviderName is required.
ProviderName string `json:"providerName"`
ProviderNameCn string `json:"providerNameCn"`
// ProviderCountry defaults to "unknown" when blank.
ProviderCountry string `json:"providerCountry"`
ProviderWebsite string `json:"providerWebsite"`
// OperatorName is optional; when blank the inventory row is stored without
// an operator.
OperatorName string `json:"operatorName"`
OperatorNameCn string `json:"operatorNameCn"`
// OperatorCountry defaults to "unknown" when blank.
OperatorCountry string `json:"operatorCountry"`
OperatorWebsite string `json:"operatorWebsite"`
// OperatorType defaults to "official" when blank.
OperatorType string `json:"operatorType"`
// PlatformName is required.
PlatformName string `json:"platformName"`
PlatformNameCn string `json:"platformNameCn"`
// PlatformType must be one of: official_vendor, cloud_operator, relay_platform.
PlatformType string `json:"platformType"`
// PlanFamily must be one of: token_plan, coding_plan, package_plan,
// pay_as_you_go, unknown.
PlanFamily string `json:"planFamily"`
// PlanStatus must be one of: confirmed, pending_verification, retired.
PlanStatus string `json:"planStatus"`
// SourceURL is required.
SourceURL string `json:"sourceURL"`
SourceTitle string `json:"sourceTitle"`
// SourceKind must be one of: official_doc, official_pricing,
// official_product_page, official_community, inferred.
SourceKind string `json:"sourceKind"`
// Region defaults to "global" when blank.
Region string `json:"region"`
Currency string `json:"currency"`
BillingCycle string `json:"billingCycle"`
ImporterKey string `json:"importerKey"`
Notes string `json:"notes"`
// CatalogSegment defaults to "general" when blank and must otherwise be one
// of: general, vendor_top20, relay_top20plus, global_reference.
CatalogSegment string `json:"catalogSegment"`
// MarketRank must not be negative; a value of 0 is stored as NULL.
MarketRank int `json:"marketRank"`
}
// planCatalogRow is a validated seed item with defaults applied, in the form
// consumed by upsertPlanCatalogInventory. It is produced by
// buildPlanCatalogRows from a planCatalogSeedItem.
type planCatalogRow struct {
CatalogCode string
ProviderName string
ProviderNameCn string
ProviderCountry string
ProviderWebsite string
OperatorName string
OperatorNameCn string
OperatorCountry string
OperatorWebsite string
OperatorType string
PlatformName string
PlatformNameCn string
PlatformType string
PlanFamily string
PlanStatus string
SourceURL string
SourceTitle string
SourceKind string
Region string
Currency string
BillingCycle string
ImporterKey string
Notes string
// LastCheckedAt is the parsed checkedAt timestamp of the seed envelope; every
// row built from the same envelope shares it.
LastCheckedAt time.Time
CatalogSegment string
MarketRank int
}
// main loads the project env files, parses the -seed and -dry-run flags,
// opens the database (skipped for dry runs) and executes the import. It exits
// with status 1 when the database cannot be opened or the import fails.
func main() {
	loadImportProjectEnv()

	var cfg importPlanCatalogConfig
	flag.StringVar(&cfg.SeedPaths, "seed", defaultPlanCatalogSeedPaths, "基础目录 seed JSON 路径,支持逗号分隔多个文件")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "仅校验并打印摘要,不写入数据库")
	flag.Parse()

	// db stays nil for dry runs; runPlanCatalogImport only needs it when writing.
	var db *sql.DB
	if !cfg.DryRun {
		dsn := os.Getenv("DATABASE_URL")
		if dsn == "" {
			dsn = "postgres://long@/llm_intelligence?host=/var/run/postgresql"
		}
		var err error
		db, err = sql.Open("postgres", dsn)
		if err != nil {
			fmt.Fprintf(os.Stderr, "open db: %v\n", err)
			os.Exit(1)
		}
	}

	runErr := runPlanCatalogImport(cfg, db, os.Stdout)

	// Close explicitly instead of deferring: os.Exit below does not run
	// deferred calls, so a deferred Close would be skipped on failure.
	if db != nil {
		if closeErr := db.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "close db: %v\n", closeErr)
		}
	}

	if runErr != nil {
		fmt.Fprintf(os.Stderr, "import_plan_catalog: %v\n", runErr)
		os.Exit(1)
	}
}
// loadImportProjectEnv loads ".env.local" and then ".env" from the working
// directory. loadImportEnvFile never overwrites a variable that is already
// set, so values from ".env.local" win over those in ".env".
func loadImportProjectEnv() {
	envFiles := [...]string{".env.local", ".env"}
	for i := range envFiles {
		loadImportEnvFile(envFiles[i])
	}
}
// loadImportEnvFile reads KEY=VALUE lines from the file at path and exports
// them into the process environment.
//
// Blank lines, lines starting with "#", lines without "=", and lines with an
// empty key are skipped. Variables that already exist in the environment are
// never overwritten. A file that cannot be read is ignored, so env files are
// optional.
func loadImportEnvFile(path string) {
	data, err := os.ReadFile(path)
	if err != nil {
		// Best effort: a missing or unreadable env file is not an error.
		return
	}
	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, value, ok := strings.Cut(line, "=")
		if !ok {
			continue
		}
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		if _, exists := os.LookupEnv(key); exists {
			continue
		}
		// Setenv only fails for malformed keys; skipping such a line keeps the
		// loader best-effort like the rest of this function.
		_ = os.Setenv(key, unquoteEnvValue(strings.TrimSpace(value)))
	}
}

// unquoteEnvValue removes one pair of matching surrounding quotes (either
// "..." or '...') from value. Values with an unbalanced or mismatched quote
// are returned unchanged so that a legitimate leading or trailing quote
// character is not lost.
func unquoteEnvValue(value string) string {
	if len(value) < 2 {
		return value
	}
	first, last := value[0], value[len(value)-1]
	if first == last && (first == '"' || first == '\'') {
		return value[1 : len(value)-1]
	}
	return value
}
// runPlanCatalogImport loads and merges the seed files named in
// cfg.SeedPaths, validates them into catalog rows, and writes a one-line
// summary to out.
//
// In dry-run mode nothing is written to the database and db may be nil.
// Otherwise db is required: every row is upserted and the summary also
// reports the resulting row count of plan_catalog_inventory. An error is
// returned when loading, validation, the upsert, the count query, or the
// summary write fails, or when the merged seeds contain no rows.
func runPlanCatalogImport(cfg importPlanCatalogConfig, db *sql.DB, out io.Writer) error {
	seeds, err := loadPlanCatalogSeeds(splitCSVPaths(cfg.SeedPaths))
	if err != nil {
		return err
	}
	rows, err := buildPlanCatalogRows(seeds)
	if err != nil {
		return err
	}
	if len(rows) == 0 {
		return fmt.Errorf("seed is empty")
	}

	// summarize renders the per-value counts of one row field.
	summarize := func(field func(planCatalogRow) string) string {
		return formatSummaryCount(countByField(rows, field))
	}
	families := summarize(func(r planCatalogRow) string { return r.PlanFamily })
	statuses := summarize(func(r planCatalogRow) string { return r.PlanStatus })

	if cfg.DryRun {
		_, writeErr := fmt.Fprintf(
			out,
			"source=plan-catalog-import checked_at=%s rows=%d families=%s statuses=%s dry_run=true\n",
			seeds.CheckedAt, len(rows), families, statuses,
		)
		return writeErr
	}

	if db == nil {
		return fmt.Errorf("db is required when dry-run=false")
	}
	if err := upsertPlanCatalogInventory(db, rows); err != nil {
		return err
	}

	var tableRows int
	if err := db.QueryRow(`SELECT COUNT(*) FROM plan_catalog_inventory`).Scan(&tableRows); err != nil {
		return fmt.Errorf("count plan_catalog_inventory: %w", err)
	}

	_, writeErr := fmt.Fprintf(
		out,
		"source=plan-catalog-import checked_at=%s rows=%d table_rows=%d families=%s statuses=%s dry_run=false\n",
		seeds.CheckedAt, len(rows), tableRows, families, statuses,
	)
	return writeErr
}
// loadPlanCatalogSeed reads the file at path and decodes it as a JSON seed
// envelope. Read and decode failures are returned wrapped with the path.
func loadPlanCatalogSeed(path string) (planCatalogSeedEnvelope, error) {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return planCatalogSeedEnvelope{}, fmt.Errorf("read seed %s: %w", path, readErr)
	}

	var decoded planCatalogSeedEnvelope
	if decodeErr := json.Unmarshal(raw, &decoded); decodeErr != nil {
		return planCatalogSeedEnvelope{}, fmt.Errorf("unmarshal seed %s: %w", path, decodeErr)
	}
	return decoded, nil
}
// loadPlanCatalogSeeds loads every seed file in paths and merges them into a
// single envelope.
//
// Items are keyed by CatalogCode, so an item in a later file replaces an
// item with the same code from an earlier one. The merged items are returned
// sorted by CatalogCode. CheckedAt is taken from the last file whose
// checkedAt is not blank. At least one path is required, and the first file
// that fails to load aborts the merge.
func loadPlanCatalogSeeds(paths []string) (planCatalogSeedEnvelope, error) {
	if len(paths) == 0 {
		return planCatalogSeedEnvelope{}, fmt.Errorf("at least one seed path is required")
	}

	byCode := map[string]planCatalogSeedItem{}
	latestCheckedAt := ""
	for _, seedPath := range paths {
		seed, err := loadPlanCatalogSeed(seedPath)
		if err != nil {
			return planCatalogSeedEnvelope{}, err
		}
		if strings.TrimSpace(seed.CheckedAt) != "" {
			latestCheckedAt = seed.CheckedAt
		}
		for i := range seed.Items {
			byCode[seed.Items[i].CatalogCode] = seed.Items[i]
		}
	}

	// Map iteration order is random; sort by code for deterministic output.
	// Codes are unique map keys, so the order is fully determined.
	merged := make([]planCatalogSeedItem, 0, len(byCode))
	for _, item := range byCode {
		merged = append(merged, item)
	}
	sort.Slice(merged, func(a, b int) bool {
		return merged[a].CatalogCode < merged[b].CatalogCode
	})

	return planCatalogSeedEnvelope{CheckedAt: latestCheckedAt, Items: merged}, nil
}
// buildPlanCatalogRows validates every item of envelope and converts it into
// a planCatalogRow with defaults applied.
//
// envelope.CheckedAt must be an RFC 3339 timestamp; it becomes LastCheckedAt
// of every row. The first invalid item aborts the conversion with an error
// naming the offending field and catalog code. Checks run in this order:
// catalogCode present, catalogCode unique, platformType, planFamily,
// planStatus, sourceKind, catalogSegment, marketRank, providerName,
// platformName, sourceURL.
func buildPlanCatalogRows(envelope planCatalogSeedEnvelope) ([]planCatalogRow, error) {
	lastChecked, parseErr := time.Parse(time.RFC3339, envelope.CheckedAt)
	if parseErr != nil {
		return nil, fmt.Errorf("parse checkedAt: %w", parseErr)
	}

	// allow builds a membership set from the accepted values of an enum field.
	allow := func(values ...string) map[string]bool {
		set := make(map[string]bool, len(values))
		for _, v := range values {
			set[v] = true
		}
		return set
	}
	platformTypes := allow("official_vendor", "cloud_operator", "relay_platform")
	planFamilies := allow("token_plan", "coding_plan", "package_plan", "pay_as_you_go", "unknown")
	planStatuses := allow("confirmed", "pending_verification", "retired")
	sourceKinds := allow("official_doc", "official_pricing", "official_product_page", "official_community", "inferred")
	catalogSegments := allow("general", "vendor_top20", "relay_top20plus", "global_reference")

	result := make([]planCatalogRow, 0, len(envelope.Items))
	seen := make(map[string]bool, len(envelope.Items))
	for _, item := range envelope.Items {
		code := item.CatalogCode
		if strings.TrimSpace(code) == "" {
			return nil, fmt.Errorf("catalogCode is required")
		}
		if seen[code] {
			return nil, fmt.Errorf("duplicate catalogCode %q", code)
		}
		seen[code] = true

		// A blank segment falls back to "general" before it is validated.
		segment := defaultIfEmpty(item.CatalogSegment, "general")

		// Cases are evaluated top to bottom; the first failing check wins.
		switch {
		case !platformTypes[item.PlatformType]:
			return nil, fmt.Errorf("invalid platformType %q for %s", item.PlatformType, code)
		case !planFamilies[item.PlanFamily]:
			return nil, fmt.Errorf("invalid planFamily %q for %s", item.PlanFamily, code)
		case !planStatuses[item.PlanStatus]:
			return nil, fmt.Errorf("invalid planStatus %q for %s", item.PlanStatus, code)
		case !sourceKinds[item.SourceKind]:
			return nil, fmt.Errorf("invalid sourceKind %q for %s", item.SourceKind, code)
		case !catalogSegments[segment]:
			return nil, fmt.Errorf("invalid catalogSegment %q for %s", item.CatalogSegment, code)
		case item.MarketRank < 0:
			return nil, fmt.Errorf("invalid marketRank %d for %s", item.MarketRank, code)
		case strings.TrimSpace(item.ProviderName) == "":
			return nil, fmt.Errorf("providerName is required for %s", code)
		case strings.TrimSpace(item.PlatformName) == "":
			return nil, fmt.Errorf("platformName is required for %s", code)
		case strings.TrimSpace(item.SourceURL) == "":
			return nil, fmt.Errorf("sourceURL is required for %s", code)
		}

		result = append(result, planCatalogRow{
			CatalogCode:     code,
			ProviderName:    item.ProviderName,
			ProviderNameCn:  item.ProviderNameCn,
			ProviderCountry: defaultIfEmpty(item.ProviderCountry, "unknown"),
			ProviderWebsite: item.ProviderWebsite,
			OperatorName:    item.OperatorName,
			OperatorNameCn:  item.OperatorNameCn,
			OperatorCountry: defaultIfEmpty(item.OperatorCountry, "unknown"),
			OperatorWebsite: item.OperatorWebsite,
			OperatorType:    defaultIfEmpty(item.OperatorType, "official"),
			PlatformName:    item.PlatformName,
			PlatformNameCn:  item.PlatformNameCn,
			PlatformType:    item.PlatformType,
			PlanFamily:      item.PlanFamily,
			PlanStatus:      item.PlanStatus,
			SourceURL:       item.SourceURL,
			SourceTitle:     item.SourceTitle,
			SourceKind:      item.SourceKind,
			Region:          defaultIfEmpty(item.Region, "global"),
			Currency:        item.Currency,
			BillingCycle:    item.BillingCycle,
			ImporterKey:     item.ImporterKey,
			Notes:           item.Notes,
			LastCheckedAt:   lastChecked,
			CatalogSegment:  segment,
			MarketRank:      item.MarketRank,
		})
	}
	return result, nil
}
// upsertPlanCatalogInventory writes rows into plan_catalog_inventory one at
// a time. For each row it first resolves the provider and, when an operator
// name is present, the operator, then inserts the inventory row or updates
// the existing row with the same catalog_code.
//
// The statements are issued directly on db without a transaction, so rows
// processed before a failure stay written. Blank optional text fields and a
// zero market rank are stored as NULL.
func upsertPlanCatalogInventory(db *sql.DB, rows []planCatalogRow) error {
	const upsertSQL = `INSERT INTO plan_catalog_inventory (
provider_id, operator_id, catalog_code, platform_name, platform_name_cn,
platform_type, plan_family, plan_status, source_url, source_title,
source_kind, region, currency, billing_cycle, last_checked_at,
importer_key, notes, catalog_segment, market_rank
) VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8, $9, $10,
$11, $12, $13, $14, $15,
$16, $17, $18, $19
)
ON CONFLICT (catalog_code)
DO UPDATE SET
provider_id = EXCLUDED.provider_id,
operator_id = EXCLUDED.operator_id,
platform_name = EXCLUDED.platform_name,
platform_name_cn = EXCLUDED.platform_name_cn,
platform_type = EXCLUDED.platform_type,
plan_family = EXCLUDED.plan_family,
plan_status = EXCLUDED.plan_status,
source_url = EXCLUDED.source_url,
source_title = EXCLUDED.source_title,
source_kind = EXCLUDED.source_kind,
region = EXCLUDED.region,
currency = EXCLUDED.currency,
billing_cycle = EXCLUDED.billing_cycle,
last_checked_at = EXCLUDED.last_checked_at,
importer_key = EXCLUDED.importer_key,
notes = EXCLUDED.notes,
catalog_segment = EXCLUDED.catalog_segment,
market_rank = EXCLUDED.market_rank,
updated_at = CURRENT_TIMESTAMP`

	for i := range rows {
		entry := rows[i]

		providerID, err := ensurePlanCatalogProvider(db, entry)
		if err != nil {
			return err
		}

		// operatorRef stays nil (SQL NULL) when the row names no operator.
		var operatorRef any
		if strings.TrimSpace(entry.OperatorName) != "" {
			resolved, opErr := ensurePlanCatalogOperator(db, entry)
			if opErr != nil {
				return opErr
			}
			operatorRef = resolved
		}

		// Argument order matches the $1..$19 placeholders above.
		args := []any{
			providerID, operatorRef, entry.CatalogCode, entry.PlatformName, nullIfEmpty(entry.PlatformNameCn),
			entry.PlatformType, entry.PlanFamily, entry.PlanStatus, entry.SourceURL, nullIfEmpty(entry.SourceTitle),
			entry.SourceKind, entry.Region, nullIfEmpty(entry.Currency), nullIfEmpty(entry.BillingCycle), entry.LastCheckedAt,
			nullIfEmpty(entry.ImporterKey), nullIfEmpty(entry.Notes), entry.CatalogSegment, nullIfZeroInt(entry.MarketRank),
		}
		if _, execErr := db.Exec(upsertSQL, args...); execErr != nil {
			return fmt.Errorf("upsert plan_catalog_inventory %s: %w", entry.CatalogCode, execErr)
		}
	}
	return nil
}
// ensurePlanCatalogProvider returns the id of the model_provider row whose
// name equals row.ProviderName, creating the row when it does not exist.
//
// For an existing provider, name_cn and website are filled in only when they
// are currently NULL or empty, and country is replaced only when it is NULL,
// empty, or 'unknown'. New providers are inserted with status 'active'.
// Every failure is returned wrapped with the operation and provider name.
//
// NOTE(review): lookup and insert are separate statements, so two concurrent
// importers could both try to insert the same provider — confirm whether
// that can happen in practice.
func ensurePlanCatalogProvider(db *sql.DB, row planCatalogRow) (int64, error) {
	var providerID int64
	err := db.QueryRow(`SELECT id FROM model_provider WHERE name = $1`, row.ProviderName).Scan(&providerID)
	switch {
	case err == nil:
		// Provider exists: backfill only the columns that are still blank.
		_, err = db.Exec(
			`UPDATE model_provider
			SET name_cn = COALESCE(NULLIF(name_cn, ''), $2),
				country = CASE
					WHEN COALESCE(country, '') = '' OR country = 'unknown' THEN $3
					ELSE country
				END,
				website = COALESCE(NULLIF(website, ''), $4),
				updated_at = CURRENT_TIMESTAMP
			WHERE id = $1`,
			providerID, nullIfEmpty(row.ProviderNameCn), row.ProviderCountry, nullIfEmpty(row.ProviderWebsite),
		)
		if err != nil {
			return 0, fmt.Errorf("update model_provider %q: %w", row.ProviderName, err)
		}
		return providerID, nil
	case err != sql.ErrNoRows:
		return 0, fmt.Errorf("lookup model_provider %q: %w", row.ProviderName, err)
	}

	// No row with that name yet: create it.
	err = db.QueryRow(
		`INSERT INTO model_provider (name, name_cn, country, website, status)
		VALUES ($1, $2, $3, $4, 'active')
		RETURNING id`,
		row.ProviderName, nullIfEmpty(row.ProviderNameCn), row.ProviderCountry, nullIfEmpty(row.ProviderWebsite),
	).Scan(&providerID)
	if err != nil {
		return 0, fmt.Errorf("insert model_provider %q: %w", row.ProviderName, err)
	}
	return providerID, nil
}
// ensurePlanCatalogOperator returns the id of the operator row whose name
// equals row.OperatorName, creating the row when it does not exist.
//
// For an existing operator, name_cn, website and description are filled in
// only when they are currently NULL or empty; country is replaced only when
// it is NULL, empty, or 'unknown'; type is replaced only when it is NULL,
// empty, or 'reseller', and only if row.OperatorType is not blank. New
// operators are inserted with status 'active'. The description is always
// "<PlatformName> catalog inventory". Every failure is returned wrapped with
// the operation and operator name.
func ensurePlanCatalogOperator(db *sql.DB, row planCatalogRow) (int64, error) {
	description := fmt.Sprintf("%s catalog inventory", row.PlatformName)

	var operatorID int64
	err := db.QueryRow(`SELECT id FROM operator WHERE name = $1`, row.OperatorName).Scan(&operatorID)
	switch {
	case err == nil:
		// Operator exists: backfill only the columns that are still blank.
		// COALESCE($6, type) keeps the stored type when the row carries no
		// operator type, instead of overwriting it with NULL.
		_, err = db.Exec(
			`UPDATE operator
			SET name_cn = COALESCE(NULLIF(name_cn, ''), $2),
				country = CASE
					WHEN COALESCE(country, '') = '' OR country = 'unknown' THEN $3
					ELSE country
				END,
				website = COALESCE(NULLIF(website, ''), $4),
				description = COALESCE(NULLIF(description, ''), $5),
				type = CASE
					WHEN COALESCE(type, '') = '' OR type = 'reseller' THEN COALESCE($6, type)
					ELSE type
				END,
				updated_at = CURRENT_TIMESTAMP
			WHERE id = $1`,
			operatorID,
			nullIfEmpty(row.OperatorNameCn),
			row.OperatorCountry,
			nullIfEmpty(row.OperatorWebsite),
			description,
			nullIfEmpty(row.OperatorType),
		)
		if err != nil {
			return 0, fmt.Errorf("update operator %q: %w", row.OperatorName, err)
		}
		return operatorID, nil
	case err != sql.ErrNoRows:
		return 0, fmt.Errorf("lookup operator %q: %w", row.OperatorName, err)
	}

	// No row with that name yet: create it.
	err = db.QueryRow(
		`INSERT INTO operator (name, name_cn, country, website, description, status, type)
		VALUES ($1, $2, $3, $4, $5, 'active', $6)
		RETURNING id`,
		row.OperatorName, nullIfEmpty(row.OperatorNameCn), row.OperatorCountry, nullIfEmpty(row.OperatorWebsite),
		description, row.OperatorType,
	).Scan(&operatorID)
	if err != nil {
		return 0, fmt.Errorf("insert operator %q: %w", row.OperatorName, err)
	}
	return operatorID, nil
}
// countByField tallies rows by the string that getter extracts from each
// row and returns the number of rows per distinct value.
func countByField(rows []planCatalogRow, getter func(planCatalogRow) string) map[string]int {
	tally := map[string]int{}
	for i := range rows {
		value := getter(rows[i])
		tally[value] = tally[value] + 1
	}
	return tally
}
// formatSummaryCount renders values as comma-separated "key:count" pairs
// ordered by key, e.g. "a:1,b:2". An empty map yields the empty string.
func formatSummaryCount(values map[string]int) string {
	labels := make([]string, 0, len(values))
	for label := range values {
		labels = append(labels, label)
	}
	// Map iteration order is random; sort for stable output.
	sort.Strings(labels)

	var summary strings.Builder
	for i, label := range labels {
		if i > 0 {
			summary.WriteByte(',')
		}
		fmt.Fprintf(&summary, "%s:%d", label, values[label])
	}
	return summary.String()
}
// defaultIfEmpty returns value unchanged unless it is empty or consists only
// of whitespace, in which case fallback is returned.
func defaultIfEmpty(value string, fallback string) string {
	if strings.TrimSpace(value) != "" {
		return value
	}
	return fallback
}
// splitCSVPaths splits raw on commas, trims whitespace around each segment
// and drops segments that are empty after trimming. The result is never nil.
func splitCSVPaths(raw string) []string {
	paths := make([]string, 0, strings.Count(raw, ",")+1)
	remaining := raw
	for {
		segment, tail, found := strings.Cut(remaining, ",")
		if trimmed := strings.TrimSpace(segment); trimmed != "" {
			paths = append(paths, trimmed)
		}
		if !found {
			return paths
		}
		remaining = tail
	}
}
// nullIfEmpty returns nil when value is empty or whitespace-only, and value
// itself (untrimmed) otherwise. It is used to bind blank strings as SQL NULL.
func nullIfEmpty(value string) any {
	if strings.TrimSpace(value) != "" {
		return value
	}
	return nil
}
// nullIfZeroInt returns nil when value is 0 and value itself otherwise. It
// is used to bind a zero integer as SQL NULL.
func nullIfZeroInt(value int) any {
	switch value {
	case 0:
		return nil
	default:
		return value
	}
}