Files
llm-intelligence/scripts/vertex_pricing_signature_guard_test.go

237 lines
7.6 KiB
Go
Raw Normal View History

//go:build llm_script
package main
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
)
func TestRunVertexPricingSignatureGuardInitializesBaseline(t *testing.T) {
tempDir := t.TempDir()
baselinePath := filepath.Join(tempDir, "baseline.signature.json")
result, err := runVertexPricingSignatureGuard(vertexPricingSignatureGuardConfig{
URL: defaultVertexPricingURL,
Fixture: filepath.Join("testdata", "vertex_pricing_sample.html"),
SnapshotDir: tempDir,
BaselinePath: baselinePath,
Timeout: time.Second,
AllowBootstrap: true,
}, time.Date(2026, 5, 15, 19, 40, 0, 0, time.FixedZone("CST", 8*3600)))
if err != nil {
t.Fatalf("runVertexPricingSignatureGuard 返回错误: %v", err)
}
if !result.BaselineInitialized {
t.Fatalf("期望初始化 baseline")
}
if result.DriftDetected {
t.Fatalf("首次初始化不应判定为漂移")
}
if _, err := os.Stat(baselinePath); err != nil {
t.Fatalf("baseline 未写入: %v", err)
}
if _, err := os.Stat(result.SnapshotPath); err != nil {
t.Fatalf("snapshot 未写入: %v", err)
}
}
func TestRunVertexPricingSignatureGuardDetectsDrift(t *testing.T) {
tempDir := t.TempDir()
baselinePath := filepath.Join(tempDir, "baseline.signature.json")
initialResult, err := runVertexPricingSignatureGuard(vertexPricingSignatureGuardConfig{
URL: defaultVertexPricingURL,
Fixture: filepath.Join("testdata", "vertex_pricing_sample.html"),
SnapshotDir: tempDir,
BaselinePath: baselinePath,
Timeout: time.Second,
AllowBootstrap: true,
}, time.Date(2026, 5, 15, 19, 41, 0, 0, time.FixedZone("CST", 8*3600)))
if err != nil {
t.Fatalf("初始化 baseline 失败: %v", err)
}
driftFixture := `<html><body><h2>Google 模型</h2><h3>标准</h3><section><div>新结构</div></section></body></html>`
driftPath := filepath.Join(tempDir, "vertex-drift.html")
if err := os.WriteFile(driftPath, []byte(driftFixture), 0o644); err != nil {
t.Fatalf("写入 drift fixture 失败: %v", err)
}
result, err := runVertexPricingSignatureGuard(vertexPricingSignatureGuardConfig{
URL: defaultVertexPricingURL,
Fixture: driftPath,
SnapshotDir: tempDir,
BaselinePath: baselinePath,
Timeout: time.Second,
AllowBootstrap: false,
}, time.Date(2026, 5, 15, 19, 42, 0, 0, time.FixedZone("CST", 8*3600)))
if err == nil {
t.Fatalf("期望结构漂移时报错")
}
if !result.DriftDetected {
t.Fatalf("期望 driftDetected=true")
}
if result.CurrentSignature.StructureSHA256 == initialResult.CurrentSignature.StructureSHA256 {
t.Fatalf("期望结构签名发生变化")
}
if !strings.Contains(err.Error(), "vertex pricing structure drift detected") {
t.Fatalf("期望返回 drift 错误,实际: %v", err)
}
}
func TestFormatVertexPricingSignatureGuardSummary(t *testing.T) {
result := vertexPricingSignatureGuardResult{
SnapshotPath: "/tmp/vertex.html",
SignaturePath: "/tmp/vertex.signature.json",
BaselinePath: "/tmp/baseline.signature.json",
DriftDetected: false,
BaselineInitialized: true,
CurrentSignature: vertexPricingStructureSignature{StructureSHA256: "abc123", ByteSize: 99},
PreviousBaselineHash: "",
}
summary := formatVertexPricingSignatureGuardSummary(result)
for _, want := range []string{
"source=vertex-pricing-signature-guard",
"drift=false",
"baseline_initialized=true",
"structure_sha256=abc123",
"snapshot_out=/tmp/vertex.html",
} {
if !strings.Contains(summary, want) {
t.Fatalf("summary 缺少 %q实际: %q", want, summary)
}
}
}
// TestInsertOfficialImportSignatureAuditPersistsStructuredRecord feeds a fully
// populated audit record to insertOfficialImportSignatureAudit through a
// recording *sql.DB and inspects the single captured statement: the SQL must
// target official_import_signature_audit and selected bind arguments must
// carry the record's values.
//
// The positional indexes checked below (0, 2, 3, 10, 11, 13, 14) presumably
// mirror the bind order used by insertOfficialImportSignatureAudit; verify
// against that function if its INSERT statement changes.
func TestInsertOfficialImportSignatureAuditPersistsStructuredRecord(t *testing.T) {
	db, calls := openVertexSignatureAuditRecordingDB(t)
	checkedAt := time.Date(2026, 5, 15, 20, 15, 0, 0, time.FixedZone("CST", 8*3600))
	record := officialImportSignatureAuditRecord{
		SourceKey:               "vertex_pricing_signature",
		CheckedAt:               checkedAt,
		Status:                  "drift_detected",
		DriftDetected:           true,
		BaselineInitialized:     false,
		SourceURL:               defaultVertexPricingURL,
		SnapshotPath:            "/tmp/vertex.html",
		SignaturePath:           "/tmp/vertex.signature.json",
		BaselinePath:            "/tmp/baseline.signature.json",
		StructureSHA256:         "current-sha",
		PreviousStructureSHA256: "baseline-sha",
		ByteSize:                813810,
		SignaturePayload: &vertexPricingStructureSignature{
			ByteSize:        813810,
			StructureSHA256: "current-sha",
			Headings:        []string{"Gemini 2.5 Pro", "标准"},
			TagCounts:       map[string]int{"table": 1, "h2": 2},
			ContainsGemini:  true,
			ContainsTable:   true,
		},
		ErrorMessage: "vertex pricing structure drift detected",
	}
	if err := insertOfficialImportSignatureAudit(db, record); err != nil {
		t.Fatalf("insertOfficialImportSignatureAudit 返回错误: %v", err)
	}

	// Snapshot the recorded calls under the recorder's mutex, matching how the
	// recording connection appends to the slice.
	calls.mu.Lock()
	recorded := append([]vertexSignatureAuditExecCall(nil), calls.calls...)
	calls.mu.Unlock()

	if len(recorded) != 1 {
		t.Fatalf("期望 1 次写库,实际 %d", len(recorded))
	}
	call := recorded[0]
	if !strings.Contains(call.query, "INSERT INTO official_import_signature_audit") {
		t.Fatalf("期望写入 official_import_signature_audit实际 SQL: %s", call.query)
	}

	// The highest index inspected below is 14; fail with a readable message
	// instead of an index-out-of-range panic when fewer arguments were bound.
	const minBindArgs = 15
	if len(call.args) < minBindArgs {
		t.Fatalf("期望至少 %d 个 SQL 参数,实际 %d", minBindArgs, len(call.args))
	}

	if got := call.args[0]; got != "vertex_pricing_signature" {
		t.Fatalf("source_key 不匹配,实际 %#v", got)
	}
	if got := call.args[2]; got != "drift_detected" {
		t.Fatalf("status 不匹配,实际 %#v", got)
	}
	if got := call.args[3]; got != true {
		t.Fatalf("drift_detected 不匹配,实际 %#v", got)
	}
	if got := call.args[10]; got != "current-sha" {
		t.Fatalf("structure_sha256 不匹配,实际 %#v", got)
	}
	if got := call.args[11]; got != "baseline-sha" {
		t.Fatalf("previous_structure_sha256 不匹配,实际 %#v", got)
	}
	if got := call.args[13]; !strings.Contains(fmt.Sprint(got), `"structure_sha256":"current-sha"`) {
		t.Fatalf("signature_payload 未写入结构化 JSON实际 %#v", got)
	}
	if got := call.args[14]; got != "vertex pricing structure drift detected" {
		t.Fatalf("error_message 不匹配,实际 %#v", got)
	}
}
// Test doubles for capturing statements executed through database/sql.
type (
	// vertexSignatureAuditExecCall is one captured statement: the SQL text and
	// the values of its bind arguments.
	vertexSignatureAuditExecCall struct {
		query string
		args  []any
	}

	// vertexSignatureAuditExecRecorder accumulates captured statements; mu
	// guards calls.
	vertexSignatureAuditExecRecorder struct {
		mu    sync.Mutex
		calls []vertexSignatureAuditExecCall
	}

	// vertexSignatureAuditDriver is the driver-side handle that carries the
	// recorder shared with the connections it opens.
	vertexSignatureAuditDriver struct {
		recorder *vertexSignatureAuditExecRecorder
	}

	// vertexSignatureAuditConn is a connection-side handle pointing at the
	// recorder that receives executed statements.
	vertexSignatureAuditConn struct {
		recorder *vertexSignatureAuditExecRecorder
	}
)
func openVertexSignatureAuditRecordingDB(t *testing.T) (*sql.DB, *vertexSignatureAuditExecRecorder) {
t.Helper()
name := fmt.Sprintf("vertex-signature-audit-%d", time.Now().UnixNano())
recorder := &vertexSignatureAuditExecRecorder{}
sql.Register(name, vertexSignatureAuditDriver{recorder: recorder})
db, err := sql.Open(name, "")
if err != nil {
t.Fatalf("open recording db: %v", err)
}
t.Cleanup(func() {
_ = db.Close()
})
return db, recorder
}
// Open implements driver.Driver. The data source name is ignored; the returned
// connection shares this driver's recorder.
func (d vertexSignatureAuditDriver) Open(_ string) (driver.Conn, error) {
	conn := vertexSignatureAuditConn{recorder: d.recorder}
	return conn, nil
}
// Prepare implements driver.Conn. This stub has no prepared-statement support
// and always returns a nil statement with a "not implemented" error.
func (vertexSignatureAuditConn) Prepare(_ string) (driver.Stmt, error) {
	var noStmt driver.Stmt
	return noStmt, fmt.Errorf("not implemented")
}
// Close implements driver.Conn. The stub holds no resources, so closing always
// succeeds.
func (vertexSignatureAuditConn) Close() error {
	var closeErr error
	return closeErr
}
// Begin implements driver.Conn. Transactions are not supported by this stub;
// it always returns a nil transaction with a "not implemented" error.
func (vertexSignatureAuditConn) Begin() (driver.Tx, error) {
	var noTx driver.Tx
	return noTx, fmt.Errorf("not implemented")
}
// ExecContext records the statement instead of executing it: the query text
// and each bind argument's Value are appended to the shared recorder under its
// mutex. It always reports one affected row and no error. The context is
// ignored.
func (c vertexSignatureAuditConn) ExecContext(_ context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	captured := vertexSignatureAuditExecCall{
		query: query,
		args:  make([]any, len(args)),
	}
	for i := range args {
		captured.args[i] = args[i].Value
	}

	rec := c.recorder
	rec.mu.Lock()
	defer rec.mu.Unlock()
	rec.calls = append(rec.calls, captured)

	return driver.RowsAffected(1), nil
}