feat(control-plane): harden host-scoped reconcile and acceptance evidence

- add batch-scoped reconcile_runs persistence and queries
- route batch detail and reconcile writes through batch_id/host_id
- refresh production boards with host-scope acceptance artifacts
- include latest real-host acceptance evidence for self_service and subscription
This commit is contained in:
phamnazage-jpg
2026-05-18 22:22:22 +08:00
parent 71cbaf5fa6
commit 85d495dd16
332 changed files with 5561 additions and 422 deletions

View File

@@ -12,6 +12,8 @@ type Host struct {
BaseURL string
HostVersion string
CapabilityProbeJSON string
AuthType string
AuthToken string
}
type HostsRepo struct {
@@ -28,7 +30,7 @@ func (r *HostsRepo) GetByID(ctx context.Context, id int64) (Host, error) {
}
var host Host
if err := r.db.QueryRowContext(ctx, `SELECT id, host_id, base_url, host_version, capability_probe_json FROM hosts WHERE id = ?`, id).Scan(&host.ID, &host.HostID, &host.BaseURL, &host.HostVersion, &host.CapabilityProbeJSON); err != nil {
if err := r.db.QueryRowContext(ctx, `SELECT id, host_id, base_url, host_version, capability_probe_json, auth_type, auth_token FROM hosts WHERE id = ?`, id).Scan(&host.ID, &host.HostID, &host.BaseURL, &host.HostVersion, &host.CapabilityProbeJSON, &host.AuthType, &host.AuthToken); err != nil {
return Host{}, err
}
return host, nil
@@ -41,7 +43,20 @@ func (r *HostsRepo) GetByHostID(ctx context.Context, hostID string) (Host, error
}
var host Host
if err := r.db.QueryRowContext(ctx, `SELECT id, host_id, base_url, host_version, capability_probe_json FROM hosts WHERE host_id = ?`, hostID).Scan(&host.ID, &host.HostID, &host.BaseURL, &host.HostVersion, &host.CapabilityProbeJSON); err != nil {
if err := r.db.QueryRowContext(ctx, `SELECT id, host_id, base_url, host_version, capability_probe_json, auth_type, auth_token FROM hosts WHERE host_id = ?`, hostID).Scan(&host.ID, &host.HostID, &host.BaseURL, &host.HostVersion, &host.CapabilityProbeJSON, &host.AuthType, &host.AuthToken); err != nil {
return Host{}, err
}
return host, nil
}
func (r *HostsRepo) GetByBaseURL(ctx context.Context, baseURL string) (Host, error) {
baseURL = strings.TrimSpace(baseURL)
if baseURL == "" {
return Host{}, fmt.Errorf("base_url is required")
}
var host Host
if err := r.db.QueryRowContext(ctx, `SELECT id, host_id, base_url, host_version, capability_probe_json, auth_type, auth_token FROM hosts WHERE base_url = ?`, baseURL).Scan(&host.ID, &host.HostID, &host.BaseURL, &host.HostVersion, &host.CapabilityProbeJSON, &host.AuthType, &host.AuthToken); err != nil {
return Host{}, err
}
return host, nil
@@ -52,6 +67,8 @@ func (r *HostsRepo) Create(ctx context.Context, host Host) (int64, error) {
baseURL := strings.TrimSpace(host.BaseURL)
hostVersion := strings.TrimSpace(host.HostVersion)
capabilityProbeJSON := strings.TrimSpace(host.CapabilityProbeJSON)
authType := firstNonEmptyTrimmed(host.AuthType, "apikey")
authToken := strings.TrimSpace(host.AuthToken)
switch {
case hostID == "":
@@ -66,12 +83,14 @@ func (r *HostsRepo) Create(ctx context.Context, host Host) (int64, error) {
result, err := r.db.ExecContext(
ctx,
`INSERT INTO hosts (host_id, base_url, host_version, capability_probe_json)
VALUES (?, ?, ?, ?)`,
`INSERT INTO hosts (host_id, base_url, host_version, capability_probe_json, auth_type, auth_token)
VALUES (?, ?, ?, ?, ?, ?)`,
hostID,
baseURL,
hostVersion,
capabilityProbeJSON,
authType,
authToken,
)
if err != nil {
return 0, fmt.Errorf("insert host %q: %w", hostID, err)
@@ -84,3 +103,115 @@ func (r *HostsRepo) Create(ctx context.Context, host Host) (int64, error) {
return id, nil
}
func (r *HostsRepo) UpdateConnectionByHostID(ctx context.Context, hostID, baseURL, hostVersion, capabilityProbeJSON, authType, authToken string) error {
hostID = strings.TrimSpace(hostID)
baseURL = strings.TrimSpace(baseURL)
hostVersion = strings.TrimSpace(hostVersion)
capabilityProbeJSON = strings.TrimSpace(capabilityProbeJSON)
authType = firstNonEmptyTrimmed(authType, "apikey")
authToken = strings.TrimSpace(authToken)
if hostID == "" {
return fmt.Errorf("host_id is required")
}
if baseURL == "" {
return fmt.Errorf("base_url is required")
}
if hostVersion == "" {
return fmt.Errorf("host_version is required")
}
if capabilityProbeJSON == "" {
capabilityProbeJSON = "{}"
}
result, err := r.db.ExecContext(ctx, `UPDATE hosts SET base_url = ?, host_version = ?, capability_probe_json = ?, auth_type = ?, auth_token = ? WHERE host_id = ?`, baseURL, hostVersion, capabilityProbeJSON, authType, authToken, hostID)
if err != nil {
return fmt.Errorf("update host %q connection: %w", hostID, err)
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return fmt.Errorf("host %q not found", hostID)
}
return nil
}
func (r *HostsRepo) ListAll(ctx context.Context) ([]Host, error) {
rows, err := r.db.QueryContext(ctx, `SELECT id, host_id, base_url, host_version, capability_probe_json, auth_type, auth_token FROM hosts ORDER BY id`)
if err != nil {
return nil, fmt.Errorf("list hosts: %w", err)
}
defer rows.Close()
var hosts []Host
for rows.Next() {
var host Host
if err := rows.Scan(&host.ID, &host.HostID, &host.BaseURL, &host.HostVersion, &host.CapabilityProbeJSON, &host.AuthType, &host.AuthToken); err != nil {
return nil, fmt.Errorf("scan host: %w", err)
}
hosts = append(hosts, host)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate hosts: %w", err)
}
return hosts, nil
}
func (r *HostsRepo) DeleteByHostID(ctx context.Context, hostID string) error {
hostID = strings.TrimSpace(hostID)
if hostID == "" {
return fmt.Errorf("host_id is required")
}
result, err := r.db.ExecContext(ctx, `DELETE FROM hosts WHERE host_id = ?`, hostID)
if err != nil {
return fmt.Errorf("delete host %q: %w", hostID, err)
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return fmt.Errorf("host %q not found", hostID)
}
return nil
}
func (r *HostsRepo) UpdateProbeByHostID(ctx context.Context, hostID, hostVersion, capabilityProbeJSON string) error {
hostID = strings.TrimSpace(hostID)
hostVersion = strings.TrimSpace(hostVersion)
capabilityProbeJSON = strings.TrimSpace(capabilityProbeJSON)
if hostID == "" {
return fmt.Errorf("host_id is required")
}
if hostVersion == "" {
return fmt.Errorf("host_version is required")
}
if capabilityProbeJSON == "" {
capabilityProbeJSON = "{}"
}
result, err := r.db.ExecContext(ctx, `UPDATE hosts SET host_version = ?, capability_probe_json = ? WHERE host_id = ?`, hostVersion, capabilityProbeJSON, hostID)
if err != nil {
return fmt.Errorf("update host %q probe: %w", hostID, err)
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return fmt.Errorf("host %q not found", hostID)
}
return nil
}
func firstNonEmptyTrimmed(values ...string) string {
for _, value := range values {
if trimmed := strings.TrimSpace(value); trimmed != "" {
return trimmed
}
}
return ""
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"path/filepath"
"testing"
)
@@ -56,6 +57,17 @@ func createTestHost(t *testing.T, store *DB) int64 {
return id
}
func createTestHostWithBaseURL(t *testing.T, store *DB, hostID, baseURL string) int64 {
t.Helper()
id, err := store.Hosts().Create(context.Background(), Host{
HostID: hostID, BaseURL: baseURL, HostVersion: "0.1.0",
})
if err != nil {
t.Fatalf("createTestHostWithBaseURL error = %v", err)
}
return id
}
func createTestBatch(t *testing.T, store *DB) int64 {
t.Helper()
hostID := createTestHost(t, store)
@@ -197,3 +209,85 @@ func TestHostsRepoGetByHostIDNotFound(t *testing.T) {
t.Fatalf("GetByHostID('nonexistent') error = %v, want sql.ErrNoRows", err)
}
}
func TestHostsRepoListAll(t *testing.T) {
store := openTestDB(t)
hosts, err := store.Hosts().ListAll(context.Background())
if err != nil {
t.Fatalf("ListAll() on empty DB error = %v", err)
}
if len(hosts) != 0 {
t.Fatalf("ListAll() len = %d, want 0", len(hosts))
}
for i := 0; i < 2; i++ {
_, err := store.Hosts().Create(context.Background(), Host{
HostID: fmt.Sprintf("host-listall-%d", i), BaseURL: "https://h.com", HostVersion: "0.1.0",
})
if err != nil {
t.Fatalf("Create() error = %v", err)
}
}
hosts, err = store.Hosts().ListAll(context.Background())
if err != nil {
t.Fatalf("ListAll() error = %v", err)
}
if len(hosts) != 2 {
t.Fatalf("ListAll() len = %d, want 2", len(hosts))
}
}
func TestHostsRepoDeleteByHostID(t *testing.T) {
store := openTestDB(t)
createTestHost(t, store)
if err := store.Hosts().DeleteByHostID(context.Background(), "host-"+sanitizeTestName(t.Name())); err != nil {
t.Fatalf("DeleteByHostID() error = %v", err)
}
hosts, err := store.Hosts().ListAll(context.Background())
if err != nil {
t.Fatalf("ListAll() error = %v", err)
}
if len(hosts) != 0 {
t.Fatalf("ListAll() after delete len = %d, want 0", len(hosts))
}
}
func TestHostsRepoUpdateProbeByHostID(t *testing.T) {
store := openTestDB(t)
createTestHost(t, store)
if err := store.Hosts().UpdateProbeByHostID(context.Background(), "host-"+sanitizeTestName(t.Name()), "0.2.0", `{"groups":true}`); err != nil {
t.Fatalf("UpdateProbeByHostID() error = %v", err)
}
host, err := store.Hosts().GetByHostID(context.Background(), "host-"+sanitizeTestName(t.Name()))
if err != nil {
t.Fatalf("GetByHostID() error = %v", err)
}
if host.HostVersion != "0.2.0" || host.CapabilityProbeJSON != `{"groups":true}` {
t.Fatalf("updated host = %+v, want version/capability update", host)
}
}
func TestHostsRepoDeleteByHostIDNotFound(t *testing.T) {
store := openTestDB(t)
err := store.Hosts().DeleteByHostID(context.Background(), "nonexistent")
if err == nil {
t.Fatal("DeleteByHostID('nonexistent') error = nil, want error")
}
if err.Error() != `host "nonexistent" not found` {
t.Fatalf("DeleteByHostID() error = %q, want not found error", err)
}
}
func TestHostsRepoDeleteByHostIDEmptyError(t *testing.T) {
store := openTestDB(t)
err := store.Hosts().DeleteByHostID(context.Background(), "")
if err == nil {
t.Fatal("DeleteByHostID('') error = nil, want error")
}
}

View File

@@ -114,6 +114,74 @@ func (r *ImportBatchesRepo) GetLatestByProviderID(ctx context.Context, providerI
return batch, nil
}
func (r *ImportBatchesRepo) GetLatestByProviderIDAndHostID(ctx context.Context, providerID, hostID int64) (ImportBatch, error) {
if providerID <= 0 {
return ImportBatch{}, fmt.Errorf("provider_id is required")
}
if hostID <= 0 {
return ImportBatch{}, fmt.Errorf("host_id is required")
}
var batch ImportBatch
if err := r.db.QueryRowContext(ctx, `SELECT id, host_id, pack_id, provider_id, mode, batch_status, access_status FROM import_batches WHERE provider_id = ? AND host_id = ? ORDER BY id DESC LIMIT 1`, providerID, hostID).Scan(&batch.ID, &batch.HostID, &batch.PackID, &batch.ProviderID, &batch.Mode, &batch.BatchStatus, &batch.AccessStatus); err != nil {
return ImportBatch{}, err
}
return batch, nil
}
func (r *ImportBatchesRepo) ListByProviderID(ctx context.Context, providerID int64) ([]ImportBatch, error) {
if providerID <= 0 {
return nil, fmt.Errorf("provider_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT id, host_id, pack_id, provider_id, mode, batch_status, access_status FROM import_batches WHERE provider_id = ? ORDER BY id DESC`, providerID)
if err != nil {
return nil, fmt.Errorf("query import batches by provider_id %d: %w", providerID, err)
}
defer rows.Close()
batches := make([]ImportBatch, 0)
for rows.Next() {
var batch ImportBatch
if err := rows.Scan(&batch.ID, &batch.HostID, &batch.PackID, &batch.ProviderID, &batch.Mode, &batch.BatchStatus, &batch.AccessStatus); err != nil {
return nil, fmt.Errorf("scan import batch by provider_id %d: %w", providerID, err)
}
batches = append(batches, batch)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate import batches by provider_id %d: %w", providerID, err)
}
return batches, nil
}
func (r *ImportBatchesRepo) ListByProviderIDAndHostID(ctx context.Context, providerID, hostID int64) ([]ImportBatch, error) {
if providerID <= 0 {
return nil, fmt.Errorf("provider_id is required")
}
if hostID <= 0 {
return nil, fmt.Errorf("host_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT id, host_id, pack_id, provider_id, mode, batch_status, access_status FROM import_batches WHERE provider_id = ? AND host_id = ? ORDER BY id DESC`, providerID, hostID)
if err != nil {
return nil, fmt.Errorf("query import batches by provider_id %d and host_id %d: %w", providerID, hostID, err)
}
defer rows.Close()
batches := make([]ImportBatch, 0)
for rows.Next() {
var batch ImportBatch
if err := rows.Scan(&batch.ID, &batch.HostID, &batch.PackID, &batch.ProviderID, &batch.Mode, &batch.BatchStatus, &batch.AccessStatus); err != nil {
return nil, fmt.Errorf("scan import batch by provider_id %d and host_id %d: %w", providerID, hostID, err)
}
batches = append(batches, batch)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate import batches by provider_id %d and host_id %d: %w", providerID, hostID, err)
}
return batches, nil
}
func (r *ImportBatchItemsRepo) GetByBatchID(ctx context.Context, batchID int64) ([]ImportBatchItem, error) {
if batchID <= 0 {
return nil, fmt.Errorf("batch_id is required")

View File

@@ -69,6 +69,39 @@ func TestImportBatchesRepoGetLatestByProviderID(t *testing.T) {
}
}
func TestImportBatchesRepoListByProviderID(t *testing.T) {
store := openTestDB(t)
hostID := createTestHost(t, store)
packID := createTestPack(t, store)
providerID := createTestProviderWithPack(t, store, packID)
olderID, err := store.ImportBatches().Create(context.Background(), ImportBatch{
HostID: hostID, PackID: packID, ProviderID: providerID,
Mode: "partial", BatchStatus: "running", AccessStatus: "pending",
})
if err != nil {
t.Fatalf("Create older batch error = %v", err)
}
newerID, err := store.ImportBatches().Create(context.Background(), ImportBatch{
HostID: hostID, PackID: packID, ProviderID: providerID,
Mode: "strict", BatchStatus: "succeeded", AccessStatus: "subscription_ready",
})
if err != nil {
t.Fatalf("Create newer batch error = %v", err)
}
batches, err := store.ImportBatches().ListByProviderID(context.Background(), providerID)
if err != nil {
t.Fatalf("ListByProviderID() error = %v", err)
}
if len(batches) != 2 {
t.Fatalf("ListByProviderID() len = %d, want 2", len(batches))
}
if batches[0].ID != newerID || batches[1].ID != olderID {
t.Fatalf("ListByProviderID() ids = [%d %d], want [%d %d]", batches[0].ID, batches[1].ID, newerID, olderID)
}
}
func TestImportBatchesRepoGetByIDNotFound(t *testing.T) {
store := openTestDB(t)
_, err := store.ImportBatches().GetByID(context.Background(), 999)
@@ -116,9 +149,9 @@ func TestImportBatchItemsRepoCreateAndGet(t *testing.T) {
batchID := createTestBatch(t, store)
id, err := store.ImportBatchItems().Create(context.Background(), ImportBatchItem{
BatchID: batchID,
KeyFingerprint: "sha256:abc",
AccountStatus: "passed",
BatchID: batchID,
KeyFingerprint: "sha256:abc",
AccountStatus: "passed",
ProbeSummaryJSON: `{"ok":true}`,
})
if err != nil {
@@ -190,9 +223,10 @@ func TestImportBatchItemsRepoValidation(t *testing.T) {
func TestManagedResourcesRepoCreateAndGet(t *testing.T) {
store := openTestDB(t)
batchID := createTestBatch(t, store)
batch, _ := store.ImportBatches().GetByID(context.Background(), batchID)
id, err := store.ManagedResources().Create(context.Background(), ManagedResource{
BatchID: batchID, ResourceType: "group", HostResourceID: "g_01", ResourceName: "test-group",
BatchID: batchID, HostID: batch.HostID, ResourceType: "group", HostResourceID: "g_01", ResourceName: "test-group",
})
if err != nil {
t.Fatalf("Create() error = %v", err)
@@ -207,11 +241,12 @@ func TestManagedResourcesRepoCreateAndGet(t *testing.T) {
func TestManagedResourcesRepoMultipleResources(t *testing.T) {
store := openTestDB(t)
batchID := createTestBatch(t, store)
batch, _ := store.ImportBatches().GetByID(context.Background(), batchID)
for _, r := range []ManagedResource{
{BatchID: batchID, ResourceType: "group", HostResourceID: "g_01", ResourceName: "group-1"},
{BatchID: batchID, ResourceType: "channel", HostResourceID: "c_01", ResourceName: "channel-1"},
{BatchID: batchID, ResourceType: "account", HostResourceID: "a_01", ResourceName: "account-1"},
{BatchID: batchID, HostID: batch.HostID, ResourceType: "group", HostResourceID: "g_01", ResourceName: "group-1"},
{BatchID: batchID, HostID: batch.HostID, ResourceType: "channel", HostResourceID: "c_01", ResourceName: "channel-1"},
{BatchID: batchID, HostID: batch.HostID, ResourceType: "account", HostResourceID: "a_01", ResourceName: "account-1"},
} {
store.ManagedResources().Create(context.Background(), r)
}
@@ -236,10 +271,11 @@ func TestManagedResourcesRepoValidationErrors(t *testing.T) {
name string
r ManagedResource
}{
{"batch_id zero", ManagedResource{ResourceType: "g", HostResourceID: "h", ResourceName: "n"}},
{"empty resource_type", ManagedResource{BatchID: 1, HostResourceID: "h", ResourceName: "n"}},
{"empty host_resource_id", ManagedResource{BatchID: 1, ResourceType: "g", ResourceName: "n"}},
{"empty resource_name", ManagedResource{BatchID: 1, ResourceType: "g", HostResourceID: "h"}},
{"batch_id zero", ManagedResource{HostID: 1, ResourceType: "g", HostResourceID: "h", ResourceName: "n"}},
{"host_id zero", ManagedResource{BatchID: 1, ResourceType: "g", HostResourceID: "h", ResourceName: "n"}},
{"empty resource_type", ManagedResource{BatchID: 1, HostID: 1, HostResourceID: "h", ResourceName: "n"}},
{"empty host_resource_id", ManagedResource{BatchID: 1, HostID: 1, ResourceType: "g", ResourceName: "n"}},
{"empty resource_name", ManagedResource{BatchID: 1, HostID: 1, ResourceType: "g", HostResourceID: "h"}},
} {
t.Run(tt.name, func(t *testing.T) {
_, err := store.ManagedResources().Create(context.Background(), tt.r)
@@ -383,15 +419,19 @@ func createTestProvider(t *testing.T, store *DB) int64 {
func TestReconcileRunsRepoCreateAndGet(t *testing.T) {
store := openTestDB(t)
providerID := createTestProvider(t, store)
batchID := createTestBatch(t, store)
batch, err := store.ImportBatches().GetByID(context.Background(), batchID)
if err != nil {
t.Fatalf("ImportBatches().GetByID() error = %v", err)
}
id, err := store.ReconcileRuns().Create(context.Background(), ReconcileRun{
ProviderID: providerID, Status: "active", SummaryJSON: `{"drifted":false}`,
BatchID: batchID, HostID: batch.HostID, ProviderID: batch.ProviderID, Status: "active", SummaryJSON: `{"drifted":false}`,
})
if err != nil {
t.Fatalf("Create() error = %v", err)
}
runs, _ := store.ReconcileRuns().GetByProviderID(context.Background(), providerID)
runs, _ := store.ReconcileRuns().GetByBatchID(context.Background(), batchID)
if len(runs) != 1 || runs[0].Status != "active" {
t.Fatalf("runs = %+v, want 1 active", runs)
}
@@ -400,19 +440,62 @@ func TestReconcileRunsRepoCreateAndGet(t *testing.T) {
func TestReconcileRunsRepoMultipleRunsOrderedDesc(t *testing.T) {
store := openTestDB(t)
providerID := createTestProvider(t, store)
batchID := createTestBatch(t, store)
batch, err := store.ImportBatches().GetByID(context.Background(), batchID)
if err != nil {
t.Fatalf("ImportBatches().GetByID() error = %v", err)
}
id1, _ := store.ReconcileRuns().Create(context.Background(), ReconcileRun{ProviderID: providerID, Status: "first", SummaryJSON: "{}"})
id2, _ := store.ReconcileRuns().Create(context.Background(), ReconcileRun{ProviderID: providerID, Status: "second", SummaryJSON: "{}"})
runs, _ := store.ReconcileRuns().GetByProviderID(context.Background(), providerID)
id1, _ := store.ReconcileRuns().Create(context.Background(), ReconcileRun{BatchID: batchID, HostID: batch.HostID, ProviderID: batch.ProviderID, Status: "first", SummaryJSON: "{}"})
id2, _ := store.ReconcileRuns().Create(context.Background(), ReconcileRun{BatchID: batchID, HostID: batch.HostID, ProviderID: batch.ProviderID, Status: "second", SummaryJSON: "{}"})
runs, _ := store.ReconcileRuns().GetByBatchID(context.Background(), batchID)
if len(runs) != 2 || runs[0].ID != id2 || runs[1].ID != id1 {
t.Fatalf("order: got %d, %d; want %d, %d (DESC)", runs[0].ID, runs[1].ID, id2, id1)
}
}
func TestReconcileRunsRepoGetByProviderIDEmpty(t *testing.T) {
func TestReconcileRunsRepoSeparatesHosts(t *testing.T) {
store := openTestDB(t)
runs, _ := store.ReconcileRuns().GetByProviderID(context.Background(), 999)
hostA := createTestHost(t, store)
hostB := createTestHostWithBaseURL(t, store, "host-b", "https://host-b.example.com")
packID := createTestPack(t, store)
providerID := createTestProviderWithPack(t, store, packID)
batchA, err := store.ImportBatches().Create(context.Background(), ImportBatch{HostID: hostA, PackID: packID, ProviderID: providerID, Mode: "partial", BatchStatus: "running", AccessStatus: "pending"})
if err != nil {
t.Fatalf("ImportBatches().Create(hostA) error = %v", err)
}
batchB, err := store.ImportBatches().Create(context.Background(), ImportBatch{HostID: hostB, PackID: packID, ProviderID: providerID, Mode: "partial", BatchStatus: "running", AccessStatus: "pending"})
if err != nil {
t.Fatalf("ImportBatches().Create(hostB) error = %v", err)
}
if _, err := store.ReconcileRuns().Create(context.Background(), ReconcileRun{BatchID: batchA, HostID: hostA, ProviderID: providerID, Status: "drifted", SummaryJSON: `{"host":"a"}`}); err != nil {
t.Fatalf("Create(hostA) error = %v", err)
}
if _, err := store.ReconcileRuns().Create(context.Background(), ReconcileRun{BatchID: batchB, HostID: hostB, ProviderID: providerID, Status: "active", SummaryJSON: `{"host":"b"}`}); err != nil {
t.Fatalf("Create(hostB) error = %v", err)
}
runs, _ := store.ReconcileRuns().GetByProviderIDAndHostID(context.Background(), providerID, hostA)
if len(runs) != 1 {
t.Fatalf("len(runs) = %d, want 1", len(runs))
}
if runs[0].Status != "drifted" {
t.Fatalf("runs[0].Status = %q, want drifted", runs[0].Status)
}
}
func TestReconcileRunsRepoGetByProviderIDAndHostIDEmpty(t *testing.T) {
store := openTestDB(t)
runs, _ := store.ReconcileRuns().GetByProviderIDAndHostID(context.Background(), 999, 1)
if len(runs) != 0 {
t.Fatalf("count = %d, want 0", len(runs))
}
}
func TestReconcileRunsRepoGetByBatchIDEmpty(t *testing.T) {
store := openTestDB(t)
runs, _ := store.ReconcileRuns().GetByBatchID(context.Background(), 999)
if len(runs) != 0 {
t.Fatalf("count = %d, want 0", len(runs))
}
@@ -420,10 +503,9 @@ func TestReconcileRunsRepoGetByProviderIDEmpty(t *testing.T) {
func TestReconcileRunsRepoValidation(t *testing.T) {
store := openTestDB(t)
_, err := store.ReconcileRuns().Create(context.Background(), ReconcileRun{ProviderID: 0, Status: "s"})
hostID := createTestHost(t, store)
_, err := store.ReconcileRuns().Create(context.Background(), ReconcileRun{BatchID: 1, HostID: hostID, ProviderID: 0, Status: "s"})
if err == nil {
t.Fatal("Create provider_id=0 error = nil")
}
}

View File

@@ -9,6 +9,7 @@ import (
type ManagedResource struct {
ID int64
BatchID int64
HostID int64
ResourceType string
HostResourceID string
ResourceName string
@@ -30,6 +31,8 @@ func (r *ManagedResourcesRepo) Create(ctx context.Context, resource ManagedResou
switch {
case resource.BatchID <= 0:
return 0, fmt.Errorf("batch_id is required")
case resource.HostID <= 0:
return 0, fmt.Errorf("host_id is required")
case resourceType == "":
return 0, fmt.Errorf("resource_type is required")
case hostResourceID == "":
@@ -38,7 +41,7 @@ func (r *ManagedResourcesRepo) Create(ctx context.Context, resource ManagedResou
return 0, fmt.Errorf("resource_name is required")
}
result, err := r.db.ExecContext(ctx, `INSERT INTO managed_resources (batch_id, resource_type, host_resource_id, resource_name) VALUES (?, ?, ?, ?)`, resource.BatchID, resourceType, hostResourceID, resourceName)
result, err := r.db.ExecContext(ctx, `INSERT INTO managed_resources (batch_id, host_id, resource_type, host_resource_id, resource_name) VALUES (?, ?, ?, ?, ?)`, resource.BatchID, resource.HostID, resourceType, hostResourceID, resourceName)
if err != nil {
return 0, fmt.Errorf("insert managed resource %q: %w", hostResourceID, err)
}
@@ -50,12 +53,32 @@ func (r *ManagedResourcesRepo) Create(ctx context.Context, resource ManagedResou
return id, nil
}
func (r *ManagedResourcesRepo) GetByResourceIdentity(ctx context.Context, hostID int64, resourceType, hostResourceID string) (ManagedResource, error) {
resourceType = strings.TrimSpace(resourceType)
hostResourceID = strings.TrimSpace(hostResourceID)
if hostID <= 0 {
return ManagedResource{}, fmt.Errorf("host_id is required")
}
if resourceType == "" {
return ManagedResource{}, fmt.Errorf("resource_type is required")
}
if hostResourceID == "" {
return ManagedResource{}, fmt.Errorf("host_resource_id is required")
}
var resource ManagedResource
if err := r.db.QueryRowContext(ctx, `SELECT id, batch_id, host_id, resource_type, host_resource_id, resource_name FROM managed_resources WHERE host_id = ? AND resource_type = ? AND host_resource_id = ?`, hostID, resourceType, hostResourceID).Scan(&resource.ID, &resource.BatchID, &resource.HostID, &resource.ResourceType, &resource.HostResourceID, &resource.ResourceName); err != nil {
return ManagedResource{}, err
}
return resource, nil
}
func (r *ManagedResourcesRepo) GetByBatchID(ctx context.Context, batchID int64) ([]ManagedResource, error) {
if batchID <= 0 {
return nil, fmt.Errorf("batch_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT id, batch_id, resource_type, host_resource_id, resource_name FROM managed_resources WHERE batch_id = ? ORDER BY id`, batchID)
rows, err := r.db.QueryContext(ctx, `SELECT id, batch_id, host_id, resource_type, host_resource_id, resource_name FROM managed_resources WHERE batch_id = ? ORDER BY id`, batchID)
if err != nil {
return nil, fmt.Errorf("query managed resources: %w", err)
}
@@ -64,7 +87,7 @@ func (r *ManagedResourcesRepo) GetByBatchID(ctx context.Context, batchID int64)
resources := make([]ManagedResource, 0)
for rows.Next() {
var resource ManagedResource
if err := rows.Scan(&resource.ID, &resource.BatchID, &resource.ResourceType, &resource.HostResourceID, &resource.ResourceName); err != nil {
if err := rows.Scan(&resource.ID, &resource.BatchID, &resource.HostID, &resource.ResourceType, &resource.HostResourceID, &resource.ResourceName); err != nil {
return nil, fmt.Errorf("scan managed resource: %w", err)
}
resources = append(resources, resource)
@@ -74,3 +97,64 @@ func (r *ManagedResourcesRepo) GetByBatchID(ctx context.Context, batchID int64)
}
return resources, nil
}
func (r *ManagedResourcesRepo) ListByProviderID(ctx context.Context, providerID int64) ([]ManagedResource, error) {
if providerID <= 0 {
return nil, fmt.Errorf("provider_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT mr.id, mr.batch_id, mr.host_id, mr.resource_type, mr.host_resource_id, mr.resource_name
FROM managed_resources mr
JOIN import_batches ib ON ib.id = mr.batch_id
WHERE ib.provider_id = ?
ORDER BY mr.id`, providerID)
if err != nil {
return nil, fmt.Errorf("query managed resources by provider_id %d: %w", providerID, err)
}
defer rows.Close()
resources := make([]ManagedResource, 0)
for rows.Next() {
var resource ManagedResource
if err := rows.Scan(&resource.ID, &resource.BatchID, &resource.HostID, &resource.ResourceType, &resource.HostResourceID, &resource.ResourceName); err != nil {
return nil, fmt.Errorf("scan managed resource by provider_id %d: %w", providerID, err)
}
resources = append(resources, resource)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate managed resources by provider_id %d: %w", providerID, err)
}
return resources, nil
}
func (r *ManagedResourcesRepo) ListByProviderIDAndHostID(ctx context.Context, providerID, hostID int64) ([]ManagedResource, error) {
if providerID <= 0 {
return nil, fmt.Errorf("provider_id is required")
}
if hostID <= 0 {
return nil, fmt.Errorf("host_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT mr.id, mr.batch_id, mr.host_id, mr.resource_type, mr.host_resource_id, mr.resource_name
FROM managed_resources mr
JOIN import_batches ib ON ib.id = mr.batch_id
WHERE ib.provider_id = ? AND mr.host_id = ?
ORDER BY mr.id`, providerID, hostID)
if err != nil {
return nil, fmt.Errorf("query managed resources by provider_id %d and host_id %d: %w", providerID, hostID, err)
}
defer rows.Close()
resources := make([]ManagedResource, 0)
for rows.Next() {
var resource ManagedResource
if err := rows.Scan(&resource.ID, &resource.BatchID, &resource.HostID, &resource.ResourceType, &resource.HostResourceID, &resource.ResourceName); err != nil {
return nil, fmt.Errorf("scan managed resource by provider_id %d and host_id %d: %w", providerID, hostID, err)
}
resources = append(resources, resource)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate managed resources by provider_id %d and host_id %d: %w", providerID, hostID, err)
}
return resources, nil
}

View File

@@ -113,6 +113,37 @@ func (r *PacksRepo) Create(ctx context.Context, pack Pack) (int64, error) {
return id, nil
}
func (r *PacksRepo) ListAll(ctx context.Context) ([]Pack, error) {
rows, err := r.db.QueryContext(ctx, `SELECT id, pack_id, version, checksum, vendor, target_host, min_host_version, max_host_version, manifest_json FROM packs ORDER BY id`)
if err != nil {
return nil, fmt.Errorf("list packs: %w", err)
}
defer rows.Close()
var packs []Pack
for rows.Next() {
var pack Pack
if err := rows.Scan(
&pack.ID,
&pack.PackID,
&pack.Version,
&pack.Checksum,
&pack.Vendor,
&pack.TargetHost,
&pack.MinHostVersion,
&pack.MaxHostVersion,
&pack.ManifestJSON,
); err != nil {
return nil, fmt.Errorf("scan pack: %w", err)
}
packs = append(packs, pack)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate packs: %w", err)
}
return packs, nil
}
func (r *PacksRepo) Upsert(ctx context.Context, pack Pack) (int64, error) {
packID := strings.TrimSpace(pack.PackID)
version := strings.TrimSpace(pack.Version)

View File

@@ -11,14 +11,14 @@ func TestPacksRepoCreateAndGet(t *testing.T) {
store := openTestDB(t)
id, err := store.Packs().Create(context.Background(), Pack{
PackID: "test-pack",
Version: "1.0.0",
Checksum: "abc123",
Vendor: "test-vendor",
TargetHost: "sub2api",
PackID: "test-pack",
Version: "1.0.0",
Checksum: "abc123",
Vendor: "test-vendor",
TargetHost: "sub2api",
MinHostVersion: "0.1.0",
MaxHostVersion: "0.2.x",
ManifestJSON: `{"name":"test"}`,
ManifestJSON: `{"name":"test"}`,
})
if err != nil {
t.Fatalf("Create() error = %v", err)

View File

@@ -31,6 +31,46 @@ func newProvidersRepo(db execQuerier) *ProvidersRepo {
return &ProvidersRepo{db: db}
}
func (r *ProvidersRepo) ListByPackID(ctx context.Context, packID int64) ([]Provider, error) {
if packID <= 0 {
return nil, fmt.Errorf("pack_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT id, pack_id, provider_id, display_name, base_url, platform, account_type, default_models_json, smoke_test_model, group_template_json, channel_template_json, plan_template_json, import_options_json, manifest_json FROM providers WHERE pack_id = ? ORDER BY id`, packID)
if err != nil {
return nil, fmt.Errorf("query providers by pack_id %d: %w", packID, err)
}
defer rows.Close()
providers := make([]Provider, 0)
for rows.Next() {
var provider Provider
if err := rows.Scan(
&provider.ID,
&provider.PackID,
&provider.ProviderID,
&provider.DisplayName,
&provider.BaseURL,
&provider.Platform,
&provider.AccountType,
&provider.DefaultModelsJSON,
&provider.SmokeTestModel,
&provider.GroupTemplateJSON,
&provider.ChannelTemplateJSON,
&provider.PlanTemplateJSON,
&provider.ImportOptionsJSON,
&provider.ManifestJSON,
); err != nil {
return nil, fmt.Errorf("scan provider: %w", err)
}
providers = append(providers, provider)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate providers: %w", err)
}
return providers, nil
}
func (r *ProvidersRepo) ListByProviderID(ctx context.Context, providerID string) ([]Provider, error) {
providerID = strings.TrimSpace(providerID)
if providerID == "" {
@@ -103,6 +143,33 @@ func (r *ProvidersRepo) GetByPackIDAndProviderID(ctx context.Context, packID int
return provider, nil
}
func (r *ProvidersRepo) GetByID(ctx context.Context, id int64) (Provider, error) {
if id <= 0 {
return Provider{}, fmt.Errorf("id is required")
}
var provider Provider
if err := r.db.QueryRowContext(ctx, `SELECT id, pack_id, provider_id, display_name, base_url, platform, account_type, default_models_json, smoke_test_model, group_template_json, channel_template_json, plan_template_json, import_options_json, manifest_json FROM providers WHERE id = ?`, id).Scan(
&provider.ID,
&provider.PackID,
&provider.ProviderID,
&provider.DisplayName,
&provider.BaseURL,
&provider.Platform,
&provider.AccountType,
&provider.DefaultModelsJSON,
&provider.SmokeTestModel,
&provider.GroupTemplateJSON,
&provider.ChannelTemplateJSON,
&provider.PlanTemplateJSON,
&provider.ImportOptionsJSON,
&provider.ManifestJSON,
); err != nil {
return Provider{}, err
}
return provider, nil
}
func (r *ProvidersRepo) Create(ctx context.Context, provider Provider) (int64, error) {
providerID := strings.TrimSpace(provider.ProviderID)
displayName := strings.TrimSpace(provider.DisplayName)

View File

@@ -12,14 +12,14 @@ func TestProvidersRepoCreateAndGet(t *testing.T) {
packID := createTestPack(t, store)
providerID, err := store.Providers().Create(context.Background(), Provider{
PackID: packID,
ProviderID: "deepseek",
DisplayName: "DeepSeek",
BaseURL: "https://api.deepseek.com",
Platform: "openai",
AccountType: "api",
PackID: packID,
ProviderID: "deepseek",
DisplayName: "DeepSeek",
BaseURL: "https://api.deepseek.com",
Platform: "openai",
AccountType: "apikey",
SmokeTestModel: "deepseek-chat",
ManifestJSON: `{"models":["deepseek-chat"]}`,
ManifestJSON: `{"models":["deepseek-chat"]}`,
})
if err != nil {
t.Fatalf("Create() error = %v", err)

View File

@@ -8,6 +8,8 @@ import (
type ReconcileRun struct {
ID int64
BatchID int64
HostID int64
ProviderID int64
Status string
SummaryJSON string
@@ -29,13 +31,17 @@ func (r *ReconcileRunsRepo) Create(ctx context.Context, run ReconcileRun) (int64
}
switch {
case run.BatchID <= 0:
return 0, fmt.Errorf("batch_id is required")
case run.HostID <= 0:
return 0, fmt.Errorf("host_id is required")
case run.ProviderID <= 0:
return 0, fmt.Errorf("provider_id is required")
case status == "":
return 0, fmt.Errorf("status is required")
}
result, err := r.db.ExecContext(ctx, `INSERT INTO reconcile_runs (provider_id, status, summary_json) VALUES (?, ?, ?)`, run.ProviderID, status, summaryJSON)
result, err := r.db.ExecContext(ctx, `INSERT INTO reconcile_runs (batch_id, host_id, provider_id, status, summary_json) VALUES (?, ?, ?, ?, ?)`, run.BatchID, run.HostID, run.ProviderID, status, summaryJSON)
if err != nil {
return 0, fmt.Errorf("insert reconcile run: %w", err)
}
@@ -47,12 +53,40 @@ func (r *ReconcileRunsRepo) Create(ctx context.Context, run ReconcileRun) (int64
return id, nil
}
func (r *ReconcileRunsRepo) GetByProviderID(ctx context.Context, providerID int64) ([]ReconcileRun, error) {
func (r *ReconcileRunsRepo) GetByBatchID(ctx context.Context, batchID int64) ([]ReconcileRun, error) {
if batchID <= 0 {
return nil, fmt.Errorf("batch_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT id, batch_id, host_id, provider_id, status, summary_json FROM reconcile_runs WHERE batch_id = ? ORDER BY id DESC`, batchID)
if err != nil {
return nil, fmt.Errorf("query reconcile runs by batch_id: %w", err)
}
defer rows.Close()
runs := make([]ReconcileRun, 0)
for rows.Next() {
var run ReconcileRun
if err := rows.Scan(&run.ID, &run.BatchID, &run.HostID, &run.ProviderID, &run.Status, &run.SummaryJSON); err != nil {
return nil, fmt.Errorf("scan reconcile run by batch_id: %w", err)
}
runs = append(runs, run)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate reconcile runs by batch_id: %w", err)
}
return runs, nil
}
func (r *ReconcileRunsRepo) GetByProviderIDAndHostID(ctx context.Context, providerID, hostID int64) ([]ReconcileRun, error) {
if providerID <= 0 {
return nil, fmt.Errorf("provider_id is required")
}
if hostID <= 0 {
return nil, fmt.Errorf("host_id is required")
}
rows, err := r.db.QueryContext(ctx, `SELECT id, provider_id, status, summary_json FROM reconcile_runs WHERE provider_id = ? ORDER BY id DESC`, providerID)
rows, err := r.db.QueryContext(ctx, `SELECT id, batch_id, host_id, provider_id, status, summary_json FROM reconcile_runs WHERE provider_id = ? AND host_id = ? ORDER BY id DESC`, providerID, hostID)
if err != nil {
return nil, fmt.Errorf("query reconcile runs: %w", err)
}
@@ -61,7 +95,7 @@ func (r *ReconcileRunsRepo) GetByProviderID(ctx context.Context, providerID int6
runs := make([]ReconcileRun, 0)
for rows.Next() {
var run ReconcileRun
if err := rows.Scan(&run.ID, &run.ProviderID, &run.Status, &run.SummaryJSON); err != nil {
if err := rows.Scan(&run.ID, &run.BatchID, &run.HostID, &run.ProviderID, &run.Status, &run.SummaryJSON); err != nil {
return nil, fmt.Errorf("scan reconcile run: %w", err)
}
runs = append(runs, run)