feat(presence-service): 完整实现 DDD+Hexagonal 架构的用户活跃度与在线状态服务

Domain 层 (领域层):
- 值对象: InstallId, EventName, EventProperties, DeviceInfo, TimeWindow
- 实体: EventLog, OnlineSnapshot
- 聚合根: DailyActiveStats
- 领域事件: SessionStartedEvent, HeartbeatReceivedEvent, DauCalculatedEvent
- 仓储接口: IEventLogRepository, IDailyActiveStatsRepository, IOnlineSnapshotRepository
- 领域服务: DauCalculationService, OnlineDetectionService

Infrastructure 层 (基础设施层):
- Prisma: EventLog, DailyActiveStats, OnlineSnapshot 数据模型
- Redis: 在线状态存储 (ZSET) + HyperLogLog DAU 实时统计
- Kafka: 事件发布服务 (可选)
- Mappers: 领域对象 <-> Prisma 模型转换
- 仓储实现: EventLogRepositoryImpl, DailyActiveStatsRepositoryImpl, OnlineSnapshotRepositoryImpl

Application 层 (应用层):
- Commands: RecordEvents, RecordHeartbeat, CalculateDau
- Queries: GetOnlineCount, GetDauStats
- Schedulers: 每分钟记录在线快照, 每小时清理过期数据, 每天凌晨计算前一天DAU

API 层 (表现层):
- Controllers: AnalyticsController, PresenceController, HealthController
- DTOs: BatchEventsDto, HeartbeatDto, QueryDauDto, OnlineCountResponseDto, DauStatsResponseDto
- Guards: JwtAuthGuard
- Decorators: @Public, @CurrentUser

核心功能:
-  用户行为事件批量上报与存储
-  日活 DAU 统计 (按自然日去重, 支持省/市维度)
-  实时在线人数统计 (3分钟窗口)
-  心跳机制 (前台60秒间隔)
-  HyperLogLog 实时 DAU 计数
-  定时任务 (快照记录, 过期清理, DAU 计算)
-  Swagger API 文档
-  Docker 多阶段构建
-  健康检查

技术栈:
- NestJS + TypeScript
- Prisma ORM + PostgreSQL
- Redis (ioredis)
- Kafka (kafkajs, 可选)
- CQRS 模式 (@nestjs/cqrs)
- 定时任务 (@nestjs/schedule)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Developer 2025-12-02 12:11:38 -08:00
parent 996bc231f2
commit 0be3fe619e
64 changed files with 2163 additions and 72 deletions

View File

@ -0,0 +1,10 @@
node_modules
dist
npm-debug.log
.env
.env.local
.env.*.local
.git
.gitignore
README.md
analytics-presence-service-dev-guide.md

View File

@ -0,0 +1,25 @@
# 应用配置
NODE_ENV=development
APP_PORT=3001
API_PREFIX=api/v1
# 数据库
DATABASE_URL=postgresql://user:password@localhost:5432/rwa_analytics?schema=public
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# JWT (与 Identity Service 共用)
JWT_SECRET=your-jwt-secret
JWT_EXPIRES_IN=7d
# Kafka
KAFKA_ENABLED=false
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC_ANALYTICS=analytics-events
# 时区
TZ=Asia/Shanghai

View File

@ -0,0 +1,56 @@
# Build stage
FROM node:20-alpine AS builder
WORKDIR /app
# 复制依赖文件
COPY package*.json ./
RUN npm ci
# 复制 Prisma schema 并生成客户端
COPY prisma ./prisma/
RUN DATABASE_URL="postgresql://user:pass@localhost:5432/db" npx prisma generate
# 复制源代码并构建
COPY src ./src/
COPY tsconfig.json nest-cli.json ./
RUN npm run build
# 验证构建产物
RUN ls -la dist/ && test -f dist/main.js
# Production stage
FROM node:20-slim
WORKDIR /app
# 安装必要的系统依赖 (OpenSSL for Prisma, curl for healthcheck)
RUN apt-get update && apt-get install -y --no-install-recommends \
openssl \
curl \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件并安装生产依赖
COPY package*.json ./
RUN npm ci --only=production
# 复制 Prisma schema 并生成客户端
COPY prisma ./prisma/
RUN DATABASE_URL="postgresql://user:pass@localhost:5432/db" npx prisma generate
# 复制构建产物
COPY --from=builder /app/dist ./dist/
# 创建非 root 用户
RUN groupadd -g 1001 nodejs && \
useradd -u 1001 -g nodejs nestjs
USER nestjs
EXPOSE 3001
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=40s --retries=3 \
CMD curl -f http://localhost:3001/api/v1/health || exit 1
CMD ["node", "dist/main.js"]

View File

@ -25,15 +25,20 @@
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.0.0",
"@nestjs/cqrs": "^10.2.7",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"@nestjs/swagger": "^7.1.17",
"@prisma/client": "^5.7.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"date-fns": "^3.0.0",
"date-fns-tz": "^2.0.0",
"ioredis": "^5.3.2",
"kafkajs": "^2.2.4",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1"
"rxjs": "^7.8.1",
"uuid": "^9.0.1"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
@ -41,6 +46,7 @@
"@nestjs/testing": "^10.0.0",
"@types/express": "^4.17.17",
"@types/node": "^20.3.1",
"@types/uuid": "^9.0.7",
"@typescript-eslint/eslint-plugin": "^6.0.0",
"@typescript-eslint/parser": "^6.0.0",
"eslint": "^8.42.0",

View File

