Files
sub2api-cn-relay-manager/internal/provision/batch_detail_and_reconcile_service.go

351 lines
12 KiB
Go

package provision
import (
"context"
"encoding/json"
"fmt"
"strings"
"sub2api-cn-relay-manager/internal/host/sub2api"
"sub2api-cn-relay-manager/internal/pack"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
type BatchDetailResult struct {
Batch sqlite.ImportBatch
Items []sqlite.ImportBatchItem
ManagedResources []sqlite.ManagedResource
AccessClosures []sqlite.AccessClosureRecord
ReconcileRuns []sqlite.ReconcileRun
}
type BatchDetailService struct {
store *sqlite.DB
}
func NewBatchDetailService(store *sqlite.DB) *BatchDetailService {
return &BatchDetailService{store: store}
}
func (s *BatchDetailService) Get(ctx context.Context, batchID int64) (BatchDetailResult, error) {
if s == nil || s.store == nil {
return BatchDetailResult{}, fmt.Errorf("store is required")
}
batch, err := s.store.ImportBatches().GetByID(ctx, batchID)
if err != nil {
return BatchDetailResult{}, err
}
items, err := s.store.ImportBatchItems().GetByBatchID(ctx, batchID)
if err != nil {
return BatchDetailResult{}, err
}
managedResources, err := s.store.ManagedResources().GetByBatchID(ctx, batchID)
if err != nil {
return BatchDetailResult{}, err
}
accessClosures, err := s.store.AccessClosures().GetByBatchID(ctx, batchID)
if err != nil {
return BatchDetailResult{}, err
}
reconcileRuns, err := s.store.ReconcileRuns().GetByBatchID(ctx, batchID)
if err != nil {
return BatchDetailResult{}, err
}
return BatchDetailResult{
Batch: batch,
Items: items,
ManagedResources: managedResources,
AccessClosures: accessClosures,
ReconcileRuns: reconcileRuns,
}, nil
}
type ReconcileRequest struct {
HostID string
HostBaseURL string
AccessProbeAPIKey string
Pack pack.LoadedPack
Provider pack.ProviderManifest
}
type ReconcileResult struct {
BatchID int64
Status string
MissingCount int
ExtraCount int
ProbeFailureCount int
AccessStatus string
Summary map[string]any
}
type ReconcileService struct {
store *sqlite.DB
host sub2api.HostAdapter
}
func NewReconcileService(store *sqlite.DB, host sub2api.HostAdapter) *ReconcileService {
return &ReconcileService{store: store, host: host}
}
func (s *ReconcileService) Reconcile(ctx context.Context, req ReconcileRequest) (ReconcileResult, error) {
if s == nil || s.store == nil {
return ReconcileResult{}, fmt.Errorf("store is required")
}
if s.host == nil {
return ReconcileResult{}, fmt.Errorf("host adapter is required")
}
if strings.TrimSpace(req.HostID) == "" {
return ReconcileResult{}, fmt.Errorf("host_id is required")
}
if strings.TrimSpace(req.HostBaseURL) == "" {
return ReconcileResult{}, fmt.Errorf("host_base_url is required")
}
hostVersion, err := s.host.GetHostVersion(ctx)
if err != nil {
return ReconcileResult{}, fmt.Errorf("get host version: %w", err)
}
if err := pack.CheckHostCompatibility(req.Pack.Manifest, hostVersion); err != nil {
return ReconcileResult{}, err
}
packRow, err := s.store.Packs().GetByPackID(ctx, req.Pack.Manifest.PackID)
if err != nil {
return ReconcileResult{}, err
}
providerRow, err := s.store.Providers().GetByPackIDAndProviderID(ctx, packRow.ID, req.Provider.ProviderID)
if err != nil {
return ReconcileResult{}, err
}
hostRow, err := s.store.Hosts().GetByHostID(ctx, req.HostID)
if err != nil {
return ReconcileResult{}, err
}
batchRow, err := s.store.ImportBatches().GetLatestByProviderIDAndHostID(ctx, providerRow.ID, hostRow.ID)
if err != nil {
return ReconcileResult{}, err
}
switch strings.TrimSpace(batchRow.BatchStatus) {
case BatchStatusSucceeded, BatchStatusPartial:
default:
return ReconcileResult{}, fmt.Errorf("latest import batch is %s; run import again before reconcile", batchRow.BatchStatus)
}
storedResources, err := s.store.ManagedResources().GetByBatchID(ctx, batchRow.ID)
if err != nil {
return ReconcileResult{}, err
}
batchItems, err := s.store.ImportBatchItems().GetByBatchID(ctx, batchRow.ID)
if err != nil {
return ReconcileResult{}, err
}
accessClosures, err := s.store.AccessClosures().GetByBatchID(ctx, batchRow.ID)
if err != nil {
return ReconcileResult{}, err
}
snapshot, err := s.host.ListManagedResources(ctx, buildManagedResourceListRequest(req.Provider, accessClosureType(accessClosures)))
if err != nil {
return ReconcileResult{}, fmt.Errorf("list managed resources: %w", err)
}
missing, extra := diffManagedResources(storedResources, snapshot)
probeFailures, err := s.rerunAccountProbes(ctx, batchItems, req.Provider.SmokeTestModel)
if err != nil {
return ReconcileResult{}, err
}
accessStatus, accessChecked, err := s.rerunAccessClosure(ctx, batchRow.ID, accessClosures, req.AccessProbeAPIKey, req.Provider.SmokeTestModel)
if err != nil {
return ReconcileResult{}, err
}
status := "active"
if missing > 0 || extra > 0 {
status = "drifted"
} else if probeFailures > 0 || (accessChecked && accessStatus == AccessStatusBroken) {
status = "degraded"
}
summary := map[string]any{
"missing_count": missing,
"extra_count": extra,
"host_version": hostVersion,
"probe_failures": probeFailures,
"access_status": accessStatus,
"access_rechecked": accessChecked,
}
summaryJSON, err := json.Marshal(summary)
if err != nil {
return ReconcileResult{}, fmt.Errorf("marshal reconcile summary: %w", err)
}
if _, err := s.store.ReconcileRuns().Create(ctx, sqlite.ReconcileRun{BatchID: batchRow.ID, HostID: hostRow.ID, ProviderID: providerRow.ID, Status: status, SummaryJSON: string(summaryJSON)}); err != nil {
return ReconcileResult{}, err
}
return ReconcileResult{BatchID: batchRow.ID, Status: status, MissingCount: missing, ExtraCount: extra, ProbeFailureCount: probeFailures, AccessStatus: accessStatus, Summary: summary}, nil
}
func (s *ReconcileService) rerunAccountProbes(ctx context.Context, items []sqlite.ImportBatchItem, expectedModel string) (int, error) {
if len(items) == 0 {
return 0, nil
}
failures := 0
for _, item := range items {
accountID, err := accountIDFromProbeSummary(item.ProbeSummaryJSON)
if err != nil {
return 0, fmt.Errorf("decode import batch item %d probe summary: %w", item.ID, err)
}
if strings.TrimSpace(accountID) == "" {
return 0, fmt.Errorf("import batch item %d missing account_id in probe summary", item.ID)
}
probe, err := s.host.TestAccount(ctx, accountID)
if err != nil {
return 0, fmt.Errorf("re-test account %s: %w", accountID, err)
}
models, err := s.host.GetAccountModels(ctx, accountID)
if err != nil {
return 0, fmt.Errorf("reload account models %s: %w", accountID, err)
}
smokeModelSeen := hasModel(models, expectedModel)
status := firstNonEmpty(probe.Status, "unknown")
payload, err := json.Marshal(map[string]any{
"account_id": accountID,
"probe_ok": probe.OK,
"probe_status": probe.Status,
"probe_message": probe.Message,
"models": models,
"smoke_model_seen": smokeModelSeen,
"reconcile_rerun": true,
})
if err != nil {
return 0, fmt.Errorf("marshal probe rerun summary for %s: %w", accountID, err)
}
if err := s.store.ImportBatchItems().UpdateResult(ctx, item.ID, status, string(payload)); err != nil {
return 0, err
}
if _, err := s.store.ProbeResults().Create(ctx, sqlite.ProbeResult{BatchItemID: item.ID, ProbeType: "account_smoke_rerun", Status: status, SummaryJSON: string(payload)}); err != nil {
return 0, err
}
if !probe.OK || !smokeModelSeen {
failures++
}
}
return failures, nil
}
func (s *ReconcileService) rerunAccessClosure(ctx context.Context, batchID int64, accessClosures []sqlite.AccessClosureRecord, probeAPIKey, expectedModel string) (string, bool, error) {
if len(accessClosures) == 0 {
return "not_configured", false, nil
}
latest := accessClosures[len(accessClosures)-1]
status := firstNonEmpty(latest.Status, deriveHealthyAccessStatus(latest.ClosureType))
if strings.TrimSpace(probeAPIKey) == "" {
return status, false, nil
}
result, err := s.host.CheckGatewayAccess(ctx, sub2api.GatewayAccessCheckRequest{APIKey: probeAPIKey, ExpectedModel: expectedModel})
if err != nil {
return "", false, fmt.Errorf("re-check gateway access: %w", err)
}
if result.OK && result.HasExpectedModel {
status = deriveHealthyAccessStatus(latest.ClosureType)
} else {
status = AccessStatusBroken
}
payload, err := json.Marshal(map[string]any{
"status_code": result.StatusCode,
"ok": result.OK,
"has_expected_model": result.HasExpectedModel,
"models": result.Models,
"reconcile_rerun": true,
})
if err != nil {
return "", false, fmt.Errorf("marshal access rerun summary: %w", err)
}
if _, err := s.store.AccessClosures().Create(ctx, sqlite.AccessClosureRecord{BatchID: batchID, ClosureType: latest.ClosureType, Status: status, DetailsJSON: string(payload)}); err != nil {
return "", false, err
}
return status, true, nil
}
func deriveHealthyAccessStatus(closureType string) string {
switch strings.TrimSpace(closureType) {
case AccessModeSubscription:
return AccessStatusSubscriptionReady
case AccessModeSelfService:
return AccessStatusSelfServiceReady
default:
return "unknown"
}
}
func accessClosureType(accessClosures []sqlite.AccessClosureRecord) string {
if len(accessClosures) == 0 {
return ""
}
return strings.TrimSpace(accessClosures[len(accessClosures)-1].ClosureType)
}
func buildManagedResourceListRequest(provider pack.ProviderManifest, accessMode string) sub2api.ListManagedResourcesRequest {
names := SuggestResourceNamesForMode(provider, accessMode)
req := sub2api.ListManagedResourcesRequest{
GroupName: names.Group,
ChannelName: names.Channel,
AccountNamePrefix: SuggestAccountNamePrefix(provider),
}
if strings.TrimSpace(accessMode) == AccessModeSubscription {
req.PlanName = names.Plan
}
return req
}
func accountIDFromProbeSummary(summaryJSON string) (string, error) {
if strings.TrimSpace(summaryJSON) == "" {
return "", nil
}
var payload map[string]any
if err := json.Unmarshal([]byte(summaryJSON), &payload); err != nil {
return "", err
}
accountID, _ := payload["account_id"].(string)
return strings.TrimSpace(accountID), nil
}
func diffManagedResources(stored []sqlite.ManagedResource, snapshot sub2api.ManagedResourceSnapshot) (int, int) {
live := map[string]map[string]struct{}{
"group": make(map[string]struct{}),
"channel": make(map[string]struct{}),
"plan": make(map[string]struct{}),
"account": make(map[string]struct{}),
}
for _, resource := range snapshot.Groups {
live["group"][strings.TrimSpace(resource.ID)] = struct{}{}
}
for _, resource := range snapshot.Channels {
live["channel"][strings.TrimSpace(resource.ID)] = struct{}{}
}
for _, resource := range snapshot.Plans {
live["plan"][strings.TrimSpace(resource.ID)] = struct{}{}
}
for _, resource := range snapshot.Accounts {
live["account"][strings.TrimSpace(resource.ID)] = struct{}{}
}
storedByType := map[string]map[string]struct{}{
"group": make(map[string]struct{}),
"channel": make(map[string]struct{}),
"plan": make(map[string]struct{}),
"account": make(map[string]struct{}),
}
for _, resource := range stored {
storedByType[strings.TrimSpace(resource.ResourceType)][strings.TrimSpace(resource.HostResourceID)] = struct{}{}
}
missing := 0
extra := 0
for resourceType, storedIDs := range storedByType {
for id := range storedIDs {
if _, ok := live[resourceType][id]; !ok {
missing++
}
}
for id := range live[resourceType] {
if _, ok := storedIDs[id]; !ok {
extra++
}
}
}
return missing, extra
}