Add plan catalog and subscription schema support, seed baselines, and real importers for core domestic subscriptions plus stable official pricing sources. This commit also hardens the shared fetch layers so the importers can support live collection and database writes instead of relying on manual placeholders alone.
553 lines
16 KiB
Go
553 lines
16 KiB
Go
//go:build llm_script
|
|
|
|
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
// defaultPlanCatalogSeedPaths is the comma-separated list of seed files read
// when -seed is not supplied. Paths are relative to the working directory.
// Because seeds are merged by catalogCode with later files winning, the order
// of this list matters: later entries override earlier ones.
const defaultPlanCatalogSeedPaths = "seeds/plan_catalog_inventory_seed.json," +
	"seeds/plan_catalog_inventory_seed_cn_vendors_top20.json," +
	"seeds/plan_catalog_inventory_seed_cn_relays_top20plus.json," +
	"seeds/plan_catalog_inventory_seed_web_research.json"
|
|
|
|
// importPlanCatalogConfig holds the command-line options for one import run.
type importPlanCatalogConfig struct {
	// SeedPaths is the -seed flag: one or more seed JSON files, comma-separated.
	SeedPaths string

	// DryRun is the -dry-run flag: validate the seeds and print a summary
	// without writing to the database.
	DryRun bool
}
|
|
|
|
type planCatalogSeedEnvelope struct {
|
|
CheckedAt string `json:"checkedAt"`
|
|
Items []planCatalogSeedItem `json:"items"`
|
|
}
|
|
|
|
type planCatalogSeedItem struct {
|
|
CatalogCode string `json:"catalogCode"`
|
|
ProviderName string `json:"providerName"`
|
|
ProviderNameCn string `json:"providerNameCn"`
|
|
ProviderCountry string `json:"providerCountry"`
|
|
ProviderWebsite string `json:"providerWebsite"`
|
|
OperatorName string `json:"operatorName"`
|
|
OperatorNameCn string `json:"operatorNameCn"`
|
|
OperatorCountry string `json:"operatorCountry"`
|
|
OperatorWebsite string `json:"operatorWebsite"`
|
|
OperatorType string `json:"operatorType"`
|
|
PlatformName string `json:"platformName"`
|
|
PlatformNameCn string `json:"platformNameCn"`
|
|
PlatformType string `json:"platformType"`
|
|
PlanFamily string `json:"planFamily"`
|
|
PlanStatus string `json:"planStatus"`
|
|
SourceURL string `json:"sourceURL"`
|
|
SourceTitle string `json:"sourceTitle"`
|
|
SourceKind string `json:"sourceKind"`
|
|
Region string `json:"region"`
|
|
Currency string `json:"currency"`
|
|
BillingCycle string `json:"billingCycle"`
|
|
ImporterKey string `json:"importerKey"`
|
|
Notes string `json:"notes"`
|
|
CatalogSegment string `json:"catalogSegment"`
|
|
MarketRank int `json:"marketRank"`
|
|
}
|
|
|
|
// planCatalogRow is a validated seed item with defaults applied, ready to be
// written to plan_catalog_inventory together with its model_provider and
// (optional) operator rows. Field order mirrors planCatalogSeedItem.
type planCatalogRow struct {
	CatalogCode string

	// Provider attributes (model_provider).
	ProviderName, ProviderNameCn, ProviderCountry, ProviderWebsite string

	// Operator attributes (operator); OperatorName may be empty, in which
	// case no operator is linked.
	OperatorName, OperatorNameCn, OperatorCountry, OperatorWebsite, OperatorType string

	// Platform and plan classification.
	PlatformName, PlatformNameCn, PlatformType, PlanFamily, PlanStatus string

	// Source provenance.
	SourceURL, SourceTitle, SourceKind string

	// Commercial attributes and bookkeeping.
	Region, Currency, BillingCycle, ImporterKey, Notes string

	LastCheckedAt  time.Time
	CatalogSegment string
	MarketRank     int
}
|
|
|
|
func main() {
|
|
loadImportProjectEnv()
|
|
|
|
var seedPaths string
|
|
var dryRun bool
|
|
|
|
flag.StringVar(&seedPaths, "seed", defaultPlanCatalogSeedPaths, "基础目录 seed JSON 路径,支持逗号分隔多个文件")
|
|
flag.BoolVar(&dryRun, "dry-run", false, "仅校验并打印摘要,不写入数据库")
|
|
flag.Parse()
|
|
|
|
cfg := importPlanCatalogConfig{
|
|
SeedPaths: seedPaths,
|
|
DryRun: dryRun,
|
|
}
|
|
|
|
var db *sql.DB
|
|
var err error
|
|
if !cfg.DryRun {
|
|
dsn := os.Getenv("DATABASE_URL")
|
|
if dsn == "" {
|
|
dsn = "postgres://long@/llm_intelligence?host=/var/run/postgresql"
|
|
}
|
|
db, err = sql.Open("postgres", dsn)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "open db: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
defer db.Close()
|
|
}
|
|
|
|
if err := runPlanCatalogImport(cfg, db, os.Stdout); err != nil {
|
|
fmt.Fprintf(os.Stderr, "import_plan_catalog: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func loadImportProjectEnv() {
|
|
for _, path := range []string{".env.local", ".env"} {
|
|
loadImportEnvFile(path)
|
|
}
|
|
}
|
|
|
|
// loadImportEnvFile reads simple KEY=VALUE assignments from the dotenv-style
// file at path and exports each into the process environment, unless that
// variable is already set. A missing or unreadable file is ignored because
// the env files are optional.
//
// Accepted syntax, one assignment per line:
//   - blank lines and lines starting with '#' are skipped;
//   - an optional leading "export " is dropped, as in shell-style env files;
//   - the line is split at the first '=', so values may contain '=';
//   - a value wrapped in one matching pair of '"' or '\” quotes is unwrapped.
//     Quote characters that are not a matching outer pair are kept, so a value
//     such as abc' survives intact.
//
// Lines without '=' or with an empty key are skipped.
func loadImportEnvFile(path string) {
	data, err := os.ReadFile(path)
	if err != nil {
		return
	}

	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		line = strings.TrimSpace(strings.TrimPrefix(line, "export "))

		key, value, ok := strings.Cut(line, "=")
		if !ok {
			continue
		}
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		if _, exists := os.LookupEnv(key); exists {
			continue
		}
		// Best-effort: a key or value the OS rejects is simply not exported.
		_ = os.Setenv(key, unquoteImportEnvValue(strings.TrimSpace(value)))
	}
}

// unquoteImportEnvValue strips one pair of matching surrounding single or
// double quotes from value; any other value is returned unchanged.
func unquoteImportEnvValue(value string) string {
	if len(value) >= 2 {
		first, last := value[0], value[len(value)-1]
		if first == last && (first == '"' || first == '\'') {
			return value[1 : len(value)-1]
		}
	}
	return value
}
|
|
|
|
func runPlanCatalogImport(cfg importPlanCatalogConfig, db *sql.DB, out io.Writer) error {
|
|
envelope, err := loadPlanCatalogSeeds(splitCSVPaths(cfg.SeedPaths))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rows, err := buildPlanCatalogRows(envelope)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(rows) == 0 {
|
|
return fmt.Errorf("seed is empty")
|
|
}
|
|
|
|
if cfg.DryRun {
|
|
_, err = fmt.Fprintf(
|
|
out,
|
|
"source=plan-catalog-import checked_at=%s rows=%d families=%s statuses=%s dry_run=true\n",
|
|
envelope.CheckedAt,
|
|
len(rows),
|
|
formatSummaryCount(countByField(rows, func(row planCatalogRow) string { return row.PlanFamily })),
|
|
formatSummaryCount(countByField(rows, func(row planCatalogRow) string { return row.PlanStatus })),
|
|
)
|
|
return err
|
|
}
|
|
if db == nil {
|
|
return fmt.Errorf("db is required when dry-run=false")
|
|
}
|
|
|
|
if err := upsertPlanCatalogInventory(db, rows); err != nil {
|
|
return err
|
|
}
|
|
|
|
var tableRows int
|
|
if err := db.QueryRow(`SELECT COUNT(*) FROM plan_catalog_inventory`).Scan(&tableRows); err != nil {
|
|
return fmt.Errorf("count plan_catalog_inventory: %w", err)
|
|
}
|
|
|
|
_, err = fmt.Fprintf(
|
|
out,
|
|
"source=plan-catalog-import checked_at=%s rows=%d table_rows=%d families=%s statuses=%s dry_run=false\n",
|
|
envelope.CheckedAt,
|
|
len(rows),
|
|
tableRows,
|
|
formatSummaryCount(countByField(rows, func(row planCatalogRow) string { return row.PlanFamily })),
|
|
formatSummaryCount(countByField(rows, func(row planCatalogRow) string { return row.PlanStatus })),
|
|
)
|
|
return err
|
|
}
|
|
|
|
func loadPlanCatalogSeed(path string) (planCatalogSeedEnvelope, error) {
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return planCatalogSeedEnvelope{}, fmt.Errorf("read seed %s: %w", path, err)
|
|
}
|
|
|
|
var envelope planCatalogSeedEnvelope
|
|
if err := json.Unmarshal(data, &envelope); err != nil {
|
|
return planCatalogSeedEnvelope{}, fmt.Errorf("unmarshal seed %s: %w", path, err)
|
|
}
|
|
return envelope, nil
|
|
}
|
|
|
|
func loadPlanCatalogSeeds(paths []string) (planCatalogSeedEnvelope, error) {
|
|
if len(paths) == 0 {
|
|
return planCatalogSeedEnvelope{}, fmt.Errorf("at least one seed path is required")
|
|
}
|
|
|
|
mergedItems := make(map[string]planCatalogSeedItem)
|
|
var checkedAt string
|
|
for _, path := range paths {
|
|
envelope, err := loadPlanCatalogSeed(path)
|
|
if err != nil {
|
|
return planCatalogSeedEnvelope{}, err
|
|
}
|
|
if strings.TrimSpace(envelope.CheckedAt) != "" {
|
|
checkedAt = envelope.CheckedAt
|
|
}
|
|
for _, item := range envelope.Items {
|
|
mergedItems[item.CatalogCode] = item
|
|
}
|
|
}
|
|
|
|
codes := make([]string, 0, len(mergedItems))
|
|
for code := range mergedItems {
|
|
codes = append(codes, code)
|
|
}
|
|
sort.Strings(codes)
|
|
|
|
items := make([]planCatalogSeedItem, 0, len(codes))
|
|
for _, code := range codes {
|
|
items = append(items, mergedItems[code])
|
|
}
|
|
|
|
return planCatalogSeedEnvelope{
|
|
CheckedAt: checkedAt,
|
|
Items: items,
|
|
}, nil
|
|
}
|
|
|
|
func buildPlanCatalogRows(envelope planCatalogSeedEnvelope) ([]planCatalogRow, error) {
|
|
checkedAt, err := time.Parse(time.RFC3339, envelope.CheckedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse checkedAt: %w", err)
|
|
}
|
|
|
|
validPlatformTypes := map[string]bool{
|
|
"official_vendor": true,
|
|
"cloud_operator": true,
|
|
"relay_platform": true,
|
|
}
|
|
validPlanFamilies := map[string]bool{
|
|
"token_plan": true,
|
|
"coding_plan": true,
|
|
"package_plan": true,
|
|
"pay_as_you_go": true,
|
|
"unknown": true,
|
|
}
|
|
validPlanStatuses := map[string]bool{
|
|
"confirmed": true,
|
|
"pending_verification": true,
|
|
"retired": true,
|
|
}
|
|
validSourceKinds := map[string]bool{
|
|
"official_doc": true,
|
|
"official_pricing": true,
|
|
"official_product_page": true,
|
|
"official_community": true,
|
|
"inferred": true,
|
|
}
|
|
validCatalogSegments := map[string]bool{
|
|
"general": true,
|
|
"vendor_top20": true,
|
|
"relay_top20plus": true,
|
|
"global_reference": true,
|
|
}
|
|
|
|
rows := make([]planCatalogRow, 0, len(envelope.Items))
|
|
seenCodes := make(map[string]struct{}, len(envelope.Items))
|
|
for _, item := range envelope.Items {
|
|
if strings.TrimSpace(item.CatalogCode) == "" {
|
|
return nil, fmt.Errorf("catalogCode is required")
|
|
}
|
|
if _, exists := seenCodes[item.CatalogCode]; exists {
|
|
return nil, fmt.Errorf("duplicate catalogCode %q", item.CatalogCode)
|
|
}
|
|
seenCodes[item.CatalogCode] = struct{}{}
|
|
if !validPlatformTypes[item.PlatformType] {
|
|
return nil, fmt.Errorf("invalid platformType %q for %s", item.PlatformType, item.CatalogCode)
|
|
}
|
|
if !validPlanFamilies[item.PlanFamily] {
|
|
return nil, fmt.Errorf("invalid planFamily %q for %s", item.PlanFamily, item.CatalogCode)
|
|
}
|
|
if !validPlanStatuses[item.PlanStatus] {
|
|
return nil, fmt.Errorf("invalid planStatus %q for %s", item.PlanStatus, item.CatalogCode)
|
|
}
|
|
if !validSourceKinds[item.SourceKind] {
|
|
return nil, fmt.Errorf("invalid sourceKind %q for %s", item.SourceKind, item.CatalogCode)
|
|
}
|
|
segment := defaultIfEmpty(item.CatalogSegment, "general")
|
|
if !validCatalogSegments[segment] {
|
|
return nil, fmt.Errorf("invalid catalogSegment %q for %s", item.CatalogSegment, item.CatalogCode)
|
|
}
|
|
if item.MarketRank < 0 {
|
|
return nil, fmt.Errorf("invalid marketRank %d for %s", item.MarketRank, item.CatalogCode)
|
|
}
|
|
if strings.TrimSpace(item.ProviderName) == "" {
|
|
return nil, fmt.Errorf("providerName is required for %s", item.CatalogCode)
|
|
}
|
|
if strings.TrimSpace(item.PlatformName) == "" {
|
|
return nil, fmt.Errorf("platformName is required for %s", item.CatalogCode)
|
|
}
|
|
if strings.TrimSpace(item.SourceURL) == "" {
|
|
return nil, fmt.Errorf("sourceURL is required for %s", item.CatalogCode)
|
|
}
|
|
|
|
rows = append(rows, planCatalogRow{
|
|
CatalogCode: item.CatalogCode,
|
|
ProviderName: item.ProviderName,
|
|
ProviderNameCn: item.ProviderNameCn,
|
|
ProviderCountry: defaultIfEmpty(item.ProviderCountry, "unknown"),
|
|
ProviderWebsite: item.ProviderWebsite,
|
|
OperatorName: item.OperatorName,
|
|
OperatorNameCn: item.OperatorNameCn,
|
|
OperatorCountry: defaultIfEmpty(item.OperatorCountry, "unknown"),
|
|
OperatorWebsite: item.OperatorWebsite,
|
|
OperatorType: defaultIfEmpty(item.OperatorType, "official"),
|
|
PlatformName: item.PlatformName,
|
|
PlatformNameCn: item.PlatformNameCn,
|
|
PlatformType: item.PlatformType,
|
|
PlanFamily: item.PlanFamily,
|
|
PlanStatus: item.PlanStatus,
|
|
SourceURL: item.SourceURL,
|
|
SourceTitle: item.SourceTitle,
|
|
SourceKind: item.SourceKind,
|
|
Region: defaultIfEmpty(item.Region, "global"),
|
|
Currency: item.Currency,
|
|
BillingCycle: item.BillingCycle,
|
|
ImporterKey: item.ImporterKey,
|
|
Notes: item.Notes,
|
|
LastCheckedAt: checkedAt,
|
|
CatalogSegment: segment,
|
|
MarketRank: item.MarketRank,
|
|
})
|
|
}
|
|
return rows, nil
|
|
}
|
|
|
|
func upsertPlanCatalogInventory(db *sql.DB, rows []planCatalogRow) error {
|
|
for _, row := range rows {
|
|
providerID, err := ensurePlanCatalogProvider(db, row)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var operatorID any
|
|
if strings.TrimSpace(row.OperatorName) != "" {
|
|
id, err := ensurePlanCatalogOperator(db, row)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
operatorID = id
|
|
}
|
|
|
|
_, err = db.Exec(
|
|
`INSERT INTO plan_catalog_inventory (
|
|
provider_id, operator_id, catalog_code, platform_name, platform_name_cn,
|
|
platform_type, plan_family, plan_status, source_url, source_title,
|
|
source_kind, region, currency, billing_cycle, last_checked_at,
|
|
importer_key, notes, catalog_segment, market_rank
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5,
|
|
$6, $7, $8, $9, $10,
|
|
$11, $12, $13, $14, $15,
|
|
$16, $17, $18, $19
|
|
)
|
|
ON CONFLICT (catalog_code)
|
|
DO UPDATE SET
|
|
provider_id = EXCLUDED.provider_id,
|
|
operator_id = EXCLUDED.operator_id,
|
|
platform_name = EXCLUDED.platform_name,
|
|
platform_name_cn = EXCLUDED.platform_name_cn,
|
|
platform_type = EXCLUDED.platform_type,
|
|
plan_family = EXCLUDED.plan_family,
|
|
plan_status = EXCLUDED.plan_status,
|
|
source_url = EXCLUDED.source_url,
|
|
source_title = EXCLUDED.source_title,
|
|
source_kind = EXCLUDED.source_kind,
|
|
region = EXCLUDED.region,
|
|
currency = EXCLUDED.currency,
|
|
billing_cycle = EXCLUDED.billing_cycle,
|
|
last_checked_at = EXCLUDED.last_checked_at,
|
|
importer_key = EXCLUDED.importer_key,
|
|
notes = EXCLUDED.notes,
|
|
catalog_segment = EXCLUDED.catalog_segment,
|
|
market_rank = EXCLUDED.market_rank,
|
|
updated_at = CURRENT_TIMESTAMP`,
|
|
providerID, operatorID, row.CatalogCode, row.PlatformName, nullIfEmpty(row.PlatformNameCn),
|
|
row.PlatformType, row.PlanFamily, row.PlanStatus, row.SourceURL, nullIfEmpty(row.SourceTitle),
|
|
row.SourceKind, row.Region, nullIfEmpty(row.Currency), nullIfEmpty(row.BillingCycle), row.LastCheckedAt,
|
|
nullIfEmpty(row.ImporterKey), nullIfEmpty(row.Notes), row.CatalogSegment, nullIfZeroInt(row.MarketRank),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert plan_catalog_inventory %s: %w", row.CatalogCode, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ensurePlanCatalogProvider(db *sql.DB, row planCatalogRow) (int64, error) {
|
|
var providerID int64
|
|
err := db.QueryRow(`SELECT id FROM model_provider WHERE name = $1`, row.ProviderName).Scan(&providerID)
|
|
if err == nil {
|
|
_, updateErr := db.Exec(
|
|
`UPDATE model_provider
|
|
SET name_cn = COALESCE(NULLIF(name_cn, ''), $2),
|
|
country = CASE
|
|
WHEN COALESCE(country, '') = '' OR country = 'unknown' THEN $3
|
|
ELSE country
|
|
END,
|
|
website = COALESCE(NULLIF(website, ''), $4),
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = $1`,
|
|
providerID, nullIfEmpty(row.ProviderNameCn), row.ProviderCountry, nullIfEmpty(row.ProviderWebsite),
|
|
)
|
|
return providerID, updateErr
|
|
}
|
|
if err != sql.ErrNoRows {
|
|
return 0, err
|
|
}
|
|
|
|
err = db.QueryRow(
|
|
`INSERT INTO model_provider (name, name_cn, country, website, status)
|
|
VALUES ($1, $2, $3, $4, 'active')
|
|
RETURNING id`,
|
|
row.ProviderName, nullIfEmpty(row.ProviderNameCn), row.ProviderCountry, nullIfEmpty(row.ProviderWebsite),
|
|
).Scan(&providerID)
|
|
return providerID, err
|
|
}
|
|
|
|
func ensurePlanCatalogOperator(db *sql.DB, row planCatalogRow) (int64, error) {
|
|
var operatorID int64
|
|
err := db.QueryRow(`SELECT id FROM operator WHERE name = $1`, row.OperatorName).Scan(&operatorID)
|
|
if err == nil {
|
|
_, updateErr := db.Exec(
|
|
`UPDATE operator
|
|
SET name_cn = COALESCE(NULLIF(name_cn, ''), $2),
|
|
country = CASE
|
|
WHEN COALESCE(country, '') = '' OR country = 'unknown' THEN $3
|
|
ELSE country
|
|
END,
|
|
website = COALESCE(NULLIF(website, ''), $4),
|
|
description = COALESCE(NULLIF(description, ''), $5),
|
|
type = CASE
|
|
WHEN COALESCE(type, '') = '' OR type = 'reseller' THEN $6
|
|
ELSE type
|
|
END,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = $1`,
|
|
operatorID,
|
|
nullIfEmpty(row.OperatorNameCn),
|
|
row.OperatorCountry,
|
|
nullIfEmpty(row.OperatorWebsite),
|
|
fmt.Sprintf("%s catalog inventory", row.PlatformName),
|
|
nullIfEmpty(row.OperatorType),
|
|
)
|
|
return operatorID, updateErr
|
|
}
|
|
if err != sql.ErrNoRows {
|
|
return 0, err
|
|
}
|
|
|
|
err = db.QueryRow(
|
|
`INSERT INTO operator (name, name_cn, country, website, description, status, type)
|
|
VALUES ($1, $2, $3, $4, $5, 'active', $6)
|
|
RETURNING id`,
|
|
row.OperatorName, nullIfEmpty(row.OperatorNameCn), row.OperatorCountry, nullIfEmpty(row.OperatorWebsite),
|
|
fmt.Sprintf("%s catalog inventory", row.PlatformName), row.OperatorType,
|
|
).Scan(&operatorID)
|
|
return operatorID, err
|
|
}
|
|
|
|
// countByField tallies how many elements of rows map to each key produced by
// getter. It is generic so it can summarize any slice; existing callers that
// pass []planCatalogRow with a func(planCatalogRow) string getter are inferred
// unchanged. An empty rows yields an empty, non-nil map.
func countByField[T any](rows []T, getter func(T) string) map[string]int {
	result := make(map[string]int)
	for _, row := range rows {
		result[getter(row)]++
	}
	return result
}
|
|
|
|
// formatSummaryCount renders values as "key:count" pairs joined by commas,
// ordered by key so the output is deterministic (e.g. "a:1,b:2"). An empty
// or nil map renders as the empty string.
func formatSummaryCount(values map[string]int) string {
	names := make([]string, 0, len(values))
	for name := range values {
		names = append(names, name)
	}
	sort.Strings(names)

	var sb strings.Builder
	for i, name := range names {
		if i > 0 {
			sb.WriteByte(',')
		}
		fmt.Fprintf(&sb, "%s:%d", name, values[name])
	}
	return sb.String()
}
|
|
|
|
// defaultIfEmpty returns value unchanged (not trimmed) when it contains any
// non-whitespace character, and fallback otherwise.
func defaultIfEmpty(value string, fallback string) string {
	if strings.TrimSpace(value) != "" {
		return value
	}
	return fallback
}
|
|
|
|
// splitCSVPaths splits a comma-separated list into trimmed, non-empty
// entries, preserving their order. It always returns a non-nil slice, which
// is empty when raw contains no entries.
func splitCSVPaths(raw string) []string {
	isComma := func(r rune) bool { return r == ',' }
	fields := strings.FieldsFunc(raw, isComma)

	result := make([]string, 0, len(fields))
	for i := range fields {
		if entry := strings.TrimSpace(fields[i]); entry != "" {
			result = append(result, entry)
		}
	}
	return result
}
|
|
|
|
// nullIfEmpty maps a blank (empty or whitespace-only) string to nil, so it is
// bound as SQL NULL. Any other string is returned unchanged, not trimmed.
func nullIfEmpty(value string) any {
	if blank := strings.TrimSpace(value) == ""; !blank {
		return value
	}
	return nil
}
|
|
|
|
// nullIfZeroInt maps 0 to nil, so it is bound as SQL NULL. Every other value,
// including negatives, is returned as an int.
func nullIfZeroInt(value int) any {
	if value != 0 {
		return value
	}
	return nil
}
|