@ -11,82 +11,41 @@ datasource db {
url = env("DATABASE_URL")
}
// =============================================================================
// Heartbeat (在线心跳记录)
// =============================================================================
// 事件日志表 (append-only)
model EventLog {
id BigInt @id @default(autoincrement())
userId BigInt? @map("user_id")
installId String @map("install_id") @db.VarChar(64)
eventName String @map("event_name") @db.VarChar(64)
eventTime DateTime @map("event_time") @db.Timestamptz()
properties Json? @db.JsonB
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz()
model Heartbeat {
id String @id @default(uuid())
installId String // 设备唯一安装ID
sessionId String // 会话ID
userId String? // 用户ID (可选未登录时为null)
timestamp DateTime // 心跳时间戳
createdAt DateTime @default(now())
@@index([installId, timestamp])
@@index([userId, timestamp])
@@index([sessionId])
@@index([timestamp])
@@map("heartbeats")
@@index([eventTime], name: "idx_event_log_event_time")
@@index([eventName], name: "idx_event_log_event_name")
@@index([eventName, eventTime], name: "idx_event_log_event_name_time")
@@map("analytics_event_log")
}
// =============================================================================
// Daily Active Users (DAU 日活统计)
// =============================================================================
// 日活统计表
model DailyActiveStats {
day DateTime @id @map("day") @db.Date
dauCount Int @map("dau_count")
dauByProvince Json? @map("dau_by_province") @db.JsonB
dauByCity Json? @map("dau_by_city") @db.JsonB
calculatedAt DateTime @map("calculated_at") @db.Timestamptz()
version Int @default(1)
model DailyActiveUser {
id String @id @default(uuid())
date DateTime @db.Date // 统计日期 (YYYY-MM-DD)
installId String // 设备唯一安装ID
userId String? // 用户ID (可选)
firstSeen DateTime // 当日首次出现时间
lastSeen DateTime // 当日最后出现时间
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([date, installId])
@@index([date])
@@index([userId, date])
@@map("daily_active_users")
@@map("analytics_daily_active_users")
}
// =============================================================================
// Online Presence (实时在线状态)
// 使用 Redis 存储,此表仅用于历史记录
// =============================================================================
// 在线人数快照表
model OnlineSnapshot {
id BigInt @id @default(autoincrement())
ts DateTime @unique @db.Timestamptz()
onlineCount Int @map("online_count")
windowSeconds Int @default(180) @map("window_seconds")
model OnlineSession {
id String @id @default(uuid())
installId String // 设备唯一安装ID
sessionId String @unique // 会话ID
userId String? // 用户ID (可选)
startTime DateTime // 会话开始时间
lastHeartbeat DateTime // 最后心跳时间
endTime DateTime? // 会话结束时间 (null表示仍在线)
duration Int? // 会话时长 (秒)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@index([installId])
@@index([userId])
@@index([sessionId])
@@index([startTime])
@@map("online_sessions")
}
// =============================================================================
// DAU Statistics (DAU 统计汇总)
// =============================================================================
model DauStatistics {
id String @id @default(uuid())
date DateTime @unique @db.Date // 统计日期
totalDau Int // 总DAU (按installId去重)
uniqueUsers Int // 唯一用户数 (按userId去重, 不含匿名)
anonymousUsers Int // 匿名用户数 (userId为null)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@index([date])
@@map("dau_statistics")
@@index([ts(sort: Desc)], name: "idx_online_snapshots_ts")
@@map("analytics_online_snapshots")
}

View File

@ -0,0 +1,15 @@
import { Module } from '@nestjs/common';
import { ApplicationModule } from '../application/application.module';
import { AnalyticsController } from './controllers/analytics.controller';
import { PresenceController } from './controllers/presence.controller';
import { HealthController } from './controllers/health.controller';
@Module({
imports: [ApplicationModule],
controllers: [
AnalyticsController,
PresenceController,
HealthController,
],
})
export class ApiModule {}

View File

@ -0,0 +1,36 @@
import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger';
import { BatchEventsDto } from '../dto/request/batch-events.dto';
import { QueryDauDto } from '../dto/request/query-dau.dto';
import { DauStatsResponseDto } from '../dto/response/dau-stats.dto';
import { RecordEventsCommand } from '../../application/commands/record-events/record-events.command';
import { GetDauStatsQuery } from '../../application/queries/get-dau-stats/get-dau-stats.query';
import { Public } from '../../shared/decorators/public.decorator';
import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Analytics')
@Controller('analytics')
export class AnalyticsController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post('events')
@Public()
@ApiOperation({ summary: '批量上报事件' })
async batchEvents(@Body() dto: BatchEventsDto) {
return this.commandBus.execute(new RecordEventsCommand(dto.events));
}
@Get('dau')
@UseGuards(JwtAuthGuard)
@ApiBearerAuth()
@ApiOperation({ summary: '查询DAU统计' })
async getDauStats(@Query() dto: QueryDauDto): Promise<DauStatsResponseDto> {
return this.queryBus.execute(
new GetDauStatsQuery(new Date(dto.startDate), new Date(dto.endDate)),
);
}
}

View File

@ -0,0 +1,18 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { Public } from '../../shared/decorators/public.decorator';
@ApiTags('Health')
@Controller('health')
export class HealthController {
@Get()
@Public()
@ApiOperation({ summary: '健康检查' })
check() {
return {
status: 'ok',
service: 'presence-service',
timestamp: new Date().toISOString(),
};
}
}

View File

@ -0,0 +1,49 @@
import { Controller, Post, Get, Body, UseGuards } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger';
import { HeartbeatDto } from '../dto/request/heartbeat.dto';
import { OnlineCountResponseDto } from '../dto/response/online-count.dto';
import { RecordHeartbeatCommand } from '../../application/commands/record-heartbeat/record-heartbeat.command';
import { GetOnlineCountQuery } from '../../application/queries/get-online-count/get-online-count.query';
import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard';
import { CurrentUser } from '../../shared/decorators/current-user.decorator';
@ApiTags('Presence')
@Controller('presence')
export class PresenceController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post('heartbeat')
@UseGuards(JwtAuthGuard)
@ApiBearerAuth()
@ApiOperation({ summary: '心跳上报' })
async heartbeat(
@CurrentUser('userId') userId: bigint,
@Body() dto: HeartbeatDto,
) {
return this.commandBus.execute(
new RecordHeartbeatCommand(
userId,
dto.installId,
dto.appVersion,
dto.clientTs,
),
);
}
@Get('online-count')
@UseGuards(JwtAuthGuard)
@ApiBearerAuth()
@ApiOperation({ summary: '获取当前在线人数' })
async getOnlineCount(): Promise<OnlineCountResponseDto> {
const result = await this.queryBus.execute(new GetOnlineCountQuery());
return {
count: result.count,
windowSeconds: result.windowSeconds,
queriedAt: result.queriedAt.toISOString(),
};
}
}

View File

@ -0,0 +1,35 @@
import { IsArray, IsString, IsOptional, IsNumber, ValidateNested, IsObject } from 'class-validator';
import { Type } from 'class-transformer';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
export class EventItemDto {
@ApiProperty({ description: '事件名称', example: 'app_session_start' })
@IsString()
eventName: string;
@ApiPropertyOptional({ description: '用户ID (登录用户)', example: '12345' })
@IsOptional()
@IsString()
userId?: string;
@ApiProperty({ description: '安装ID', example: 'uuid-xxx-xxx' })
@IsString()
installId: string;
@ApiProperty({ description: '客户端时间戳 (秒)', example: 1732685100 })
@IsNumber()
clientTs: number;
@ApiPropertyOptional({ description: '事件属性' })
@IsOptional()
@IsObject()
properties?: Record<string, unknown>;
}
export class BatchEventsDto {
@ApiProperty({ type: [EventItemDto], description: '事件列表' })
@IsArray()
@ValidateNested({ each: true })
@Type(() => EventItemDto)
events: EventItemDto[];
}

View File

@ -0,0 +1,16 @@
import { IsString, IsNumber } from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';
export class HeartbeatDto {
@ApiProperty({ description: '安装ID', example: 'uuid-xxx-xxx' })
@IsString()
installId: string;
@ApiProperty({ description: 'App版本', example: '1.0.0' })
@IsString()
appVersion: string;
@ApiProperty({ description: '客户端时间戳 (秒)', example: 1732685100 })
@IsNumber()
clientTs: number;
}

View File

@ -0,0 +1,12 @@
import { IsDateString } from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';
export class QueryDauDto {
@ApiProperty({ description: '开始日期', example: '2025-01-01' })
@IsDateString()
startDate: string;
@ApiProperty({ description: '结束日期', example: '2025-01-15' })
@IsDateString()
endDate: string;
}

View File

@ -0,0 +1,23 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
export class DauDayItemDto {
@ApiProperty({ description: '日期', example: '2025-01-15' })
day: string;
@ApiProperty({ description: 'DAU', example: 5678 })
dauCount: number;
@ApiPropertyOptional({ description: '按省份统计' })
byProvince?: Record<string, number>;
@ApiPropertyOptional({ description: '按城市统计' })
byCity?: Record<string, number>;
}
export class DauStatsResponseDto {
@ApiProperty({ type: [DauDayItemDto], description: 'DAU数据' })
data: DauDayItemDto[];
@ApiProperty({ description: '记录数', example: 15 })
total: number;
}

View File

@ -0,0 +1,12 @@
import { ApiProperty } from '@nestjs/swagger';
export class OnlineCountResponseDto {
@ApiProperty({ description: '在线人数', example: 1234 })
count: number;
@ApiProperty({ description: '时间窗口(秒)', example: 180 })
windowSeconds: number;
@ApiProperty({ description: '查询时间', example: '2025-01-15T10:30:00.000Z' })
queriedAt: string;
}

View File

@ -0,0 +1,22 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { ApiModule } from './api/api.module';
import { ApplicationModule } from './application/application.module';
import { DomainModule } from './domain/domain.module';
import { InfrastructureModule } from './infrastructure/infrastructure.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
ScheduleModule.forRoot(),
DomainModule,
InfrastructureModule,
ApplicationModule,
ApiModule,
],
})
export class AppModule {}

View File

