chore: sync local latest state and repository cleanup

This commit is contained in:
Your Name
2026-03-23 13:02:36 +08:00
parent f1ff3d629f
commit 2ef0f17961
493 changed files with 46912 additions and 7977 deletions

View File

@@ -0,0 +1,120 @@
package com.mosquito.project.job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.UUID;
/**
* 内部奖励发放适配器
* 对接内部账户系统API支持超时、重试、幂等键
*/
@Component
public class InternalRewardDistributor implements RewardJobProcessor.RewardDistributor {
private static final Logger log = LoggerFactory.getLogger(InternalRewardDistributor.class);
@Value("${app.internal-account.api-url:}")
private String apiUrl;
@Value("${app.internal-account.api-key:}")
private String apiKey;
@Value("${app.internal-account.timeout:5000}")
private int timeout;
/**
* 是否允许模拟成功模式(仅开发/测试环境使用)
* 生产环境必须设置为false否则可能造成"假成功发放"
*/
@Value("${app.internal-account.allow-mock-success:false}")
private boolean allowMockSuccess;
private final RestTemplate restTemplate;
public InternalRewardDistributor(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
@Override
public boolean distribute(Long userId, Long activityId, String trackingId, int points, String rewardType) {
// 生成幂等键,确保重试不会重复发放
String idempotencyKey = generateIdempotencyKey(userId, activityId, trackingId);
log.info("发放奖励: userId={}, activityId={}, trackingId={}, points={}, type={}, idempotencyKey={}, allowMockSuccess={}",
userId, activityId, trackingId, points, rewardType, idempotencyKey, allowMockSuccess);
// 如果未配置API地址
if (apiUrl == null || apiUrl.isBlank()) {
// 生产环境(Fail-Closed)不允许模拟成功返回false进入重试队列
if (!allowMockSuccess) {
log.error("内部账户系统API地址未配置且不允许模拟成功奖励发放失败请检查配置");
return false;
}
// 开发/测试环境:允许模拟成功
log.warn("内部账户系统API地址未配置模拟奖励发放成功仅开发环境");
return true;
}
try {
return callInternalAccountApi(userId, points, rewardType, idempotencyKey);
} catch (Exception e) {
log.error("奖励发放失败: userId={}, error={}", userId, e.getMessage());
return false;
}
}
/**
* 调用内部账户系统API
*/
private boolean callInternalAccountApi(Long userId, int points, String rewardType, String idempotencyKey) {
// 构建请求体
String requestBody = String.format(
"{\"userId\":%d,\"points\":%d,\"type\":\"%s\",\"idempotencyKey\":\"%s\"}",
userId, points, rewardType, idempotencyKey
);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (apiKey != null && !apiKey.isBlank()) {
headers.set("X-API-Key", apiKey);
}
headers.set("X-Idempotency-Key", idempotencyKey);
HttpEntity<String> request = new HttpEntity<>(requestBody, headers);
try {
ResponseEntity<String> response = restTemplate.exchange(
apiUrl + "/api/v1/points/grant",
HttpMethod.POST,
request,
String.class
);
if (response.getStatusCode() == HttpStatus.OK || response.getStatusCode() == HttpStatus.CREATED) {
log.info("奖励发放成功: userId={}, response={}", userId, response.getBody());
return true;
} else {
log.error("奖励发放失败: userId={}, status={}", userId, response.getStatusCode());
return false;
}
} catch (Exception e) {
log.error("调用内部账户系统API异常: userId={}, error={}", userId, e.getMessage());
throw e;
}
}
/**
* 生成幂等键
*/
private String generateIdempotencyKey(Long userId, Long activityId, String trackingId) {
return UUID.nameUUIDFromBytes(
String.format("%d-%d-%s", userId, activityId, trackingId).getBytes()
).toString();
}
}

View File

@@ -0,0 +1,374 @@
package com.mosquito.project.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mosquito.project.domain.Activity;
import com.mosquito.project.persistence.entity.ActivityEntity;
import com.mosquito.project.persistence.entity.RewardJobEntity;
import com.mosquito.project.persistence.entity.ShortLinkEntity;
import com.mosquito.project.persistence.entity.UserRewardEntity;
import com.mosquito.project.persistence.repository.ActivityRepository;
import com.mosquito.project.persistence.repository.RewardJobRepository;
import com.mosquito.project.persistence.repository.ShortLinkRepository;
import com.mosquito.project.persistence.repository.UserRewardRepository;
import com.mosquito.project.service.CouponRewardService;
import com.mosquito.project.service.RewardService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
/**
* 奖励任务消费处理器
* 定时处理奖励队列中的待发放任务
* 使用tracking_id进行精确归因
* 按活动规则计算奖励值
*/
@Component
public class RewardJobProcessor {
private static final Logger log = LoggerFactory.getLogger(RewardJobProcessor.class);
private static final int MAX_RETRY_COUNT = 3;
private final RewardJobRepository rewardJobRepository;
private final ShortLinkRepository shortLinkRepository;
private final UserRewardRepository userRewardRepository;
private final ActivityRepository activityRepository;
private final ObjectMapper objectMapper;
private final RewardDistributor rewardDistributor;
private final CouponRewardService couponRewardService;
public RewardJobProcessor(RewardJobRepository rewardJobRepository,
ShortLinkRepository shortLinkRepository,
UserRewardRepository userRewardRepository,
ActivityRepository activityRepository,
ObjectMapper objectMapper,
RewardDistributor rewardDistributor,
CouponRewardService couponRewardService) {
this.rewardJobRepository = rewardJobRepository;
this.shortLinkRepository = shortLinkRepository;
this.userRewardRepository = userRewardRepository;
this.activityRepository = activityRepository;
this.objectMapper = objectMapper;
this.rewardDistributor = rewardDistributor;
this.couponRewardService = couponRewardService;
}
@Scheduled(fixedDelay = 5000) // 每5秒执行一次
@Transactional
public void processRewardJobs() {
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
List<RewardJobEntity> pendingJobs = rewardJobRepository
.findTop10ByStatusAndNextRunAtLessThanEqualOrderByCreatedAtAsc("pending", now);
if (pendingJobs.isEmpty()) {
return;
}
log.info("开始处理 {} 个奖励任务", pendingJobs.size());
for (RewardJobEntity job : pendingJobs) {
try {
processRewardJob(job);
} catch (Exception e) {
log.error("处理奖励任务 {} 失败: {}", job.getId(), e.getMessage());
handleJobFailure(job);
}
}
log.info("奖励任务处理完成");
}
/**
* 处理单个奖励任务
* 通过tracking_id精确定位邀请关系并发奖
* PRD要求去掉"按externalUserId查最近邀请记录"的隐式归因
*/
private void processRewardJob(RewardJobEntity job) {
String trackingId = job.getTrackingId();
if (trackingId == null || trackingId.isEmpty()) {
log.warn("奖励任务 {} 缺少tracking_id", job.getId());
job.setStatus("failed");
job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC));
rewardJobRepository.save(job);
return;
}
// 通过tracking_id精确查找对应的短链接记录包含邀请关系
var shortLinkOpt = shortLinkRepository.findByTrackingId(trackingId);
if (shortLinkOpt.isEmpty()) {
log.warn("找不到tracking_id {} 对应的邀请记录", trackingId);
job.setStatus("failed");
job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC));
rewardJobRepository.save(job);
return;
}
ShortLinkEntity shortLink = shortLinkOpt.get();
Long inviterUserId = shortLink.getInviterUserId();
Long activityId = shortLink.getActivityId();
if (inviterUserId == null) {
log.warn("tracking_id {} 对应的邀请记录没有邀请人", trackingId);
job.setStatus("failed");
job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC));
rewardJobRepository.save(job);
return;
}
// 根据活动配置计算奖励值和奖励类型
int points = calculateRewardPoints(activityId);
String rewardType = calculateRewardType(activityId); // 从活动配置获取奖励类型
// 处理优惠券奖励
if ("COUPON".equals(rewardType)) {
// 优惠券奖励处理先创建积分记录状态为APPROVED再调用优惠券服务发放
processCouponReward(job, shortLink, inviterUserId, activityId, trackingId, points);
return;
}
// 调用外部发放适配器进行奖励发放
try {
boolean success = rewardDistributor.distribute(inviterUserId, activityId, trackingId, points, rewardType);
if (!success) {
log.warn("奖励发放失败: userId={}, activityId={}, trackingId={}", inviterUserId, activityId, trackingId);
handleJobFailure(job);
return;
}
} catch (Exception e) {
log.error("奖励发放异常: {}", e.getMessage());
handleJobFailure(job);
return;
}
// 创建用户奖励记录
UserRewardEntity reward = new UserRewardEntity();
reward.setUserId(inviterUserId);
reward.setActivityId(activityId);
reward.setPoints(points);
reward.setType(rewardType);
reward.setStatus(RewardService.STATUS_GRANTED);
reward.setCreatedAt(OffsetDateTime.now(ZoneOffset.UTC));
reward.setTrackingId(trackingId); // 记录归因的tracking_id
// 从活动获取部门ID用于数据权限过滤
var activityOpt = activityRepository.findById(activityId);
if (activityOpt.isPresent()) {
reward.setDepartmentId(activityOpt.get().getDepartmentId());
log.debug("设置奖励departmentId={} from activityId={}", activityOpt.get().getDepartmentId(), activityId);
}
userRewardRepository.save(reward);
// 更新任务状态为已完成
job.setStatus("completed");
job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC));
rewardJobRepository.save(job);
log.info("成功处理奖励任务 {}通过tracking_id {} 为用户 {} 发放 {} 积分", job.getId(), trackingId, inviterUserId, points);
}
/**
* 根据活动配置计算奖励积分
*/
private int calculateRewardPoints(Long activityId) {
// 默认奖励值
int defaultPoints = 10;
try {
var activityOpt = activityRepository.findById(activityId);
if (activityOpt.isEmpty()) {
log.warn("找不到活动 {} 的配置,使用默认奖励", activityId);
return defaultPoints;
}
ActivityEntity activity = activityOpt.get();
String calculationMode = activity.getRewardCalculationMode();
// 如果有阶梯奖励配置,按阶梯计算
if (activity.getRewardTiersConfig() != null && !activity.getRewardTiersConfig().isEmpty()) {
return calculateTieredReward(activity.getRewardTiersConfig());
}
// 根据计算模式返回奖励
if ("FIXED".equals(calculationMode)) {
return defaultPoints;
}
// 默认返回固定奖励
return defaultPoints;
} catch (Exception e) {
log.error("计算奖励失败: {}", e.getMessage());
return defaultPoints;
}
}
/**
* 计算阶梯奖励
*/
private int calculateTieredReward(String tiersConfig) {
try {
@SuppressWarnings("unchecked")
List<Map<String, Object>> tiers = objectMapper.readValue(tiersConfig, List.class);
if (tiers == null || tiers.isEmpty()) {
return 10;
}
// 取第一个阶梯的奖励值
Map<String, Object> firstTier = tiers.get(0);
Object points = firstTier.get("points");
if (points instanceof Number) {
return ((Number) points).intValue();
}
return 10;
} catch (JsonProcessingException e) {
log.error("解析阶梯奖励配置失败: {}", e.getMessage());
return 10;
}
}
/**
* 从活动配置中获取奖励类型
*/
private String calculateRewardType(Long activityId) {
try {
var activityOpt = activityRepository.findById(activityId);
if (activityOpt.isEmpty()) {
return "POINTS"; // 默认积分奖励
}
ActivityEntity activity = activityOpt.get();
// 从活动配置的 rewardTiersConfig 中获取奖励类型
if (activity.getRewardTiersConfig() != null && !activity.getRewardTiersConfig().isEmpty()) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> tiers = objectMapper.readValue(activity.getRewardTiersConfig(), List.class);
if (tiers != null && !tiers.isEmpty()) {
Map<String, Object> firstTier = tiers.get(0);
Object type = firstTier.get("type");
if (type != null) {
return type.toString();
}
}
}
return "POINTS"; // 默认积分奖励
} catch (Exception e) {
log.error("获取奖励类型失败: {}", e.getMessage());
return "POINTS";
}
}
/**
* 处理优惠券奖励发放
*/
private void processCouponReward(RewardJobEntity job, ShortLinkEntity shortLink,
Long inviterUserId, Long activityId, String trackingId, int points) {
try {
// 1. 先创建用户奖励记录状态为APPROVED待发放
UserRewardEntity reward = new UserRewardEntity();
reward.setUserId(inviterUserId);
reward.setActivityId(activityId);
reward.setPoints(points);
reward.setType("COUPON");
reward.setStatus("APPROVED"); // 待发放状态
reward.setCreatedAt(OffsetDateTime.now(ZoneOffset.UTC));
reward.setTrackingId(trackingId);
// 从活动获取部门ID和优惠券批次ID
var activityOpt = activityRepository.findById(activityId);
if (activityOpt.isPresent()) {
ActivityEntity activity = activityOpt.get();
reward.setDepartmentId(activity.getDepartmentId());
// 从活动配置中获取优惠券批次ID
if (activity.getRewardTiersConfig() != null) {
try {
@SuppressWarnings("unchecked")
List<Map<String, Object>> tiers = objectMapper.readValue(activity.getRewardTiersConfig(), List.class);
if (tiers != null && !tiers.isEmpty()) {
Object couponBatchId = tiers.get(0).get("couponBatchId");
if (couponBatchId != null) {
reward.setCouponBatchId(couponBatchId.toString());
}
}
} catch (Exception e) {
log.warn("解析优惠券批次ID失败: {}", e.getMessage());
}
}
}
UserRewardEntity savedReward = userRewardRepository.save(reward);
// 2. 调用优惠券服务发放优惠券
if (savedReward.getCouponBatchId() != null) {
CouponRewardService.CouponGrantResult result = couponRewardService.grantCoupon(
savedReward.getId(),
savedReward.getCouponBatchId(),
inviterUserId
);
if (result.isSuccess()) {
log.info("优惠券发放成功: rewardId={}, couponCode={}", savedReward.getId(), result.getCouponCode());
} else {
log.warn("优惠券发放失败: rewardId={}, reason={}", savedReward.getId(), result.getMessage());
// 优惠券发放失败不算任务失败,因为奖励记录已创建
}
}
// 3. 更新任务状态为已完成
job.setStatus("completed");
job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC));
rewardJobRepository.save(job);
log.info("成功处理优惠券奖励任务 {}通过tracking_id {} 为用户 {} 发放优惠券", job.getId(), trackingId, inviterUserId);
} catch (Exception e) {
log.error("处理优惠券奖励任务 {} 失败: {}", job.getId(), e.getMessage());
handleJobFailure(job);
}
}
/**
* 处理任务失败,重试或标记为失败
*/
private void handleJobFailure(RewardJobEntity job) {
int retryCount = job.getRetryCount() != null ? job.getRetryCount() : 0;
if (retryCount >= MAX_RETRY_COUNT) {
// 超过最大重试次数,标记为失败
job.setStatus("failed");
log.warn("奖励任务 {} 超过最大重试次数,标记为失败", job.getId());
} else {
// 增加重试次数,安排下次执行
job.setRetryCount(retryCount + 1);
job.setNextRunAt(OffsetDateTime.now(ZoneOffset.UTC).plusMinutes((long) Math.pow(2, retryCount)));
job.setStatus("pending");
log.info("奖励任务 {} 重试次数 {}, 下次执行时间: {}", job.getId(), retryCount + 1, job.getNextRunAt());
}
job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC));
rewardJobRepository.save(job);
}
/**
* 奖励发放接口(适配器模式)
*/
public interface RewardDistributor {
/**
* 发放奖励
* @param userId 用户ID
* @param activityId 活动ID
* @param trackingId 追踪ID
* @param points 积分数量
* @param rewardType 奖励类型
* @return 是否发放成功
*/
boolean distribute(Long userId, Long activityId, String trackingId, int points, String rewardType);
}
}

