Files
llm-intelligence/scripts/vertex_pricing_signature_guard_test.go
phamnazage-jpg 256975e10c feat(audit): add pricing signature guards and reporting
Add snapshot, signature, and drift guard support for Vertex AI, Cloudflare Workers AI, and Perplexity API, backed by a queryable audit table and recent-window view.

This commit also wires the audit query layer into daily signal materialization and report generation so structure drift becomes a first-class signal instead of a log-only artifact.
2026-05-15 22:34:22 +08:00

237 lines
7.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//go:build llm_script
package main
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
)
func TestRunVertexPricingSignatureGuardInitializesBaseline(t *testing.T) {
tempDir := t.TempDir()
baselinePath := filepath.Join(tempDir, "baseline.signature.json")
result, err := runVertexPricingSignatureGuard(vertexPricingSignatureGuardConfig{
URL: defaultVertexPricingURL,
Fixture: filepath.Join("testdata", "vertex_pricing_sample.html"),
SnapshotDir: tempDir,
BaselinePath: baselinePath,
Timeout: time.Second,
AllowBootstrap: true,
}, time.Date(2026, 5, 15, 19, 40, 0, 0, time.FixedZone("CST", 8*3600)))
if err != nil {
t.Fatalf("runVertexPricingSignatureGuard 返回错误: %v", err)
}
if !result.BaselineInitialized {
t.Fatalf("期望初始化 baseline")
}
if result.DriftDetected {
t.Fatalf("首次初始化不应判定为漂移")
}
if _, err := os.Stat(baselinePath); err != nil {
t.Fatalf("baseline 未写入: %v", err)
}
if _, err := os.Stat(result.SnapshotPath); err != nil {
t.Fatalf("snapshot 未写入: %v", err)
}
}
func TestRunVertexPricingSignatureGuardDetectsDrift(t *testing.T) {
tempDir := t.TempDir()
baselinePath := filepath.Join(tempDir, "baseline.signature.json")
initialResult, err := runVertexPricingSignatureGuard(vertexPricingSignatureGuardConfig{
URL: defaultVertexPricingURL,
Fixture: filepath.Join("testdata", "vertex_pricing_sample.html"),
SnapshotDir: tempDir,
BaselinePath: baselinePath,
Timeout: time.Second,
AllowBootstrap: true,
}, time.Date(2026, 5, 15, 19, 41, 0, 0, time.FixedZone("CST", 8*3600)))
if err != nil {
t.Fatalf("初始化 baseline 失败: %v", err)
}
driftFixture := `<html><body><h2>Google 模型</h2><h3>标准</h3><section><div>新结构</div></section></body></html>`
driftPath := filepath.Join(tempDir, "vertex-drift.html")
if err := os.WriteFile(driftPath, []byte(driftFixture), 0o644); err != nil {
t.Fatalf("写入 drift fixture 失败: %v", err)
}
result, err := runVertexPricingSignatureGuard(vertexPricingSignatureGuardConfig{
URL: defaultVertexPricingURL,
Fixture: driftPath,
SnapshotDir: tempDir,
BaselinePath: baselinePath,
Timeout: time.Second,
AllowBootstrap: false,
}, time.Date(2026, 5, 15, 19, 42, 0, 0, time.FixedZone("CST", 8*3600)))
if err == nil {
t.Fatalf("期望结构漂移时报错")
}
if !result.DriftDetected {
t.Fatalf("期望 driftDetected=true")
}
if result.CurrentSignature.StructureSHA256 == initialResult.CurrentSignature.StructureSHA256 {
t.Fatalf("期望结构签名发生变化")
}
if !strings.Contains(err.Error(), "vertex pricing structure drift detected") {
t.Fatalf("期望返回 drift 错误,实际: %v", err)
}
}
// TestFormatVertexPricingSignatureGuardSummary formats a hand-built result and
// checks that the summary text contains each expected key=value fragment.
//
// Every missing fragment is reported (t.Errorf rather than t.Fatalf) so a
// single run shows the full set of regressions instead of only the first one.
func TestFormatVertexPricingSignatureGuardSummary(t *testing.T) {
	result := vertexPricingSignatureGuardResult{
		SnapshotPath:         "/tmp/vertex.html",
		SignaturePath:        "/tmp/vertex.signature.json",
		BaselinePath:         "/tmp/baseline.signature.json",
		DriftDetected:        false,
		BaselineInitialized:  true,
		CurrentSignature:     vertexPricingStructureSignature{StructureSHA256: "abc123", ByteSize: 99},
		PreviousBaselineHash: "",
	}

	summary := formatVertexPricingSignatureGuardSummary(result)

	wantFragments := []string{
		"source=vertex-pricing-signature-guard",
		"drift=false",
		"baseline_initialized=true",
		"structure_sha256=abc123",
		"snapshot_out=/tmp/vertex.html",
	}
	for _, want := range wantFragments {
		if !strings.Contains(summary, want) {
			// The separator between the two quoted values keeps the
			// failure output readable.
			t.Errorf("summary 缺少 %q,实际: %q", want, summary)
		}
	}
}
// TestInsertOfficialImportSignatureAuditPersistsStructuredRecord inserts one
// audit record through a recording *sql.DB and inspects the single captured
// Exec call: the SQL must target official_import_signature_audit and selected
// bind arguments must carry the record's values, including the signature
// payload serialized as JSON.
//
// The argument indexes below are positional bind parameters.
// NOTE(review): they assume the column order used by
// insertOfficialImportSignatureAudit, which is not visible in this file —
// keep them in sync with that statement.
func TestInsertOfficialImportSignatureAuditPersistsStructuredRecord(t *testing.T) {
	db, recorder := openVertexSignatureAuditRecordingDB(t)
	checkedAt := time.Date(2026, 5, 15, 20, 15, 0, 0, time.FixedZone("CST", 8*3600))
	record := officialImportSignatureAuditRecord{
		SourceKey:               "vertex_pricing_signature",
		CheckedAt:               checkedAt,
		Status:                  "drift_detected",
		DriftDetected:           true,
		BaselineInitialized:     false,
		SourceURL:               defaultVertexPricingURL,
		SnapshotPath:            "/tmp/vertex.html",
		SignaturePath:           "/tmp/vertex.signature.json",
		BaselinePath:            "/tmp/baseline.signature.json",
		StructureSHA256:         "current-sha",
		PreviousStructureSHA256: "baseline-sha",
		ByteSize:                813810,
		SignaturePayload: &vertexPricingStructureSignature{
			ByteSize:        813810,
			StructureSHA256: "current-sha",
			Headings:        []string{"Gemini 2.5 Pro", "标准"},
			TagCounts:       map[string]int{"table": 1, "h2": 2},
			ContainsGemini:  true,
			ContainsTable:   true,
		},
		ErrorMessage: "vertex pricing structure drift detected",
	}

	if err := insertOfficialImportSignatureAudit(db, record); err != nil {
		t.Fatalf("insertOfficialImportSignatureAudit 返回错误: %v", err)
	}

	if len(recorder.calls) != 1 {
		t.Fatalf("期望 1 次写库,实际 %d", len(recorder.calls))
	}
	call := recorder.calls[0]
	if !strings.Contains(call.query, "INSERT INTO official_import_signature_audit") {
		t.Fatalf("期望写入 official_import_signature_audit,实际 SQL: %s", call.query)
	}

	// Guard the positional lookups: without this, a statement with fewer
	// bind arguments would panic with an index-out-of-range instead of
	// producing a readable test failure. The highest index read is 14.
	const minArgs = 15
	if len(call.args) < minArgs {
		t.Fatalf("期望至少 %d 个写库参数,实际 %d", minArgs, len(call.args))
	}

	if got := call.args[0]; got != "vertex_pricing_signature" {
		t.Fatalf("source_key 不匹配,实际 %#v", got)
	}
	if got := call.args[2]; got != "drift_detected" {
		t.Fatalf("status 不匹配,实际 %#v", got)
	}
	if got := call.args[3]; got != true {
		t.Fatalf("drift_detected 不匹配,实际 %#v", got)
	}
	if got := call.args[10]; got != "current-sha" {
		t.Fatalf("structure_sha256 不匹配,实际 %#v", got)
	}
	if got := call.args[11]; got != "baseline-sha" {
		t.Fatalf("previous_structure_sha256 不匹配,实际 %#v", got)
	}
	// fmt.Sprint renders the payload as text regardless of the concrete
	// type the insert helper binds it as.
	if got := call.args[13]; !strings.Contains(fmt.Sprint(got), `"structure_sha256":"current-sha"`) {
		t.Fatalf("signature_payload 未写入结构化 JSON,实际 %#v", got)
	}
	if got := call.args[14]; got != "vertex pricing structure drift detected" {
		t.Fatalf("error_message 不匹配,实际 %#v", got)
	}
}
// Test doubles that let a *sql.DB record Exec calls instead of talking to a
// real database.
type (
	// vertexSignatureAuditExecCall is one captured Exec: the SQL text and the
	// bind values in the order they were passed.
	vertexSignatureAuditExecCall struct {
		query string
		args  []any
	}

	// vertexSignatureAuditExecRecorder accumulates captured Exec calls.
	// mu guards calls; the struct must be shared by pointer because it
	// embeds a mutex.
	vertexSignatureAuditExecRecorder struct {
		mu    sync.Mutex
		calls []vertexSignatureAuditExecCall
	}

	// vertexSignatureAuditDriver hands out connections that all report to
	// the same recorder.
	vertexSignatureAuditDriver struct {
		recorder *vertexSignatureAuditExecRecorder
	}

	// vertexSignatureAuditConn is a connection whose Exec calls are appended
	// to recorder.
	vertexSignatureAuditConn struct {
		recorder *vertexSignatureAuditExecRecorder
	}
)
func openVertexSignatureAuditRecordingDB(t *testing.T) (*sql.DB, *vertexSignatureAuditExecRecorder) {
t.Helper()
name := fmt.Sprintf("vertex-signature-audit-%d", time.Now().UnixNano())
recorder := &vertexSignatureAuditExecRecorder{}
sql.Register(name, vertexSignatureAuditDriver{recorder: recorder})
db, err := sql.Open(name, "")
if err != nil {
t.Fatalf("open recording db: %v", err)
}
t.Cleanup(func() {
_ = db.Close()
})
return db, recorder
}
// Open implements driver.Driver. The data source name is ignored; every
// connection it returns reports to the driver's recorder.
func (d vertexSignatureAuditDriver) Open(_ string) (driver.Conn, error) {
	conn := vertexSignatureAuditConn{recorder: d.recorder}
	return conn, nil
}
// Prepare is part of driver.Conn. Prepared statements are not supported by
// this recording connection, so it always returns an error and no statement.
func (c vertexSignatureAuditConn) Prepare(_ string) (driver.Stmt, error) {
	var noStmt driver.Stmt
	return noStmt, fmt.Errorf("not implemented")
}
// Close is part of driver.Conn. The connection holds no resources of its own,
// so closing always succeeds.
func (vertexSignatureAuditConn) Close() error { return nil }
// Begin is part of driver.Conn. Transactions are not supported by this
// recording connection, so it always returns an error and no transaction.
func (c vertexSignatureAuditConn) Begin() (driver.Tx, error) {
	var noTx driver.Tx
	return noTx, fmt.Errorf("not implemented")
}
// ExecContext records the statement instead of executing it. The query text
// and the Value of each named argument (in order) are appended to the shared
// recorder under its mutex. The context is ignored. It always reports one
// affected row and never returns an error.
func (c vertexSignatureAuditConn) ExecContext(_ context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	captured := vertexSignatureAuditExecCall{
		query: query,
		args:  make([]any, len(args)),
	}
	for i := range args {
		captured.args[i] = args[i].Value
	}

	rec := c.recorder
	rec.mu.Lock()
	defer rec.mu.Unlock()
	rec.calls = append(rec.calls, captured)

	return driver.RowsAffected(1), nil
}