@ -0,0 +1,32 @@
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { DomainModule } from '../domain/domain.module';
import { InfrastructureModule } from '../infrastructure/infrastructure.module';
import { RecordEventsHandler } from './commands/record-events/record-events.handler';
import { RecordHeartbeatHandler } from './commands/record-heartbeat/record-heartbeat.handler';
import { CalculateDauHandler } from './commands/calculate-dau/calculate-dau.handler';
import { GetOnlineCountHandler } from './queries/get-online-count/get-online-count.handler';
import { GetDauStatsHandler } from './queries/get-dau-stats/get-dau-stats.handler';
import { AnalyticsScheduler } from './schedulers/analytics.scheduler';
const CommandHandlers = [
RecordEventsHandler,
RecordHeartbeatHandler,
CalculateDauHandler,
];
const QueryHandlers = [
GetOnlineCountHandler,
GetDauStatsHandler,
];
@Module({
imports: [CqrsModule, DomainModule, InfrastructureModule],
providers: [
...CommandHandlers,
...QueryHandlers,
AnalyticsScheduler,
],
exports: [CqrsModule],
})
export class ApplicationModule {}

View File

@ -0,0 +1,3 @@
export class CalculateDauCommand {
constructor(public readonly date: Date) {}
}

View File

@ -0,0 +1,61 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { CalculateDauCommand } from './calculate-dau.command';
import {
IEventLogRepository,
EVENT_LOG_REPOSITORY,
} from '../../../domain/repositories/event-log.repository.interface';
import {
IDailyActiveStatsRepository,
DAILY_ACTIVE_STATS_REPOSITORY,
} from '../../../domain/repositories/daily-active-stats.repository.interface';
import { DauCalculationService } from '../../../domain/services/dau-calculation.service';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import {
startOfDayInTimezone,
endOfDayInTimezone,
} from '../../../shared/utils/timezone.util';
@Injectable()
@CommandHandler(CalculateDauCommand)
export class CalculateDauHandler implements ICommandHandler<CalculateDauCommand> {
private readonly logger = new Logger(CalculateDauHandler.name);
constructor(
@Inject(EVENT_LOG_REPOSITORY)
private readonly eventLogRepository: IEventLogRepository,
@Inject(DAILY_ACTIVE_STATS_REPOSITORY)
private readonly dauStatsRepository: IDailyActiveStatsRepository,
private readonly dauCalculationService: DauCalculationService,
) {}
async execute(command: CalculateDauCommand): Promise<void> {
const { date } = command;
const timezone = 'Asia/Shanghai';
const startOfDay = startOfDayInTimezone(date, timezone);
const endOfDay = endOfDayInTimezone(date, timezone);
this.logger.log(`Calculating DAU for ${date.toISOString().split('T')[0]}`);
// 1. 查询去重用户数
const result = await this.eventLogRepository.queryDau(
EventName.APP_SESSION_START,
startOfDay,
endOfDay,
);
// 2. 创建或更新统计聚合
const existingStats = await this.dauStatsRepository.findByDate(date);
if (existingStats) {
existingStats.recalculate(result.total, result.byProvince, result.byCity);
await this.dauStatsRepository.upsert(existingStats);
} else {
const stats = this.dauCalculationService.createStatsFromQueryResult(date, result);
await this.dauStatsRepository.upsert(stats);
}
this.logger.log(`DAU calculated: ${result.total} users`);
}
}

View File

@ -0,0 +1,11 @@
export interface EventItemDto {
eventName: string;
userId?: string;
installId: string;
clientTs: number;
properties?: Record<string, unknown>;
}
export class RecordEventsCommand {
constructor(public readonly events: EventItemDto[]) {}
}

View File

@ -0,0 +1,97 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Inject, Injectable } from '@nestjs/common';
import { RecordEventsCommand, EventItemDto } from './record-events.command';
import { EventLog } from '../../../domain/entities/event-log.entity';
import { InstallId } from '../../../domain/value-objects/install-id.vo';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventProperties } from '../../../domain/value-objects/event-properties.vo';
import {
IEventLogRepository,
EVENT_LOG_REPOSITORY,
} from '../../../domain/repositories/event-log.repository.interface';
import { RedisService } from '../../../infrastructure/redis/redis.service';
import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service';
import { SessionStartedEvent } from '../../../domain/events/session-started.event';
import { formatToDateKey } from '../../../shared/utils/timezone.util';
export interface RecordEventsResult {
accepted: number;
failed: number;
errors?: string[];
}
@Injectable()
@CommandHandler(RecordEventsCommand)
export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand> {
constructor(
@Inject(EVENT_LOG_REPOSITORY)
private readonly eventLogRepository: IEventLogRepository,
private readonly redisService: RedisService,
private readonly eventPublisher: EventPublisherService,
) {}
async execute(command: RecordEventsCommand): Promise<RecordEventsResult> {
const { events } = command;
const errors: string[] = [];
const validLogs: EventLog[] = [];
// 1. 验证并转换事件
for (let i = 0; i < events.length; i++) {
try {
const log = this.toEventLog(events[i]);
validLogs.push(log);
} catch (e) {
errors.push(`Event[${i}]: ${e.message}`);
}
}
if (validLogs.length === 0) {
return { accepted: 0, failed: events.length, errors };
}
// 2. 批量写入数据库
await this.eventLogRepository.batchInsert(validLogs);
// 3. 更新实时DAU (HyperLogLog)
const todayKey = formatToDateKey(new Date());
for (const log of validLogs) {
if (log.eventName.isDauEvent()) {
await this.redisService.pfadd(
`analytics:dau:${todayKey}`,
log.dauIdentifier,
);
// 4. 发布领域事件
await this.eventPublisher.publish(
SessionStartedEvent.EVENT_NAME,
new SessionStartedEvent(
log.userId,
log.installId.value,
log.eventTime,
{
appVersion: log.properties.appVersion,
os: log.properties.os,
osVersion: log.properties.osVersion,
},
),
);
}
}
return {
accepted: validLogs.length,
failed: events.length - validLogs.length,
errors: errors.length > 0 ? errors : undefined,
};
}
private toEventLog(dto: EventItemDto): EventLog {
return EventLog.create({
userId: dto.userId ? BigInt(dto.userId) : null,
installId: InstallId.fromString(dto.installId),
eventName: EventName.fromString(dto.eventName),
eventTime: new Date(dto.clientTs * 1000),
properties: EventProperties.fromData(dto.properties ?? {}),
});
}
}

View File

@ -0,0 +1,8 @@
export class RecordHeartbeatCommand {
constructor(
public readonly userId: bigint,
public readonly installId: string,
public readonly appVersion: string,
public readonly clientTs: number,
) {}
}

View File

@ -0,0 +1,36 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Injectable } from '@nestjs/common';
import { RecordHeartbeatCommand } from './record-heartbeat.command';
import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository';
import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service';
import { HeartbeatReceivedEvent } from '../../../domain/events/heartbeat-received.event';
export interface RecordHeartbeatResult {
ok: boolean;
serverTs: number;
}
@Injectable()
@CommandHandler(RecordHeartbeatCommand)
export class RecordHeartbeatHandler implements ICommandHandler<RecordHeartbeatCommand> {
constructor(
private readonly presenceRedisRepository: PresenceRedisRepository,
private readonly eventPublisher: EventPublisherService,
) {}
async execute(command: RecordHeartbeatCommand): Promise<RecordHeartbeatResult> {
const { userId, installId, appVersion, clientTs } = command;
const now = Math.floor(Date.now() / 1000);
// 1. 更新Redis在线状态
await this.presenceRedisRepository.updateUserPresence(userId.toString(), now);
// 2. 发布领域事件
await this.eventPublisher.publish(
HeartbeatReceivedEvent.EVENT_NAME,
new HeartbeatReceivedEvent(userId, installId, new Date()),
);
return { ok: true, serverTs: now };
}
}

View File

