chore(supply-api): add runtime schema sql assets

Add the outbox, partitioning, and token-status DDL files alongside the partition strategy regression test. These files map directly to already committed repository and middleware paths, and were verified with fresh repository, outbox, and middleware test runs before commit.
This commit is contained in:
Your Name
2026-04-11 10:29:15 +08:00
parent 193372ca95
commit b0ca154e08
4 changed files with 509 additions and 0 deletions

View File

@@ -0,0 +1,93 @@
-- Outbox Pattern Schema v1.0
-- 用于跨系统事件发布的可靠消息传递
-- Outbox状态枚举
DO $$ BEGIN
CREATE TYPE outbox_status AS ENUM ('pending', 'processing', 'completed', 'failed', 'dead_letter');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
-- Outbox事件表
CREATE TABLE IF NOT EXISTS supply_outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_id VARCHAR(100) NOT NULL UNIQUE,
payload JSONB NOT NULL,
status outbox_status NOT NULL DEFAULT 'pending',
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 5,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMPTZ,
next_retry_at TIMESTAMPTZ,
dead_letter_reason TEXT,
version BIGINT NOT NULL DEFAULT 0,
-- 约束
CONSTRAINT valid_outbox_status CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'dead_letter')),
CONSTRAINT valid_retry_count CHECK (retry_count >= 0 AND retry_count <= max_retries)
);
-- 死信队列表
CREATE TABLE IF NOT EXISTS supply_outbox_dead_letter (
id BIGSERIAL PRIMARY KEY,
original_event_id VARCHAR(100) NOT NULL,
original_aggregate_type VARCHAR(100) NOT NULL,
original_aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
error_message TEXT,
retry_count INT NOT NULL DEFAULT 0,
first_failed_at TIMESTAMPTZ NOT NULL,
dead_letter_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
handled BOOLEAN NOT NULL DEFAULT FALSE,
handled_at TIMESTAMPTZ,
handler_notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 索引
CREATE INDEX IF NOT EXISTS idx_outbox_status ON supply_outbox(status);
CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry ON supply_outbox(status, next_retry_at) WHERE status = 'failed';
CREATE INDEX IF NOT EXISTS idx_outbox_aggregate ON supply_outbox(aggregate_type, aggregate_id);
CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON supply_outbox(created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_event_id ON supply_outbox(event_id);
CREATE INDEX IF NOT EXISTS idx_dead_letter_original_event_id ON supply_outbox_dead_letter(original_event_id);
CREATE INDEX IF NOT EXISTS idx_dead_letter_handled ON supply_outbox_dead_letter(handled) WHERE handled = FALSE;
CREATE INDEX IF NOT EXISTS idx_dead_letter_created_at ON supply_outbox_dead_letter(created_at);
-- 注释
COMMENT ON TABLE supply_outbox IS 'Outbox事件表用于可靠的事件发布';
COMMENT ON COLUMN supply_outbox.aggregate_type IS '聚合类型,如 account, package, settlement';
COMMENT ON COLUMN supply_outbox.aggregate_id IS '聚合ID';
COMMENT ON COLUMN supply_outbox.event_type IS '事件类型,如 account.created, package.updated';
COMMENT ON COLUMN supply_outbox.event_id IS '事件唯一IDUUID';
COMMENT ON COLUMN supply_outbox.payload IS '事件负载JSON格式';
COMMENT ON COLUMN supply_outbox.status IS '处理状态pending/processing/completed/failed/dead_letter';
COMMENT ON COLUMN supply_outbox.retry_count IS '已重试次数';
COMMENT ON COLUMN supply_outbox.max_retries IS '最大重试次数';
COMMENT ON COLUMN supply_outbox.version IS '乐观锁版本号';
COMMENT ON TABLE supply_outbox_dead_letter IS 'Outbox死信队列用于存储无法处理的事件';
COMMENT ON COLUMN supply_outbox_dead_letter.original_event_id IS '原始事件ID';
COMMENT ON COLUMN supply_outbox_dead_letter.first_failed_at IS '首次失败时间';
-- 自动更新version触发器
CREATE OR REPLACE FUNCTION update_outbox_version()
RETURNS TRIGGER AS $$
BEGIN
NEW.version = OLD.version + 1;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trigger_outbox_version ON supply_outbox;
CREATE TRIGGER trigger_outbox_version
BEFORE UPDATE ON supply_outbox
FOR EACH ROW
WHEN (OLD.status IS DISTINCT FROM NEW.status)
EXECUTE FUNCTION update_outbox_version();

View File

@@ -0,0 +1,244 @@
-- Partition Strategy Schema v1.0
-- 按月分区的大表分区策略
-- ==================== 1. audit_events 分区 (按月分区保留12个月) ====================
-- 创建父表
CREATE TABLE IF NOT EXISTS audit_events (
id BIGSERIAL,
event_id VARCHAR(100) NOT NULL,
event_name VARCHAR(100) NOT NULL,
event_category VARCHAR(50),
event_sub_category VARCHAR(50),
timestamp TIMESTAMPTZ NOT NULL,
timestamp_ms BIGINT NOT NULL,
request_id VARCHAR(100),
idempotency_key VARCHAR(128),
tenant_id BIGINT,
object_type VARCHAR(100),
object_id VARCHAR(100),
action VARCHAR(100) NOT NULL,
result_code VARCHAR(50),
source_ip VARCHAR(50),
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);
-- 创建月度分区函数
CREATE OR REPLACE FUNCTION create_audit_events_partition(partition_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := date_trunc('month', partition_date)::DATE;
end_date := (start_date + INTERVAL '1 month')::DATE;
partition_name := 'audit_events_' || to_char(start_date, 'YYYY_MM');
-- 检查分区是否已存在
IF NOT EXISTS (
SELECT 1 FROM pg_class WHERE relname = partition_name
) THEN
EXECUTE format(
'CREATE TABLE %I PARTITION OF audit_events FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
RAISE NOTICE 'Created partition: %', partition_name;
ELSE
RAISE NOTICE 'Partition already exists: %', partition_name;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 2. supply_usage_records 分区 (按月分区保留3个月) ====================
CREATE TABLE IF NOT EXISTS supply_usage_records (
id BIGSERIAL,
order_id BIGINT NOT NULL,
buyer_user_id BIGINT NOT NULL,
supply_account_id BIGINT NOT NULL,
supplier_user_id BIGINT NOT NULL,
request_id VARCHAR(64) NOT NULL,
upstream_request_id VARCHAR(128),
api_key_id BIGINT,
platform VARCHAR(50) NOT NULL,
model VARCHAR(100) NOT NULL,
endpoint VARCHAR(100) NOT NULL,
request_tokens BIGINT,
response_tokens BIGINT,
total_tokens BIGINT,
input_cost NUMERIC(20,6),
output_cost NUMERIC(20,6),
total_cost NUMERIC(20,6) NOT NULL,
unit_price NUMERIC(20,6) NOT NULL,
response_status INT,
latency_ms INT,
error_message TEXT,
success BOOLEAN DEFAULT TRUE,
started_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, started_at)
) PARTITION BY RANGE (started_at);
CREATE OR REPLACE FUNCTION create_usage_records_partition(partition_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := date_trunc('month', partition_date)::DATE;
end_date := (start_date + INTERVAL '1 month')::DATE;
partition_name := 'supply_usage_records_' || to_char(start_date, 'YYYY_MM');
IF NOT EXISTS (
SELECT 1 FROM pg_class WHERE relname = partition_name
) THEN
EXECUTE format(
'CREATE TABLE %I PARTITION OF supply_usage_records FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
RAISE NOTICE 'Created partition: %', partition_name;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 3. supply_idempotency_records 分区 (按月分区保留7天) ====================
CREATE TABLE IF NOT EXISTS supply_idempotency_records (
id BIGSERIAL,
tenant_id BIGINT NOT NULL,
operator_id BIGINT NOT NULL,
api_path VARCHAR(200) NOT NULL,
idempotency_key VARCHAR(128) NOT NULL,
request_id VARCHAR(64) NOT NULL,
payload_hash CHAR(64) NOT NULL,
response_code INT,
response_body JSONB,
status VARCHAR(20) NOT NULL DEFAULT 'processing',
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, expires_at)
) PARTITION BY RANGE (expires_at);
CREATE OR REPLACE FUNCTION create_idempotency_partition(partition_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := date_trunc('month', partition_date)::DATE;
end_date := (start_date + INTERVAL '1 month')::DATE;
partition_name := 'supply_idempotency_records_' || to_char(start_date, 'YYYY_MM');
IF NOT EXISTS (
SELECT 1 FROM pg_class WHERE relname = partition_name
) THEN
EXECUTE format(
'CREATE TABLE %I PARTITION OF supply_idempotency_records FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
RAISE NOTICE 'Created partition: %', partition_name;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 4. 自动创建未来分区 ====================
CREATE OR REPLACE FUNCTION ensure_future_partitions()
RETURNS VOID AS $$
DECLARE
i INT;
future_date DATE;
BEGIN
-- 为未来3个月创建分区
FOR i IN 0..3 LOOP
future_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
PERFORM create_audit_events_partition(future_date);
PERFORM create_usage_records_partition(future_date);
PERFORM create_idempotency_partition(future_date);
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- ==================== 5. 清理过期分区 ====================
CREATE OR REPLACE FUNCTION drop_old_audit_partitions(retention_months INT DEFAULT 12)
RETURNS INTEGER AS $$
DECLARE
partition_name TEXT;
cutoff_date DATE;
dropped_count INTEGER := 0;
BEGIN
cutoff_date := (CURRENT_DATE - (retention_months || ' months')::INTERVAL)::DATE;
FOR partition_name IN
SELECT relname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relname ~ 'audit_events_20[0-9]{2}_[0-9]{2}'
AND n.nspname = 'public'
LOOP
-- 提取分区日期
IF partition_name < 'audit_events_' || to_char(cutoff_date, 'YYYY_MM') THEN
EXECUTE format('DROP TABLE IF EXISTS %I', partition_name);
dropped_count := dropped_count + 1;
RAISE NOTICE 'Dropped partition: %', partition_name;
END IF;
END LOOP;
RETURN dropped_count;
END;
$$ LANGUAGE plpgsql;
-- ==================== 6. 初始化分区 (首次运行) ====================
-- 创建初始分区过去12个月 + 未来3个月
DO $$
DECLARE
i INT;
target_date DATE;
BEGIN
-- 过去12个月
FOR i IN -12..0 LOOP
target_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
PERFORM create_audit_events_partition(target_date);
PERFORM create_usage_records_partition(target_date);
PERFORM create_idempotency_partition(target_date);
END LOOP;
-- 未来3个月
FOR i IN 1..3 LOOP
target_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
PERFORM create_audit_events_partition(target_date);
PERFORM create_usage_records_partition(target_date);
PERFORM create_idempotency_partition(target_date);
END LOOP;
END $$;
-- ==================== 7. 索引 ====================
-- 在父表上创建索引(会自动继承到分区)
CREATE INDEX IF NOT EXISTS idx_audit_events_tenant_id ON audit_events(tenant_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_request_id ON audit_events(request_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_created_at ON audit_events(created_at);
CREATE INDEX IF NOT EXISTS idx_audit_events_object ON audit_events(object_type, object_id);
CREATE INDEX IF NOT EXISTS idx_usage_records_order_id ON supply_usage_records(order_id);
CREATE INDEX IF NOT EXISTS idx_usage_records_started_at ON supply_usage_records(started_at);
CREATE INDEX IF NOT EXISTS idx_idempotency_expires_at ON supply_idempotency_records(expires_at);
-- ==================== 8. 注释 ====================
COMMENT ON TABLE audit_events IS '审计事件表 - 按月分区保留12个月';
COMMENT ON TABLE supply_usage_records IS '使用记录表 - 按月分区保留3个月';
COMMENT ON TABLE supply_idempotency_records IS '幂等记录表 - 按月分区保留7天以上';
-- 创建pg_cron作业定期维护分区需要扩展 pg_cron
-- SELECT cron.schedule('partition-maintenance', '0 0 * * *', 'SELECT ensure_future_partitions()');
-- SELECT cron.schedule('partition-cleanup', '0 1 * * *', 'SELECT drop_old_audit_partitions(12)');

View File

@@ -0,0 +1,67 @@
-- Token Status Registry v1.0
-- 存储Token吊销状态支持主动失效机制P0-03修复
-- 设计目标:吊销传播延迟 <= 5s
-- Token状态枚举
DO $$ BEGIN
CREATE TYPE token_status AS ENUM ('active', 'revoked', 'expired');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
-- Token状态注册表
CREATE TABLE IF NOT EXISTS token_status_registry (
id BIGSERIAL PRIMARY KEY,
token_id VARCHAR(128) NOT NULL UNIQUE,
subject_id BIGINT NOT NULL,
tenant_id BIGINT NOT NULL,
role VARCHAR(50) NOT NULL DEFAULT 'user',
status token_status NOT NULL DEFAULT 'active',
issued_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMPTZ NOT NULL,
revoked_at TIMESTAMPTZ,
revoked_reason VARCHAR(256),
revoked_by BIGINT,
last_verified_at TIMESTAMPTZ,
verification_count BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- 约束
CONSTRAINT valid_token_status CHECK (status IN ('active', 'revoked', 'expired')),
CONSTRAINT valid_expiry CHECK (expires_at > issued_at)
);
-- 索引
CREATE INDEX IF NOT EXISTS idx_token_status_token_id ON token_status_registry(token_id);
CREATE INDEX IF NOT EXISTS idx_token_status_subject_id ON token_status_registry(subject_id);
CREATE INDEX IF NOT EXISTS idx_token_status_tenant_id ON token_status_registry(tenant_id);
CREATE INDEX IF NOT EXISTS idx_token_status_status ON token_status_registry(status);
CREATE INDEX IF NOT EXISTS idx_token_status_expires_at ON token_status_registry(expires_at) WHERE status = 'active';
CREATE INDEX IF NOT EXISTS idx_token_status_revoked_at ON token_status_registry(revoked_at) WHERE status = 'revoked';
-- 注释
COMMENT ON TABLE token_status_registry IS 'Token状态注册表用于管理Token吊销状态';
COMMENT ON COLUMN token_status_registry.token_id IS 'Token唯一标识符JWT jti claim';
COMMENT ON COLUMN token_status_registry.subject_id IS 'Token所属用户ID';
COMMENT ON COLUMN token_status_registry.tenant_id IS '租户ID';
COMMENT ON COLUMN token_status_registry.status IS 'Token状态active/revoked/expired';
COMMENT ON COLUMN token_status_registry.revoked_at IS '吊销时间';
COMMENT ON COLUMN token_status_registry.revoked_reason IS '吊销原因';
COMMENT ON COLUMN token_status_registry.last_verified_at IS '最后验证时间(用于活跃度追踪)';
COMMENT ON COLUMN token_status_registry.verification_count IS '验证次数统计';
-- 自动更新updated_at触发器
CREATE OR REPLACE FUNCTION update_token_status_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trigger_token_status_updated_at ON token_status_registry;
CREATE TRIGGER trigger_token_status_updated_at
BEFORE UPDATE ON token_status_registry
FOR EACH ROW
EXECUTE FUNCTION update_token_status_updated_at();