fix(authorization): 修复团队升级竞态条件,改用事件链模式

问题:authorization-service 和 referral-service 并行消费 TreePlanted 事件,
导致升级检查时统计数据尚未更新完成。

解决方案:
- referral-service: 批量更新团队统计后发布 TeamStatisticsUpdatedEvent
- authorization-service: 监听该事件触发升级检查,替代原有的即时检查
- TeamStatistics 聚合添加 accountSequence 字段用于事件发布

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-24 00:13:39 -08:00
parent ca95c1decf
commit ed1f863919
9 changed files with 161 additions and 92 deletions

View File

@ -39,6 +39,24 @@ interface PlantingEventMessage {
}
}
/**
*
* referral-service TeamStatisticsUpdatedEvent
*/
interface TeamStatisticsUpdatedMessage {
eventId?: string
eventName?: string
occurredAt?: string
data?: {
userId: string
accountSequence: string
totalTeamCount: number
directReferralCount: number
leaderboardScore: number
updateReason: 'planting_added' | 'planting_removed' | 'member_joined' | 'recalculation'
}
}
@Controller()
export class EventConsumerController {
private readonly logger = new Logger(EventConsumerController.name)
@ -119,6 +137,60 @@ export class EventConsumerController {
}
}
/**
* -
* referral-service
*
*/
@EventPattern('referral.team-statistics.events')
async handleTeamStatisticsUpdated(
@Payload() message: TeamStatisticsUpdatedMessage,
@Ctx() context: KafkaContext,
) {
const eventName = message.eventName || 'unknown'
const eventId = message.eventId || 'unknown'
try {
this.logger.log(`[KAFKA] Received team statistics event: ${eventName}, eventId=${eventId}`)
this.logger.debug(`[KAFKA] Event payload: ${JSON.stringify(message)}`)
if (eventName === 'referral.team_statistics.updated' && message.data) {
const { accountSequence, totalTeamCount } = message.data
await this.checkUserAutoUpgrade(accountSequence, totalTeamCount)
}
} catch (error) {
this.logger.error('[KAFKA] Failed to handle team statistics event:', error)
}
}
/**
*
*
*/
private async checkUserAutoUpgrade(accountSequence: string, totalTeamCount: number): Promise<void> {
this.logger.debug(`[TEAM-AUTO-UPGRADE] Checking upgrade for ${accountSequence}, totalTeamCount=${totalTeamCount}`)
// 获取该用户的所有授权
const authProvince = await this.authorizationRepository.findByAccountSequenceAndRoleType(
accountSequence,
RoleType.AUTH_PROVINCE_COMPANY,
)
const authCity = await this.authorizationRepository.findByAccountSequenceAndRoleType(
accountSequence,
RoleType.AUTH_CITY_COMPANY,
)
// 检查省团队升级
if (authProvince && authProvince.benefitActive && authProvince.status === AuthorizationStatus.AUTHORIZED) {
await this.checkAuthProvinceUpgrade(authProvince)
}
// 检查市团队升级
if (authCity && authCity.benefitActive && authCity.status === AuthorizationStatus.AUTHORIZED) {
await this.checkAuthCityUpgrade(authCity)
}
}
/**
*
* 1.
@ -182,9 +254,9 @@ export class EventConsumerController {
}
}
// 4. 检查所有已激活市/省团队的自动升级条件
// 业务规则:市/省团队本人伞下认种数达到阈值时,团队本人获得区域授权
await this.checkAllTeamAutoUpgrade()
// 注意:自动升级检查已移至 handleTeamStatisticsUpdated
// 通过监听 referral.team-statistics.events 事件来触发升级检查
// 这样确保在 referral-service 完成统计更新后再进行检查,避免竞态条件
this.logger.log(`[PLANTING] Completed processing tree planted event for user ${userId}`)
} catch (error) {
@ -293,43 +365,6 @@ export class EventConsumerController {
private static readonly PROVINCE_UPGRADE_THRESHOLD = 50000 // 5万棵
private static readonly CITY_UPGRADE_THRESHOLD = 10000 // 1万棵
/**
* /
*
* - (AUTH_PROVINCE_COMPANY)5(PROVINCE_COMPANY)
* - (AUTH_CITY_COMPANY)1(CITY_COMPANY)
*
*/
private async checkAllTeamAutoUpgrade(): Promise<void> {
this.logger.debug('[TEAM-AUTO-UPGRADE] Starting check for all active team authorizations')
// 并行检查省团队和市团队
await Promise.all([
this.checkAllAuthProvinceUpgrade(),
this.checkAllAuthCityUpgrade(),
])
this.logger.debug('[TEAM-AUTO-UPGRADE] Completed check for all active team authorizations')
}
/**
*
*/
private async checkAllAuthProvinceUpgrade(): Promise<void> {
// 1. 获取所有权益已激活的省团队授权
const activeAuthProvinces = await this.authorizationRepository.findAllActiveAuthProvinceCompanies()
if (activeAuthProvinces.length === 0) {
this.logger.debug('[TEAM-AUTO-UPGRADE] No active auth province companies found')
return
}
this.logger.debug(`[TEAM-AUTO-UPGRADE] Found ${activeAuthProvinces.length} active auth province companies`)
// 2. 逐个检查是否达到升级阈值
for (const authProvince of activeAuthProvinces) {
await this.checkAuthProvinceUpgrade(authProvince)
}
}
/**
*
@ -403,25 +438,6 @@ export class EventConsumerController {
this.logger.log(`[TEAM-AUTO-UPGRADE] Successfully auto upgraded auth province ${accountSequence} to province company: ${provinceName}`)
}
/**
*
*/
private async checkAllAuthCityUpgrade(): Promise<void> {
// 1. 获取所有权益已激活的市团队授权
const activeAuthCities = await this.authorizationRepository.findAllActiveAuthCityCompanies()
if (activeAuthCities.length === 0) {
this.logger.debug('[TEAM-AUTO-UPGRADE] No active auth city companies found')
return
}
this.logger.debug(`[TEAM-AUTO-UPGRADE] Found ${activeAuthCities.length} active auth city companies`)
// 2. 逐个检查是否达到升级阈值
for (const authCity of activeAuthCities) {
await this.checkAuthCityUpgrade(authCity)
}
}
/**
*
* 1

View File

@ -93,7 +93,7 @@ export class ReferralService {
const saved = await this.referralRepo.save(relationship);
// 创建团队统计记录
await this.teamStatsRepo.create(command.userId);
await this.teamStatsRepo.create(command.userId, command.accountSequence);
// 如果有推荐人,更新推荐人的直推计数
if (referrerId) {
@ -101,7 +101,10 @@ export class ReferralService {
// 如果推荐人没有统计记录(历史用户),先创建
if (!referrerStats) {
this.logger.warn(`Creating missing team statistics for referrer ${referrerId}`);
referrerStats = await this.teamStatsRepo.create(referrerId);
// 获取推荐人的 accountSequence
const referrerRelationship = await this.referralRepo.findByUserId(referrerId);
const referrerAccountSequence = referrerRelationship?.accountSequence ?? '';
referrerStats = await this.teamStatsRepo.create(referrerId, referrerAccountSequence);
}
referrerStats.addDirectReferral(command.userId);
await this.teamStatsRepo.save(referrerStats);

View File

@ -5,6 +5,7 @@ import {
TEAM_STATISTICS_REPOSITORY,
ITeamStatisticsRepository,
ReferralChainService,
TeamStatisticsUpdatedEvent,
} from '../../domain';
import { EventPublisherService } from '../../infrastructure';
import { UpdateTeamStatisticsCommand } from '../commands';
@ -78,6 +79,26 @@ export class TeamStatisticsService {
// 批量更新
await this.teamStatsRepo.batchUpdateTeamCounts(updates);
// 批量更新后,为所有更新的祖先发布 TeamStatisticsUpdatedEvent
// 这是为了让 authorization-service 可以在统计数据更新后进行升级检查
const updatedStats = await this.teamStatsRepo.findByUserIds(ancestors);
const events = updatedStats.map(
(stats) =>
new TeamStatisticsUpdatedEvent(
stats.userId,
stats.accountSequence,
stats.teamPlantingCount, // totalTeamCount 即团队认种数
stats.directReferralCount,
stats.leaderboardScore,
'planting_added',
),
);
if (events.length > 0) {
await this.eventPublisher.publishDomainEvents(events);
this.logger.log(`Published ${events.length} TeamStatisticsUpdatedEvent for ancestors`);
}
this.logger.log(
`Updated team statistics for ${ancestors.length} ancestors of accountSequence ${command.accountSequence}`,
);

View File

@ -9,6 +9,7 @@ export interface DirectReferralStats {
export interface TeamStatisticsProps {
id: bigint;
userId: bigint;
accountSequence: string; // 账户序列号,用于事件发布
directReferralCount: number;
totalTeamCount: number;
personalPlantingCount: number;
@ -36,6 +37,7 @@ export class TeamStatistics {
private constructor(
private readonly _id: bigint,
private readonly _userId: UserId,
private readonly _accountSequence: string, // 账户序列号
private _directReferralCount: number,
private _totalTeamCount: number,
private _personalPlantingCount: number,
@ -55,6 +57,9 @@ export class TeamStatistics {
get userId(): bigint {
return this._userId.value;
}
get accountSequence(): string {
return this._accountSequence;
}
get directReferralCount(): number {
return this._directReferralCount;
}
@ -92,11 +97,12 @@ export class TeamStatistics {
/**
* ()
*/
static create(userId: bigint): TeamStatistics {
static create(userId: bigint, accountSequence: string): TeamStatistics {
const now = new Date();
return new TeamStatistics(
0n,
UserId.create(userId),
accountSequence,
0,
0,
0,
@ -124,6 +130,7 @@ export class TeamStatistics {
return new TeamStatistics(
props.id,
UserId.create(props.userId),
props.accountSequence,
props.directReferralCount,
props.totalTeamCount,
props.personalPlantingCount,
@ -181,6 +188,7 @@ export class TeamStatistics {
this._domainEvents.push(
new TeamStatisticsUpdatedEvent(
this._userId.value,
this._accountSequence,
this._totalTeamCount,
this._directReferralCount,
this._leaderboardScore.score,
@ -207,6 +215,7 @@ export class TeamStatistics {
this._domainEvents.push(
new TeamStatisticsUpdatedEvent(
this._userId.value,
this._accountSequence,
this._teamPlantingCount,
this._directReferralCount,
this._leaderboardScore.score,
@ -252,6 +261,7 @@ export class TeamStatistics {
return {
id: this._id,
userId: this._userId.value,
accountSequence: this._accountSequence,
directReferralCount: this._directReferralCount,
totalTeamCount: this._totalTeamCount,
personalPlantingCount: this._personalPlantingCount,

View File

@ -6,6 +6,7 @@ import { DomainEvent } from './domain-event.base';
export class TeamStatisticsUpdatedEvent extends DomainEvent {
constructor(
public readonly userId: bigint,
public readonly accountSequence: string,
public readonly totalTeamCount: number,
public readonly directReferralCount: number,
public readonly leaderboardScore: number,
@ -25,6 +26,7 @@ export class TeamStatisticsUpdatedEvent extends DomainEvent {
occurredAt: this.occurredAt.toISOString(),
data: {
userId: this.userId.toString(),
accountSequence: this.accountSequence,
totalTeamCount: this.totalTeamCount,
directReferralCount: this.directReferralCount,
leaderboardScore: this.leaderboardScore,

View File

@ -58,7 +58,7 @@ export interface ITeamStatisticsRepository {
/**
*
*/
create(userId: bigint): Promise<TeamStatistics>;
create(userId: bigint, accountSequence: string): Promise<TeamStatistics>;
}
export const TEAM_STATISTICS_REPOSITORY = Symbol('ITeamStatisticsRepository');

View File

@ -88,6 +88,11 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
async findByUserId(userId: bigint): Promise<TeamStatistics | null> {
const record = await this.prisma.teamStatistics.findUnique({
where: { userId },
include: {
referralRelationship: {
select: { accountSequence: true },
},
},
});
if (!record) return null;
@ -98,12 +103,19 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
select: { referralId: true, teamPlantingCount: true },
});
return TeamStatistics.reconstitute(this.mapToProps(record, directReferrals));
return TeamStatistics.reconstitute(
this.mapToProps(record, directReferrals, record.referralRelationship?.accountSequence ?? ''),
);
}
async findByUserIds(userIds: bigint[]): Promise<TeamStatistics[]> {
const records = await this.prisma.teamStatistics.findMany({
where: { userId: { in: userIds } },
include: {
referralRelationship: {
select: { accountSequence: true },
},
},
});
const result: TeamStatistics[] = [];
@ -112,7 +124,9 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
where: { referrerId: record.userId },
select: { referralId: true, teamPlantingCount: true },
});
result.push(TeamStatistics.reconstitute(this.mapToProps(record, directReferrals)));
result.push(TeamStatistics.reconstitute(
this.mapToProps(record, directReferrals, record.referralRelationship?.accountSequence ?? ''),
));
}
return result;
@ -240,7 +254,7 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
});
}
async create(userId: bigint): Promise<TeamStatistics> {
async create(userId: bigint, accountSequence: string): Promise<TeamStatistics> {
const created = await this.prisma.teamStatistics.create({
data: {
userId,
@ -255,7 +269,7 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
},
});
return TeamStatistics.reconstitute(this.mapToProps(created, []));
return TeamStatistics.reconstitute(this.mapToProps(created, [], accountSequence));
}
private mapToProps(
@ -277,6 +291,7 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
referralId: bigint;
teamPlantingCount: number;
}>,
accountSequence: string,
): TeamStatisticsProps {
const directReferralStats: DirectReferralStats[] = directReferrals.map((dr) => ({
referralId: dr.referralId,
@ -286,6 +301,7 @@ export class TeamStatisticsRepository implements ITeamStatisticsRepository {
return {
id: record.id,
userId: record.userId,
accountSequence,
directReferralCount: record.directReferralCount,
totalTeamCount: record.totalTeamCount,
personalPlantingCount: record.selfPlantingCount,

View File

@ -4,7 +4,7 @@ import { TeamStatisticsUpdatedEvent } from '../../../src/domain/events';
describe('TeamStatistics Aggregate', () => {
describe('create', () => {
it('should create empty team statistics', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
expect(stats.userId).toBe(100n);
expect(stats.directReferralCount).toBe(0);
@ -20,6 +20,7 @@ describe('TeamStatistics Aggregate', () => {
const props = {
id: 1n,
userId: 100n,
accountSequence: 'D25010100001',
directReferralCount: 5,
totalTeamCount: 100,
personalPlantingCount: 10,
@ -48,7 +49,7 @@ describe('TeamStatistics Aggregate', () => {
describe('addDirectReferral', () => {
it('should increment direct referral count', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addDirectReferral(200n);
expect(stats.directReferralCount).toBe(1);
@ -60,7 +61,7 @@ describe('TeamStatistics Aggregate', () => {
describe('addPersonalPlanting', () => {
it('should add personal planting count', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addPersonalPlanting(10, '110000', '110100');
@ -69,7 +70,7 @@ describe('TeamStatistics Aggregate', () => {
});
it('should emit TeamStatisticsUpdatedEvent', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addPersonalPlanting(10, '110000', '110100');
expect(stats.domainEvents.length).toBe(1);
@ -77,7 +78,7 @@ describe('TeamStatistics Aggregate', () => {
});
it('should update province/city distribution', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addPersonalPlanting(10, '110000', '110100');
stats.addPersonalPlanting(5, '110000', '110100');
@ -88,7 +89,7 @@ describe('TeamStatistics Aggregate', () => {
describe('addTeamPlanting', () => {
it('should add team planting count', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addTeamPlanting(20, '110000', '110100', 200n);
@ -97,7 +98,7 @@ describe('TeamStatistics Aggregate', () => {
});
it('should track direct referral team count', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addDirectReferral(200n);
stats.addDirectReferral(300n);
@ -110,7 +111,7 @@ describe('TeamStatistics Aggregate', () => {
});
it('should recalculate leaderboard score', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addDirectReferral(200n);
stats.addDirectReferral(300n);
@ -124,7 +125,7 @@ describe('TeamStatistics Aggregate', () => {
describe('getDirectReferralStats', () => {
it('should return copy of direct referral stats', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addDirectReferral(200n);
stats.addTeamPlanting(30, '110000', '110100', 200n);
@ -138,7 +139,7 @@ describe('TeamStatistics Aggregate', () => {
describe('clearDomainEvents', () => {
it('should clear domain events', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addPersonalPlanting(10, '110000', '110100');
expect(stats.domainEvents.length).toBe(1);
@ -149,7 +150,7 @@ describe('TeamStatistics Aggregate', () => {
describe('toPersistence', () => {
it('should convert to persistence format', () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addDirectReferral(200n);
stats.addTeamPlanting(30, '110000', '110100', 200n);

View File

@ -30,7 +30,7 @@ describe('TeamStatisticsRepository (Integration)', () => {
describe('create', () => {
it('should create new team statistics', async () => {
const stats = await repository.create(100n);
const stats = await repository.create(100n, 'D25010100001');
expect(stats).toBeDefined();
expect(stats.userId).toBe(100n);
@ -42,7 +42,7 @@ describe('TeamStatisticsRepository (Integration)', () => {
describe('save', () => {
it('should save team statistics', async () => {
const stats = TeamStatistics.create(100n);
const stats = TeamStatistics.create(100n, 'D25010100001');
stats.addPersonalPlanting(10, '110000', '110100');
const saved = await repository.save(stats);
@ -54,10 +54,10 @@ describe('TeamStatisticsRepository (Integration)', () => {
it('should update existing statistics', async () => {
// Create initial
const stats = await repository.create(100n);
const stats = await repository.create(100n, 'D25010100001');
// Create new instance and add planting
const updated = TeamStatistics.create(100n);
const updated = TeamStatistics.create(100n, 'D25010100001');
updated.addPersonalPlanting(20, '110000', '110100');
const saved = await repository.save(updated);
@ -67,7 +67,7 @@ describe('TeamStatisticsRepository (Integration)', () => {
describe('findByUserId', () => {
it('should find statistics by user ID', async () => {
await repository.create(100n);
await repository.create(100n, 'D25010100001');
const found = await repository.findByUserId(100n);
@ -83,9 +83,9 @@ describe('TeamStatisticsRepository (Integration)', () => {
describe('findByUserIds', () => {
it('should find statistics for multiple users', async () => {
await repository.create(100n);
await repository.create(101n);
await repository.create(102n);
await repository.create(100n, 'D25010100001');
await repository.create(101n, 'D25010100002');
await repository.create(102n, 'D25010100003');
const found = await repository.findByUserIds([100n, 101n]);
@ -98,15 +98,15 @@ describe('TeamStatisticsRepository (Integration)', () => {
describe('getLeaderboard', () => {
it('should return leaderboard sorted by score', async () => {
// Create users with different scores
const stats1 = TeamStatistics.create(100n);
const stats1 = TeamStatistics.create(100n, 'D25010100001');
stats1.addPersonalPlanting(50, '110000', '110100');
await repository.save(stats1);
const stats2 = TeamStatistics.create(101n);
const stats2 = TeamStatistics.create(101n, 'D25010100002');
stats2.addPersonalPlanting(100, '110000', '110100');
await repository.save(stats2);
const stats3 = TeamStatistics.create(102n);
const stats3 = TeamStatistics.create(102n, 'D25010100003');
stats3.addPersonalPlanting(30, '110000', '110100');
await repository.save(stats3);
@ -122,7 +122,7 @@ describe('TeamStatisticsRepository (Integration)', () => {
it('should respect limit and offset', async () => {
for (let i = 0; i < 10; i++) {
const stats = TeamStatistics.create(BigInt(100 + i));
const stats = TeamStatistics.create(BigInt(100 + i), `D2501010000${i}`);
stats.addPersonalPlanting(10 + i, '110000', '110100');
await repository.save(stats);
}
@ -136,11 +136,11 @@ describe('TeamStatisticsRepository (Integration)', () => {
describe('getUserRank', () => {
it('should return correct rank', async () => {
const stats1 = TeamStatistics.create(100n);
const stats1 = TeamStatistics.create(100n, 'D25010100001');
stats1.addPersonalPlanting(100, '110000', '110100');
await repository.save(stats1);
const stats2 = TeamStatistics.create(101n);
const stats2 = TeamStatistics.create(101n, 'D25010100002');
stats2.addPersonalPlanting(50, '110000', '110100');
await repository.save(stats2);