@ -0,0 +1,46 @@
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { Inject, Injectable } from '@nestjs/common';
import { GetDauStatsQuery } from './get-dau-stats.query';
import {
IDailyActiveStatsRepository,
DAILY_ACTIVE_STATS_REPOSITORY,
} from '../../../domain/repositories/daily-active-stats.repository.interface';
export interface DauStatsItem {
day: string;
dauCount: number;
byProvince?: Record<string, number>;
byCity?: Record<string, number>;
}
export interface DauStatsResult {
data: DauStatsItem[];
total: number;
}
@Injectable()
@QueryHandler(GetDauStatsQuery)
export class GetDauStatsHandler implements IQueryHandler<GetDauStatsQuery> {
constructor(
@Inject(DAILY_ACTIVE_STATS_REPOSITORY)
private readonly dauStatsRepository: IDailyActiveStatsRepository,
) {}
async execute(query: GetDauStatsQuery): Promise<DauStatsResult> {
const { startDate, endDate } = query;
const statsList = await this.dauStatsRepository.findByDateRange(startDate, endDate);
const data: DauStatsItem[] = statsList.map((stats) => ({
day: stats.day.toISOString().split('T')[0],
dauCount: stats.dauCount,
byProvince: Object.fromEntries(stats.dauByProvince),
byCity: Object.fromEntries(stats.dauByCity),
}));
return {
data,
total: data.length,
};
}
}

View File

@ -0,0 +1,6 @@
export class GetDauStatsQuery {
constructor(
public readonly startDate: Date,
public readonly endDate: Date,
) {}
}

View File

@ -0,0 +1,33 @@
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { Injectable } from '@nestjs/common';
import { GetOnlineCountQuery } from './get-online-count.query';
import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository';
import { OnlineDetectionService } from '../../../domain/services/online-detection.service';
export interface OnlineCountResult {
count: number;
windowSeconds: number;
queriedAt: Date;
}
@Injectable()
@QueryHandler(GetOnlineCountQuery)
export class GetOnlineCountHandler implements IQueryHandler<GetOnlineCountQuery> {
constructor(
private readonly presenceRedisRepository: PresenceRedisRepository,
private readonly onlineDetectionService: OnlineDetectionService,
) {}
async execute(query: GetOnlineCountQuery): Promise<OnlineCountResult> {
const now = new Date();
const threshold = this.onlineDetectionService.getOnlineThreshold(now);
const count = await this.presenceRedisRepository.countOnlineUsers(threshold);
return {
count,
windowSeconds: this.onlineDetectionService.getWindowSeconds(),
queriedAt: now,
};
}
}

View File

@ -0,0 +1,3 @@
export class GetOnlineCountQuery {
constructor() {}
}

View File

@ -0,0 +1,86 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { CommandBus } from '@nestjs/cqrs';
import { subDays } from 'date-fns';
import { CalculateDauCommand } from '../commands/calculate-dau/calculate-dau.command';
import { PresenceRedisRepository } from '../../infrastructure/redis/presence-redis.repository';
import { OnlineDetectionService } from '../../domain/services/online-detection.service';
import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity';
import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../domain/repositories/online-snapshot.repository.interface';
@Injectable()
export class AnalyticsScheduler {
private readonly logger = new Logger(AnalyticsScheduler.name);
constructor(
private readonly commandBus: CommandBus,
private readonly presenceRedisRepository: PresenceRedisRepository,
private readonly onlineDetectionService: OnlineDetectionService,
@Inject(ONLINE_SNAPSHOT_REPOSITORY)
private readonly snapshotRepository: IOnlineSnapshotRepository,
) {}
/**
* 线
*/
@Cron(CronExpression.EVERY_MINUTE)
async recordOnlineSnapshot(): Promise<void> {
try {
const now = new Date();
const threshold = this.onlineDetectionService.getOnlineThreshold(now);
const count = await this.presenceRedisRepository.countOnlineUsers(threshold);
const snapshot = OnlineSnapshot.create({
ts: now,
onlineCount: count,
windowSeconds: this.onlineDetectionService.getWindowSeconds(),
});
await this.snapshotRepository.insert(snapshot);
this.logger.debug(`Online snapshot recorded: ${count} users`);
} catch (error) {
this.logger.error('Failed to record online snapshot', error);
}
}
/**
* 线
*/
@Cron(CronExpression.EVERY_HOUR)
async cleanupExpiredPresence(): Promise<void> {
try {
const threshold = Math.floor(Date.now() / 1000) - 86400; // 24小时前
await this.presenceRedisRepository.removeExpiredUsers(threshold);
this.logger.log('Expired presence data cleaned up');
} catch (error) {
this.logger.error('Failed to cleanup expired presence', error);
}
}
/**
* 1:00 DAU (Asia/Shanghai)
*/
@Cron('0 0 1 * * *', { timeZone: 'Asia/Shanghai' })
async calculateYesterdayDau(): Promise<void> {
try {
const yesterday = subDays(new Date(), 1);
await this.commandBus.execute(new CalculateDauCommand(yesterday));
this.logger.log('Yesterday DAU calculated');
} catch (error) {
this.logger.error('Failed to calculate yesterday DAU', error);
}
}
/**
* DAU ()
*/
@Cron(CronExpression.EVERY_HOUR)
async calculateTodayDauRolling(): Promise<void> {
try {
await this.commandBus.execute(new CalculateDauCommand(new Date()));
this.logger.debug('Today DAU rolling calculation completed');
} catch (error) {
this.logger.error('Failed to calculate today DAU', error);
}
}
}

View File

@ -0,0 +1,89 @@
import { AggregateRoot } from '@nestjs/cqrs';
import { DauCalculatedEvent } from '../../events/dau-calculated.event';
export class DailyActiveStats extends AggregateRoot {
private _day: Date;
private _dauCount: number;
private _dauByProvince: Map<string, number>;
private _dauByCity: Map<string, number>;
private _calculatedAt: Date;
private _version: number;
private constructor() {
super();
}
// Getters
get day(): Date {
return this._day;
}
get dauCount(): number {
return this._dauCount;
}
get dauByProvince(): Map<string, number> {
return new Map(this._dauByProvince);
}
get dauByCity(): Map<string, number> {
return new Map(this._dauByCity);
}
get calculatedAt(): Date {
return this._calculatedAt;
}
get version(): number {
return this._version;
}
// 工厂方法
static create(props: {
day: Date;
dauCount: number;
dauByProvince?: Map<string, number>;
dauByCity?: Map<string, number>;
}): DailyActiveStats {
const stats = new DailyActiveStats();
stats._day = props.day;
stats._dauCount = props.dauCount;
stats._dauByProvince = props.dauByProvince ?? new Map();
stats._dauByCity = props.dauByCity ?? new Map();
stats._calculatedAt = new Date();
stats._version = 1;
stats.apply(new DauCalculatedEvent(stats._day, stats._dauCount));
return stats;
}
// 重新计算
recalculate(newDauCount: number, byProvince?: Map<string, number>, byCity?: Map<string, number>): void {
this._dauCount = newDauCount;
if (byProvince) this._dauByProvince = byProvince;
if (byCity) this._dauByCity = byCity;
this._calculatedAt = new Date();
this._version++;
this.apply(new DauCalculatedEvent(this._day, this._dauCount));
}
// 从持久化恢复
static reconstitute(props: {
day: Date;
dauCount: number;
dauByProvince: Map<string, number>;
dauByCity: Map<string, number>;
calculatedAt: Date;
version: number;
}): DailyActiveStats {
const stats = new DailyActiveStats();
stats._day = props.day;
stats._dauCount = props.dauCount;
stats._dauByProvince = props.dauByProvince;
stats._dauByCity = props.dauByCity;
stats._calculatedAt = props.calculatedAt;
stats._version = props.version;
return stats;
}
}

View File

@ -0,0 +1,15 @@
import { Module } from '@nestjs/common';
import { DauCalculationService } from './services/dau-calculation.service';
import { OnlineDetectionService } from './services/online-detection.service';
@Module({
providers: [
DauCalculationService,
OnlineDetectionService,
],
exports: [
DauCalculationService,
OnlineDetectionService,
],
})
export class DomainModule {}

View File