View File

@@ -5,15 +5,20 @@ import com.mosquito.project.domain.DailyActivityStats;
import com.mosquito.project.service.ActivityService;
import com.mosquito.project.persistence.entity.DailyActivityStatsEntity;
import com.mosquito.project.persistence.repository.DailyActivityStatsRepository;
import com.mosquito.project.persistence.repository.LinkClickRepository;
import com.mosquito.project.persistence.repository.UserInviteRepository;
import com.mosquito.project.persistence.repository.UserRewardRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
@Component
@@ -23,11 +28,21 @@ public class StatisticsAggregationJob {
private final ActivityService activityService;
private final DailyActivityStatsRepository dailyStatsRepository;
private final LinkClickRepository linkClickRepository;
private final UserInviteRepository userInviteRepository;
private final UserRewardRepository userRewardRepository;
private final Map<Long, DailyActivityStats> dailyStats = new ConcurrentHashMap<>();
public StatisticsAggregationJob(ActivityService activityService, DailyActivityStatsRepository dailyStatsRepository) {
public StatisticsAggregationJob(ActivityService activityService,
DailyActivityStatsRepository dailyStatsRepository,
LinkClickRepository linkClickRepository,
UserInviteRepository userInviteRepository,
UserRewardRepository userRewardRepository) {
this.activityService = activityService;
this.dailyStatsRepository = dailyStatsRepository;
this.linkClickRepository = linkClickRepository;
this.userInviteRepository = userInviteRepository;
this.userRewardRepository = userRewardRepository;
}
@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
@@ -37,28 +52,90 @@ public class StatisticsAggregationJob {
LocalDate yesterday = LocalDate.now().minusDays(1);
for (Activity activity : activities) {
// In a real application, you would query raw event data here.
// For now, we simulate by calling the helper method.
// 从真实事件表聚合数据
DailyActivityStats stats = aggregateStatsForActivity(activity, yesterday);
// Upsert into persistence store for analytics queries
// 持久化到统计表
upsertDailyStats(stats);
log.info("为活动ID {} 聚合了数据: {} 次浏览, {} 次分享", activity.getId(), stats.getViews(), stats.getShares());
log.info("为活动ID {} 聚合了真实数据: {} 次浏览, {} 次分享, {} 次新注册, {} 次转化",
activity.getId(), stats.getViews(), stats.getShares(), stats.getNewRegistrations(), stats.getConversions());
}
log.info("每日活动数据聚合任务执行完成");
}
// This is a helper method for simulation and testing
@Scheduled(cron = "0 0 * * * ?") // 每小时执行一次,检查过期活动
public void processExpiredActivities() {
log.info("开始执行活动过期检查任务");
int processedCount = activityService.processExpiredActivities();
log.info("活动过期检查任务执行完成,共处理 {} 个过期活动", processedCount);
}
/**
* 从真实事件表聚合统计数据
* 不再使用随机数模拟
*/
public DailyActivityStats aggregateStatsForActivity(Activity activity, LocalDate date) {
Random random = new Random();
DailyActivityStats stats = new DailyActivityStats();
stats.setActivityId(activity.getId());
stats.setStatDate(date);
stats.setViews(1000 + random.nextInt(500));
stats.setShares(200 + random.nextInt(100));
stats.setNewRegistrations(50 + random.nextInt(50));
stats.setConversions(10 + random.nextInt(20));
// 转换日期范围
OffsetDateTime startOfDay = date.atStartOfDay().atOffset(ZoneOffset.UTC);
OffsetDateTime endOfDay = date.atTime(LocalTime.MAX).atOffset(ZoneOffset.UTC);
// 从 link_clicks 表聚合PV/UV/分享数
// PV: 总浏览次数(按日期范围)
long views = linkClickRepository.countViewsByActivityIdAndDateRange(
activity.getId(), startOfDay, endOfDay);
stats.setViews((int) views);
// UV: 按IP去重统计独立访客数
long uniqueVisitors = linkClickRepository.countUniqueVisitorsByActivityIdAndDateRange(
activity.getId(), startOfDay, endOfDay);
stats.setUniqueVisitors((int) uniqueVisitors);
// 分享数: 统计有inviterUserId的记录有效归因的分享点击
long shares = linkClickRepository.countSharesByActivityIdAndDateRange(
activity.getId(), startOfDay, endOfDay);
stats.setShares((int) shares);
// 新注册: 从 user_invites 表统计当天创建的邀请记录
List<?> invites = userInviteRepository.findByActivityId(activity.getId());
int newRegistrations = (int) invites.stream()
.filter(i -> {
try {
Object createdAt = i.getClass().getMethod("getCreatedAt").invoke(i);
if (createdAt instanceof OffsetDateTime) {
OffsetDateTime dt = (OffsetDateTime) createdAt;
return dt.toLocalDate().equals(date);
}
} catch (Exception e) {
// ignore
}
return false;
})
.count();
stats.setNewRegistrations(newRegistrations);
// 转化: 从 user_rewards 表统计当天发放的奖励
List<?> rewards = userRewardRepository.findByActivityId(activity.getId());
int conversions = (int) rewards.stream()
.filter(r -> {
try {
Object createdAt = r.getClass().getMethod("getCreatedAt").invoke(r);
if (createdAt instanceof OffsetDateTime) {
OffsetDateTime dt = (OffsetDateTime) createdAt;
return dt.toLocalDate().equals(date);
}
} catch (Exception e) {
// ignore
}
return false;
})
.count();
stats.setConversions(conversions);
dailyStats.put(activity.getId(), stats);
// Persist
// 持久化
upsertDailyStats(stats);
return stats;
}
@@ -70,6 +147,7 @@ public class StatisticsAggregationJob {
entity.setActivityId(stats.getActivityId());
entity.setStatDate(stats.getStatDate());
entity.setViews(stats.getViews());
entity.setUniqueVisitors(stats.getUniqueVisitors());
entity.setShares(stats.getShares());
entity.setNewRegistrations(stats.getNewRegistrations());
entity.setConversions(stats.getConversions());