@ -0,0 +1,92 @@
import { InstallId } from '../value-objects/install-id.vo';
import { EventName } from '../value-objects/event-name.vo';
import { EventProperties } from '../value-objects/event-properties.vo';
export class EventLog {
private _id: bigint | null;
private _userId: bigint | null;
private _installId: InstallId;
private _eventName: EventName;
private _eventTime: Date;
private _properties: EventProperties;
private _createdAt: Date;
private constructor() {}
// Getters
get id(): bigint | null {
return this._id;
}
get userId(): bigint | null {
return this._userId;
}
get installId(): InstallId {
return this._installId;
}
get eventName(): EventName {
return this._eventName;
}
get eventTime(): Date {
return this._eventTime;
}
get properties(): EventProperties {
return this._properties;
}
get createdAt(): Date {
return this._createdAt;
}
/**
* DAU去重的唯一标识
* 使 userId使 installId
*/
get dauIdentifier(): string {
return this._userId?.toString() ?? this._installId.value;
}
// 工厂方法
static create(props: {
userId?: bigint | null;
installId: InstallId;
eventName: EventName;
eventTime: Date;
properties?: EventProperties;
}): EventLog {
const log = new EventLog();
log._id = null;
log._userId = props.userId ?? null;
log._installId = props.installId;
log._eventName = props.eventName;
log._eventTime = props.eventTime;
log._properties = props.properties ?? EventProperties.empty();
log._createdAt = new Date();
return log;
}
// 从持久化恢复
static reconstitute(props: {
id: bigint;
userId: bigint | null;
installId: InstallId;
eventName: EventName;
eventTime: Date;
properties: EventProperties;
createdAt: Date;
}): EventLog {
const log = new EventLog();
log._id = props.id;
log._userId = props.userId;
log._installId = props.installId;
log._eventName = props.eventName;
log._eventTime = props.eventTime;
log._properties = props.properties;
log._createdAt = props.createdAt;
return log;
}
}

View File

@ -0,0 +1,53 @@
import { TimeWindow } from '../value-objects/time-window.vo';
export class OnlineSnapshot {
private _id: bigint | null;
private _ts: Date;
private _onlineCount: number;
private _windowSeconds: number;
private constructor() {}
get id(): bigint | null {
return this._id;
}
get ts(): Date {
return this._ts;
}
get onlineCount(): number {
return this._onlineCount;
}
get windowSeconds(): number {
return this._windowSeconds;
}
static create(props: {
ts: Date;
onlineCount: number;
windowSeconds?: number;
}): OnlineSnapshot {
const snapshot = new OnlineSnapshot();
snapshot._id = null;
snapshot._ts = props.ts;
snapshot._onlineCount = props.onlineCount;
snapshot._windowSeconds = props.windowSeconds ?? TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS;
return snapshot;
}
static reconstitute(props: {
id: bigint;
ts: Date;
onlineCount: number;
windowSeconds: number;
}): OnlineSnapshot {
const snapshot = new OnlineSnapshot();
snapshot._id = props.id;
snapshot._ts = props.ts;
snapshot._onlineCount = props.onlineCount;
snapshot._windowSeconds = props.windowSeconds;
return snapshot;
}
}

View File

@ -0,0 +1,8 @@
export class DauCalculatedEvent {
static readonly EVENT_NAME = 'analytics.dau.calculated';
constructor(
public readonly day: Date,
public readonly dauCount: number,
) {}
}

View File

@ -0,0 +1,9 @@
export class HeartbeatReceivedEvent {
static readonly EVENT_NAME = 'presence.heartbeat.received';
constructor(
public readonly userId: bigint,
public readonly installId: string,
public readonly occurredAt: Date,
) {}
}

View File

@ -0,0 +1,14 @@
export class SessionStartedEvent {
static readonly EVENT_NAME = 'analytics.session.started';
constructor(
public readonly userId: bigint | null,
public readonly installId: string,
public readonly occurredAt: Date,
public readonly metadata: {
appVersion?: string;
os?: string;
osVersion?: string;
},
) {}
}

View File

@ -0,0 +1,20 @@
import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate';
export interface IDailyActiveStatsRepository {
/**
*
*/
upsert(stats: DailyActiveStats): Promise<void>;
/**
*
*/
findByDate(day: Date): Promise<DailyActiveStats | null>;
/**
*
*/
findByDateRange(startDate: Date, endDate: Date): Promise<DailyActiveStats[]>;
}
export const DAILY_ACTIVE_STATS_REPOSITORY = Symbol('IDailyActiveStatsRepository');

View File

@ -0,0 +1,41 @@
import { EventLog } from '../entities/event-log.entity';
import { EventName } from '../value-objects/event-name.vo';
export interface DauQueryResult {
total: number;
byProvince: Map<string, number>;
byCity: Map<string, number>;
}
export interface IEventLogRepository {
/**
*
*/
batchInsert(logs: EventLog[]): Promise<void>;
/**
*
*/
insert(log: EventLog): Promise<EventLog>;
/**
* DAU
*/
queryDau(
eventName: EventName,
startTime: Date,
endTime: Date,
): Promise<DauQueryResult>;
/**
*
*/
findByTimeRange(
eventName: EventName,
startTime: Date,
endTime: Date,
limit?: number,
): Promise<EventLog[]>;
}
export const EVENT_LOG_REPOSITORY = Symbol('IEventLogRepository');

View File

@ -0,0 +1,24 @@
import { OnlineSnapshot } from '../entities/online-snapshot.entity';
export interface IOnlineSnapshotRepository {
/**
*
*/
insert(snapshot: OnlineSnapshot): Promise<OnlineSnapshot>;
/**
*
*/
findByTimeRange(
startTime: Date,
endTime: Date,
interval?: '1m' | '5m' | '1h',
): Promise<OnlineSnapshot[]>;
/**
*
*/
findLatest(): Promise<OnlineSnapshot | null>;
}
export const ONLINE_SNAPSHOT_REPOSITORY = Symbol('IOnlineSnapshotRepository');

View File

@ -0,0 +1,49 @@
import { Injectable } from '@nestjs/common';
import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate';
import { DauQueryResult } from '../repositories/event-log.repository.interface';
@Injectable()
export class DauCalculationService {
/**
*
*/
createStatsFromQueryResult(day: Date, result: DauQueryResult): DailyActiveStats {
return DailyActiveStats.create({
day,
dauCount: result.total,
dauByProvince: result.byProvince,
dauByCity: result.byCity,
});
}
/**
*
*/
mergeQueryResults(results: DauQueryResult[]): DauQueryResult {
const allIdentifiers = new Set<string>();
const byProvince = new Map<string, Set<string>>();
const byCity = new Map<string, Set<string>>();
// 注意:这里简化处理,实际需要原始数据才能正确去重
// 生产环境应该在数据库层面完成去重
let total = 0;
const provinceCount = new Map<string, number>();
const cityCount = new Map<string, number>();
for (const result of results) {
total = Math.max(total, result.total);
for (const [province, count] of result.byProvince) {
provinceCount.set(province, Math.max(provinceCount.get(province) ?? 0, count));
}
for (const [city, count] of result.byCity) {
cityCount.set(city, Math.max(cityCount.get(city) ?? 0, count));
}
}
return {
total,
byProvince: provinceCount,
byCity: cityCount,
};
}
}

View File

@ -0,0 +1,33 @@
import { Injectable } from '@nestjs/common';
import { TimeWindow } from '../value-objects/time-window.vo';
@Injectable()
export class OnlineDetectionService {
private readonly timeWindow: TimeWindow;
constructor() {
this.timeWindow = TimeWindow.default();
}
/**
* 线
*/
isOnline(lastHeartbeatTs: number, now: Date = new Date()): boolean {
const threshold = this.timeWindow.getThresholdTimestamp(now);
return lastHeartbeatTs > threshold;
}
/**
* 线
*/
getOnlineThreshold(now: Date = new Date()): number {
return this.timeWindow.getThresholdTimestamp(now);
}
/**
*
*/
getWindowSeconds(): number {
return this.timeWindow.windowSeconds;
}
}

View File

@ -0,0 +1,49 @@
export class DeviceInfo {
private readonly _os: 'Android' | 'iOS';
private readonly _osVersion: string;
private readonly _deviceModel?: string;
private readonly _screenResolution?: string;
private constructor(props: {
os: 'Android' | 'iOS';
osVersion: string;
deviceModel?: string;
screenResolution?: string;
}) {
this._os = props.os;
this._osVersion = props.osVersion;
this._deviceModel = props.deviceModel;
this._screenResolution = props.screenResolution;
}
get os(): 'Android' | 'iOS' {
return this._os;
}
get osVersion(): string {
return this._osVersion;
}
get deviceModel(): string | undefined {
return this._deviceModel;
}
get screenResolution(): string | undefined {
return this._screenResolution;
}
static create(props: {
os: string;
osVersion: string;
deviceModel?: string;
screenResolution?: string;
}): DeviceInfo {
const normalizedOs = props.os.toLowerCase() === 'ios' ? 'iOS' : 'Android';
return new DeviceInfo({
os: normalizedOs,
osVersion: props.osVersion,
deviceModel: props.deviceModel,
screenResolution: props.screenResolution,
});
}
}

View File

@ -0,0 +1,48 @@
import { DomainException } from '../../shared/exceptions/domain.exception';
export class EventName {
// 预定义的事件名
static readonly APP_SESSION_START = new EventName('app_session_start');
static readonly PRESENCE_HEARTBEAT = new EventName('presence_heartbeat');
static readonly APP_SESSION_END = new EventName('app_session_end');
private readonly _value: string;
private constructor(value: string) {
this._value = value;
}
get value(): string {
return this._value;
}
static fromString(value: string): EventName {
if (!value || value.trim() === '') {
throw new DomainException('EventName cannot be empty');
}
const trimmed = value.trim().toLowerCase();
if (trimmed.length > 64) {
throw new DomainException('EventName cannot exceed 64 characters');
}
// 验证格式:字母、数字、下划线
if (!/^[a-z][a-z0-9_]*$/.test(trimmed)) {
throw new DomainException('EventName must start with letter and contain only lowercase letters, numbers, and underscores');
}
return new EventName(trimmed);
}
/**
* DAU统计事件
*/
isDauEvent(): boolean {
return this._value === EventName.APP_SESSION_START.value;
}
equals(other: EventName): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
}

View File

@ -0,0 +1,62 @@
export interface EventPropertiesData {
os?: string;
osVersion?: string;
appVersion?: string;
networkType?: string;
clientTs?: number;
province?: string;
city?: string;
[key: string]: unknown;
}
export class EventProperties {
private readonly _data: EventPropertiesData;
private constructor(data: EventPropertiesData) {
this._data = { ...data };
}
get data(): EventPropertiesData {
return { ...this._data };
}
get os(): string | undefined {
return this._data.os;
}
get osVersion(): string | undefined {
return this._data.osVersion;
}
get appVersion(): string | undefined {
return this._data.appVersion;
}
get networkType(): string | undefined {
return this._data.networkType;
}
get clientTs(): number | undefined {
return this._data.clientTs;
}
get province(): string | undefined {
return this._data.province;
}
get city(): string | undefined {
return this._data.city;
}
static empty(): EventProperties {
return new EventProperties({});
}
static fromData(data: EventPropertiesData): EventProperties {
return new EventProperties(data);
}
toJSON(): EventPropertiesData {
return this._data;
}
}

View File

@ -0,0 +1,37 @@
import { v4 as uuidv4, validate as uuidValidate } from 'uuid';
import { DomainException } from '../../shared/exceptions/domain.exception';
export class InstallId {
private readonly _value: string;
private constructor(value: string) {
this._value = value;
}
get value(): string {
return this._value;
}
static generate(): InstallId {
return new InstallId(uuidv4());
}
static fromString(value: string): InstallId {
if (!value || value.trim() === '') {
throw new DomainException('InstallId cannot be empty');
}
// 允许非UUID格式但需要有基本长度
if (value.length < 8 || value.length > 128) {
throw new DomainException('InstallId length must be between 8 and 128 characters');
}
return new InstallId(value.trim());
}
equals(other: InstallId): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
}

View File

@ -0,0 +1,32 @@
export class TimeWindow {
static readonly DEFAULT_ONLINE_WINDOW_SECONDS = 180; // 3分钟
static readonly DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 60; // 60秒
private readonly _windowSeconds: number;
private constructor(windowSeconds: number) {
this._windowSeconds = windowSeconds;
}
get windowSeconds(): number {
return this._windowSeconds;
}
static default(): TimeWindow {
return new TimeWindow(TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS);
}
static ofSeconds(seconds: number): TimeWindow {
if (seconds <= 0) {
throw new Error('TimeWindow must be positive');
}
return new TimeWindow(seconds);
}
/**
* 线
*/
getThresholdTimestamp(now: Date = new Date()): number {
return Math.floor(now.getTime() / 1000) - this._windowSeconds;
}
}

View File

@ -0,0 +1,46 @@
import { Module } from '@nestjs/common';
import { PrismaService } from './persistence/prisma/prisma.service';
import { EventLogMapper } from './persistence/mappers/event-log.mapper';
import { DailyActiveStatsMapper } from './persistence/mappers/daily-active-stats.mapper';
import { OnlineSnapshotMapper } from './persistence/mappers/online-snapshot.mapper';
import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl';
import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl';
import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl';
import { RedisModule } from './redis/redis.module';
import { KafkaModule } from './kafka/kafka.module';
import {
EVENT_LOG_REPOSITORY,
DAILY_ACTIVE_STATS_REPOSITORY,
ONLINE_SNAPSHOT_REPOSITORY,
} from '../domain/repositories/event-log.repository.interface';
@Module({
imports: [RedisModule, KafkaModule],
providers: [
PrismaService,
EventLogMapper,
DailyActiveStatsMapper,
OnlineSnapshotMapper,
{
provide: EVENT_LOG_REPOSITORY,
useClass: EventLogRepositoryImpl,
},
{
provide: DAILY_ACTIVE_STATS_REPOSITORY,
useClass: DailyActiveStatsRepositoryImpl,
},
{
provide: ONLINE_SNAPSHOT_REPOSITORY,
useClass: OnlineSnapshotRepositoryImpl,
},
],
exports: [
PrismaService,
EVENT_LOG_REPOSITORY,
DAILY_ACTIVE_STATS_REPOSITORY,
ONLINE_SNAPSHOT_REPOSITORY,
RedisModule,
KafkaModule,
],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,68 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, logLevel } from 'kafkajs';
@Injectable()
export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(EventPublisherService.name);
private kafka: Kafka;
private producer: Producer;
private readonly topic: string;
private readonly enabled: boolean;
constructor(private readonly configService: ConfigService) {
this.enabled = this.configService.get<string>('KAFKA_ENABLED', 'false') === 'true';
if (this.enabled) {
this.kafka = new Kafka({
clientId: 'presence-service',
brokers: this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(','),
logLevel: logLevel.WARN,
});
this.producer = this.kafka.producer();
}
this.topic = this.configService.get<string>('KAFKA_TOPIC_ANALYTICS', 'analytics-events');
}
async onModuleInit(): Promise<void> {
if (this.enabled) {
await this.producer.connect();
this.logger.log('Kafka producer connected');
} else {
this.logger.log('Kafka is disabled');
}
}
async onModuleDestroy(): Promise<void> {
if (this.enabled) {
await this.producer.disconnect();
this.logger.log('Kafka producer disconnected');
}
}
async publish(eventType: string, payload: unknown): Promise<void> {
if (!this.enabled) {
this.logger.debug(`Kafka disabled, skipping event: ${eventType}`);
return;
}
try {
await this.producer.send({
topic: this.topic,
messages: [
{
key: eventType,
value: JSON.stringify({
eventType,
payload,
occurredAt: new Date().toISOString(),
}),
},
],
});
} catch (error) {
this.logger.error(`Failed to publish event ${eventType}`, error);
}
}
}

View File

@ -0,0 +1,8 @@
import { Module } from '@nestjs/common';
import { EventPublisherService } from './event-publisher.service';
@Module({
providers: [EventPublisherService],
exports: [EventPublisherService],
})
export class KafkaModule {}

View File

@ -0,0 +1,35 @@
import { Injectable } from '@nestjs/common';
import { DailyActiveStats as PrismaDailyActiveStats } from '@prisma/client';
import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate';
@Injectable()
export class DailyActiveStatsMapper {
toDomain(prisma: PrismaDailyActiveStats): DailyActiveStats {
const dauByProvince = new Map<string, number>(
Object.entries((prisma.dauByProvince as Record<string, number>) ?? {}),
);
const dauByCity = new Map<string, number>(
Object.entries((prisma.dauByCity as Record<string, number>) ?? {}),
);
return DailyActiveStats.reconstitute({
day: prisma.day,
dauCount: prisma.dauCount,
dauByProvince,
dauByCity,
calculatedAt: prisma.calculatedAt,
version: prisma.version,
});
}
toPersistence(domain: DailyActiveStats): PrismaDailyActiveStats {
return {
day: domain.day,
dauCount: domain.dauCount,
dauByProvince: Object.fromEntries(domain.dauByProvince) as any,
dauByCity: Object.fromEntries(domain.dauByCity) as any,
calculatedAt: domain.calculatedAt,
version: domain.version,
};
}
}

View File

@ -0,0 +1,31 @@
import { Injectable } from '@nestjs/common';
import { EventLog as PrismaEventLog } from '@prisma/client';
import { EventLog } from '../../../domain/entities/event-log.entity';
import { InstallId } from '../../../domain/value-objects/install-id.vo';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventProperties, EventPropertiesData } from '../../../domain/value-objects/event-properties.vo';
@Injectable()
export class EventLogMapper {
toDomain(prisma: PrismaEventLog): EventLog {
return EventLog.reconstitute({
id: prisma.id,
userId: prisma.userId,
installId: InstallId.fromString(prisma.installId),
eventName: EventName.fromString(prisma.eventName),
eventTime: prisma.eventTime,
properties: EventProperties.fromData((prisma.properties as EventPropertiesData) ?? {}),
createdAt: prisma.createdAt,
});
}
toPersistence(domain: EventLog): Omit<PrismaEventLog, 'id' | 'createdAt'> {
return {
userId: domain.userId,
installId: domain.installId.value,
eventName: domain.eventName.value,
eventTime: domain.eventTime,
properties: domain.properties.toJSON() as any,
};
}
}

View File

@ -0,0 +1,23 @@
import { Injectable } from '@nestjs/common';
import { OnlineSnapshot as PrismaOnlineSnapshot } from '@prisma/client';
import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity';
@Injectable()
export class OnlineSnapshotMapper {
toDomain(prisma: PrismaOnlineSnapshot): OnlineSnapshot {
return OnlineSnapshot.reconstitute({
id: prisma.id,
ts: prisma.ts,
onlineCount: prisma.onlineCount,
windowSeconds: prisma.windowSeconds,
});
}
toPersistence(domain: OnlineSnapshot): Omit<PrismaOnlineSnapshot, 'id'> {
return {
ts: domain.ts,
onlineCount: domain.onlineCount,
windowSeconds: domain.windowSeconds,
};
}
}

View File

@ -0,0 +1,28 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrismaService.name);
constructor() {
super({
log: [
{ emit: 'event', level: 'query' },
{ emit: 'event', level: 'error' },
{ emit: 'event', level: 'info' },
{ emit: 'event', level: 'warn' },
],
});
}
async onModuleInit() {
await this.$connect();
this.logger.log('Prisma connected');
}
async onModuleDestroy() {
await this.$disconnect();
this.logger.log('Prisma disconnected');
}
}

View File

@ -0,0 +1,44 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import {
IDailyActiveStatsRepository,
} from '../../../domain/repositories/daily-active-stats.repository.interface';
import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate';
import { DailyActiveStatsMapper } from '../mappers/daily-active-stats.mapper';
@Injectable()
export class DailyActiveStatsRepositoryImpl implements IDailyActiveStatsRepository {
constructor(
private readonly prisma: PrismaService,
private readonly mapper: DailyActiveStatsMapper,
) {}
async upsert(stats: DailyActiveStats): Promise<void> {
const data = this.mapper.toPersistence(stats);
await this.prisma.dailyActiveStats.upsert({
where: { day: stats.day },
create: data,
update: data,
});
}
async findByDate(day: Date): Promise<DailyActiveStats | null> {
const record = await this.prisma.dailyActiveStats.findUnique({
where: { day },
});
return record ? this.mapper.toDomain(record) : null;
}
async findByDateRange(startDate: Date, endDate: Date): Promise<DailyActiveStats[]> {
const records = await this.prisma.dailyActiveStats.findMany({
where: {
day: {
gte: startDate,
lte: endDate,
},
},
orderBy: { day: 'asc' },
});
return records.map((r) => this.mapper.toDomain(r));
}
}

View File

@ -0,0 +1,97 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import {
IEventLogRepository,
DauQueryResult,
} from '../../../domain/repositories/event-log.repository.interface';
import { EventLog } from '../../../domain/entities/event-log.entity';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventLogMapper } from '../mappers/event-log.mapper';
@Injectable()
export class EventLogRepositoryImpl implements IEventLogRepository {
constructor(
private readonly prisma: PrismaService,
private readonly mapper: EventLogMapper,
) {}
async batchInsert(logs: EventLog[]): Promise<void> {
const data = logs.map((log) => this.mapper.toPersistence(log));
await this.prisma.eventLog.createMany({ data });
}
async insert(log: EventLog): Promise<EventLog> {
const data = this.mapper.toPersistence(log);
const created = await this.prisma.eventLog.create({ data });
return this.mapper.toDomain(created);
}
async queryDau(
eventName: EventName,
startTime: Date,
endTime: Date,
): Promise<DauQueryResult> {
// 使用原生 SQL 进行去重统计
const result = await this.prisma.$queryRaw<
{ total: bigint; province: string | null; city: string | null; count: bigint }[]
>`
WITH base AS (
SELECT
COALESCE(user_id::text, install_id) AS identifier,
properties->>'province' AS province,
properties->>'city' AS city
FROM analytics_event_log
WHERE event_name = ${eventName.value}
AND event_time >= ${startTime}
AND event_time < ${endTime}
),
unique_users AS (
SELECT DISTINCT identifier, province, city FROM base
)
SELECT
COUNT(DISTINCT identifier) AS total,
province,
city,
COUNT(*) AS count
FROM unique_users
GROUP BY GROUPING SETS ((), (province), (city))
`;
let total = 0;
const byProvince = new Map<string, number>();
const byCity = new Map<string, number>();
for (const row of result) {
if (row.province === null && row.city === null) {
total = Number(row.total);
} else if (row.province !== null && row.city === null) {
byProvince.set(row.province, Number(row.count));
} else if (row.city !== null && row.province === null) {
byCity.set(row.city, Number(row.count));
}
}
return { total, byProvince, byCity };
}
async findByTimeRange(
eventName: EventName,
startTime: Date,
endTime: Date,
limit?: number,
): Promise<EventLog[]> {
const records = await this.prisma.eventLog.findMany({
where: {
eventName: eventName.value,
eventTime: {
gte: startTime,
lt: endTime,
},
},
orderBy: { eventTime: 'desc' },
take: limit,
});
return records.map((r) => this.mapper.toDomain(r));
}
}

View File

@ -0,0 +1,47 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import {
IOnlineSnapshotRepository,
} from '../../../domain/repositories/online-snapshot.repository.interface';
import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity';
import { OnlineSnapshotMapper } from '../mappers/online-snapshot.mapper';
@Injectable()
export class OnlineSnapshotRepositoryImpl implements IOnlineSnapshotRepository {
constructor(
private readonly prisma: PrismaService,
private readonly mapper: OnlineSnapshotMapper,
) {}
async insert(snapshot: OnlineSnapshot): Promise<OnlineSnapshot> {
const data = this.mapper.toPersistence(snapshot);
const created = await this.prisma.onlineSnapshot.create({ data });
return this.mapper.toDomain(created);
}
async findByTimeRange(
startTime: Date,
endTime: Date,
interval?: '1m' | '5m' | '1h',
): Promise<OnlineSnapshot[]> {
// 简化实现:不做聚合,返回所有记录
// 生产环境可以使用 PostgreSQL 的 time_bucket 或按间隔采样
const records = await this.prisma.onlineSnapshot.findMany({
where: {
ts: {
gte: startTime,
lte: endTime,
},
},
orderBy: { ts: 'asc' },
});
return records.map((r) => this.mapper.toDomain(r));
}
async findLatest(): Promise<OnlineSnapshot | null> {
const record = await this.prisma.onlineSnapshot.findFirst({
orderBy: { ts: 'desc' },
});
return record ? this.mapper.toDomain(record) : null;
}
}

View File

@ -0,0 +1,74 @@
import { Injectable } from '@nestjs/common';
import { RedisService } from './redis.service';
@Injectable()
export class PresenceRedisRepository {
private readonly ONLINE_USERS_KEY = 'presence:online_users';
constructor(private readonly redisService: RedisService) {}
/**
* 线
*/
async updateUserPresence(userId: string, timestamp: number): Promise<void> {
await this.redisService.zadd(this.ONLINE_USERS_KEY, timestamp, userId);
}
/**
* 线
*/
async countOnlineUsers(thresholdTimestamp: number): Promise<number> {
return this.redisService.zcount(
this.ONLINE_USERS_KEY,
thresholdTimestamp,
'+inf',
);
}
/**
* 线
*/
async getOnlineUsers(thresholdTimestamp: number, limit?: number): Promise<string[]> {
const args: [string, number | string, number | string] = [
this.ONLINE_USERS_KEY,
thresholdTimestamp,
'+inf',
];
if (limit) {
return this.redisService.zrangebyscore(
this.ONLINE_USERS_KEY,
thresholdTimestamp,
'+inf',
'LIMIT',
0,
limit,
);
}
return this.redisService.zrangebyscore(
this.ONLINE_USERS_KEY,
thresholdTimestamp,
'+inf',
);
}
/**
*
*/
async removeExpiredUsers(thresholdTimestamp: number): Promise<number> {
return this.redisService.zremrangebyscore(
this.ONLINE_USERS_KEY,
'-inf',
thresholdTimestamp,
);
}
/**
*
*/
async getUserLastHeartbeat(userId: string): Promise<number | null> {
const score = await this.redisService.zscore(this.ONLINE_USERS_KEY, userId);
return score ? Number(score) : null;
}
}

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { RedisService } from './redis.service';
import { PresenceRedisRepository } from './presence-redis.repository';
@Module({
providers: [RedisService, PresenceRedisRepository],
exports: [RedisService, PresenceRedisRepository],
})
export class RedisModule {}

View File

@ -0,0 +1,65 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
@Injectable()
export class RedisService implements OnModuleDestroy {
private readonly client: Redis;
constructor(private readonly configService: ConfigService) {
this.client = new Redis({
host: this.configService.get<string>('REDIS_HOST', 'localhost'),
port: this.configService.get<number>('REDIS_PORT', 6379),
password: this.configService.get<string>('REDIS_PASSWORD'),
db: this.configService.get<number>('REDIS_DB', 0),
});
}
async onModuleDestroy(): Promise<void> {
await this.client.quit();
}
// ZSET 操作
async zadd(key: string, score: number, member: string): Promise<number> {
return this.client.zadd(key, score, member);
}
async zcount(key: string, min: number | string, max: number | string): Promise<number> {
return this.client.zcount(key, min, max);
}
async zrangebyscore(
key: string,
min: number | string,
max: number | string,
...args: (string | number)[]
): Promise<string[]> {
return this.client.zrangebyscore(key, min, max, ...args);
}
async zremrangebyscore(key: string, min: number | string, max: number | string): Promise<number> {
return this.client.zremrangebyscore(key, min, max);
}
async zscore(key: string, member: string): Promise<string | null> {
return this.client.zscore(key, member);
}
// HyperLogLog 操作
async pfadd(key: string, ...elements: string[]): Promise<number> {
return this.client.pfadd(key, ...elements);
}
async pfcount(...keys: string[]): Promise<number> {
return this.client.pfcount(...keys);
}
// 通用操作
async expire(key: string, seconds: number): Promise<number> {
return this.client.expire(key, seconds);
}
async del(...keys: string[]): Promise<number> {
return this.client.del(...keys);
}
}

View File

@ -0,0 +1,41 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe, Logger } from '@nestjs/common';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { AppModule } from './app.module';
async function bootstrap() {
const logger = new Logger('Bootstrap');
const app = await NestFactory.create(AppModule);
// 全局管道
app.useGlobalPipes(
new ValidationPipe({
whitelist: true,
transform: true,
forbidNonWhitelisted: true,
}),
);
// API 前缀
const apiPrefix = process.env.API_PREFIX || 'api/v1';
app.setGlobalPrefix(apiPrefix);
// Swagger 文档
const config = new DocumentBuilder()
.setTitle('Presence & Analytics Service API')
.setDescription('用户活跃度与在线状态服务')
.setVersion('1.0')
.addBearerAuth()
.build();
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup(`${apiPrefix}/docs`, app, document);
// 启动服务
const port = parseInt(process.env.APP_PORT || '3001', 10);
await app.listen(port);
logger.log(`Presence Service is running on: http://localhost:${port}/${apiPrefix}`);
logger.log(`Swagger docs: http://localhost:${port}/${apiPrefix}/docs`);
}
bootstrap();

View File

@ -0,0 +1,10 @@
import { createParamDecorator, ExecutionContext } from '@nestjs/common';
export const CurrentUser = createParamDecorator(
(data: string | undefined, ctx: ExecutionContext) => {
const request = ctx.switchToHttp().getRequest();
const user = request.user;
return data ? user?.[data] : user;
},
);

View File

@ -0,0 +1,4 @@
import { SetMetadata } from '@nestjs/common';
export const IS_PUBLIC_KEY = 'isPublic';
export const Public = () => SetMetadata(IS_PUBLIC_KEY, true);

View File

@ -0,0 +1,6 @@
export class ApplicationException extends Error {
constructor(message: string) {
super(message);
this.name = 'ApplicationException';
}
}

View File

@ -0,0 +1,6 @@
export class DomainException extends Error {
constructor(message: string) {
super(message);
this.name = 'DomainException';
}
}

View File

@ -0,0 +1,28 @@
import { Injectable, CanActivate, ExecutionContext, UnauthorizedException } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { IS_PUBLIC_KEY } from '../decorators/public.decorator';
@Injectable()
export class JwtAuthGuard implements CanActivate {
constructor(private reflector: Reflector) {}
canActivate(context: ExecutionContext): boolean {
const isPublic = this.reflector.getAllAndOverride<boolean>(IS_PUBLIC_KEY, [
context.getHandler(),
context.getClass(),
]);
if (isPublic) {
return true;
}
const request = context.switchToHttp().getRequest();
const user = request.user;
if (!user || !user.userId) {
throw new UnauthorizedException('未授权访问');
}
return true;
}
}

View File

@ -0,0 +1,30 @@
import { format, startOfDay, endOfDay } from 'date-fns';
import { toZonedTime, fromZonedTime } from 'date-fns-tz';
const DEFAULT_TIMEZONE = 'Asia/Shanghai';
/**
*
*/
export function startOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date {
const zonedDate = toZonedTime(date, timezone);
const start = startOfDay(zonedDate);
return fromZonedTime(start, timezone);
}
/**
*
*/
export function endOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date {
const zonedDate = toZonedTime(date, timezone);
const end = endOfDay(zonedDate);
return fromZonedTime(end, timezone);
}
/**
* Key (YYYY-MM-DD)
*/
export function formatToDateKey(date: Date, timezone: string = DEFAULT_TIMEZONE): string {
const zonedDate = toZonedTime(date, timezone);
return format(zonedDate, 'yyyy-MM-dd');
}