diff --git a/backend/services/presence-service/.dockerignore b/backend/services/presence-service/.dockerignore new file mode 100644 index 00000000..b45d358c --- /dev/null +++ b/backend/services/presence-service/.dockerignore @@ -0,0 +1,10 @@ +node_modules +dist +npm-debug.log +.env +.env.local +.env.*.local +.git +.gitignore +README.md +analytics-presence-service-dev-guide.md diff --git a/backend/services/presence-service/.env.example b/backend/services/presence-service/.env.example new file mode 100644 index 00000000..39c593f0 --- /dev/null +++ b/backend/services/presence-service/.env.example @@ -0,0 +1,25 @@ +# 应用配置 +NODE_ENV=development +APP_PORT=3001 +API_PREFIX=api/v1 + +# 数据库 +DATABASE_URL=postgresql://user:password@localhost:5432/rwa_analytics?schema=public + +# Redis +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 + +# JWT (与 Identity Service 共用) +JWT_SECRET=your-jwt-secret +JWT_EXPIRES_IN=7d + +# Kafka +KAFKA_ENABLED=false +KAFKA_BROKERS=localhost:9092 +KAFKA_TOPIC_ANALYTICS=analytics-events + +# 时区 +TZ=Asia/Shanghai diff --git a/backend/services/presence-service/Dockerfile b/backend/services/presence-service/Dockerfile new file mode 100644 index 00000000..b4be240e --- /dev/null +++ b/backend/services/presence-service/Dockerfile @@ -0,0 +1,56 @@ +# Build stage +FROM node:20-alpine AS builder + +WORKDIR /app + +# 复制依赖文件 +COPY package*.json ./ +RUN npm ci + +# 复制 Prisma schema 并生成客户端 +COPY prisma ./prisma/ +RUN DATABASE_URL="postgresql://user:pass@localhost:5432/db" npx prisma generate + +# 复制源代码并构建 +COPY src ./src/ +COPY tsconfig.json nest-cli.json ./ +RUN npm run build + +# 验证构建产物 +RUN ls -la dist/ && test -f dist/main.js + +# Production stage +FROM node:20-slim + +WORKDIR /app + +# 安装必要的系统依赖 (OpenSSL for Prisma, curl for healthcheck) +RUN apt-get update && apt-get install -y --no-install-recommends \ + openssl \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# 复制依赖文件并安装生产依赖 +COPY package*.json ./ +RUN npm ci --only=production + +# 复制 Prisma schema 并生成客户端 +COPY prisma ./prisma/ +RUN DATABASE_URL="postgresql://user:pass@localhost:5432/db" npx prisma generate + +# 复制构建产物 +COPY --from=builder /app/dist ./dist/ + +# 创建非 root 用户 +RUN groupadd -g 1001 nodejs && \ + useradd -u 1001 -g nodejs nestjs + +USER nestjs + +EXPOSE 3001 + +# 健康检查 +HEALTHCHECK --interval=30s --timeout=3s --start-period=40s --retries=3 \ + CMD curl -f http://localhost:3001/api/v1/health || exit 1 + +CMD ["node", "dist/main.js"] diff --git a/backend/services/presence-service/package.json b/backend/services/presence-service/package.json index 9c0a32af..bd3efef6 100644 --- a/backend/services/presence-service/package.json +++ b/backend/services/presence-service/package.json @@ -25,15 +25,20 @@ "@nestjs/common": "^10.0.0", "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.0", + "@nestjs/cqrs": "^10.2.7", "@nestjs/platform-express": "^10.0.0", "@nestjs/schedule": "^4.0.0", "@nestjs/swagger": "^7.1.17", "@prisma/client": "^5.7.0", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", + "date-fns": "^3.0.0", + "date-fns-tz": "^2.0.0", "ioredis": "^5.3.2", + "kafkajs": "^2.2.4", "reflect-metadata": "^0.1.13", - "rxjs": "^7.8.1" + "rxjs": "^7.8.1", + "uuid": "^9.0.1" }, "devDependencies": { "@nestjs/cli": "^10.0.0", @@ -41,6 +46,7 @@ "@nestjs/testing": "^10.0.0", "@types/express": "^4.17.17", "@types/node": "^20.3.1", + "@types/uuid": "^9.0.7", "@typescript-eslint/eslint-plugin": "^6.0.0", "@typescript-eslint/parser": "^6.0.0", "eslint": "^8.42.0", diff --git a/backend/services/presence-service/prisma/schema.prisma b/backend/services/presence-service/prisma/schema.prisma index 14640ee0..87c7e733 100644 --- a/backend/services/presence-service/prisma/schema.prisma +++ b/backend/services/presence-service/prisma/schema.prisma @@ -11,82 +11,41 @@ datasource db { url = env("DATABASE_URL") } -// ============================================================================= -// Heartbeat (在线心跳记录) -// ============================================================================= +// 事件日志表 (append-only) +model EventLog { + id BigInt @id @default(autoincrement()) + userId BigInt? @map("user_id") + installId String @map("install_id") @db.VarChar(64) + eventName String @map("event_name") @db.VarChar(64) + eventTime DateTime @map("event_time") @db.Timestamptz() + properties Json? @db.JsonB + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz() -model Heartbeat { - id String @id @default(uuid()) - installId String // 设备唯一安装ID - sessionId String // 会话ID - userId String? // 用户ID (可选,未登录时为null) - timestamp DateTime // 心跳时间戳 - createdAt DateTime @default(now()) - - @@index([installId, timestamp]) - @@index([userId, timestamp]) - @@index([sessionId]) - @@index([timestamp]) - @@map("heartbeats") + @@index([eventTime], name: "idx_event_log_event_time") + @@index([eventName], name: "idx_event_log_event_name") + @@index([eventName, eventTime], name: "idx_event_log_event_name_time") + @@map("analytics_event_log") } -// ============================================================================= -// Daily Active Users (DAU 日活统计) -// ============================================================================= +// 日活统计表 +model DailyActiveStats { + day DateTime @id @map("day") @db.Date + dauCount Int @map("dau_count") + dauByProvince Json? @map("dau_by_province") @db.JsonB + dauByCity Json? @map("dau_by_city") @db.JsonB + calculatedAt DateTime @map("calculated_at") @db.Timestamptz() + version Int @default(1) -model DailyActiveUser { - id String @id @default(uuid()) - date DateTime @db.Date // 统计日期 (YYYY-MM-DD) - installId String // 设备唯一安装ID - userId String? // 用户ID (可选) - firstSeen DateTime // 当日首次出现时间 - lastSeen DateTime // 当日最后出现时间 - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt - - @@unique([date, installId]) - @@index([date]) - @@index([userId, date]) - @@map("daily_active_users") + @@map("analytics_daily_active_users") } -// ============================================================================= -// Online Presence (实时在线状态) -// 使用 Redis 存储,此表仅用于历史记录 -// ============================================================================= +// 在线人数快照表 +model OnlineSnapshot { + id BigInt @id @default(autoincrement()) + ts DateTime @unique @db.Timestamptz() + onlineCount Int @map("online_count") + windowSeconds Int @default(180) @map("window_seconds") -model OnlineSession { - id String @id @default(uuid()) - installId String // 设备唯一安装ID - sessionId String @unique // 会话ID - userId String? // 用户ID (可选) - startTime DateTime // 会话开始时间 - lastHeartbeat DateTime // 最后心跳时间 - endTime DateTime? // 会话结束时间 (null表示仍在线) - duration Int? // 会话时长 (秒) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt - - @@index([installId]) - @@index([userId]) - @@index([sessionId]) - @@index([startTime]) - @@map("online_sessions") -} - -// ============================================================================= -// DAU Statistics (DAU 统计汇总) -// ============================================================================= - -model DauStatistics { - id String @id @default(uuid()) - date DateTime @unique @db.Date // 统计日期 - totalDau Int // 总DAU (按installId去重) - uniqueUsers Int // 唯一用户数 (按userId去重, 不含匿名) - anonymousUsers Int // 匿名用户数 (userId为null) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt - - @@index([date]) - @@map("dau_statistics") + @@index([ts(sort: Desc)], name: "idx_online_snapshots_ts") + @@map("analytics_online_snapshots") } diff --git a/backend/services/presence-service/src/api/api.module.ts b/backend/services/presence-service/src/api/api.module.ts new file mode 100644 index 00000000..c8aeb7f8 --- /dev/null +++ b/backend/services/presence-service/src/api/api.module.ts @@ -0,0 +1,15 @@ +import { Module } from '@nestjs/common'; +import { ApplicationModule } from '../application/application.module'; +import { AnalyticsController } from './controllers/analytics.controller'; +import { PresenceController } from './controllers/presence.controller'; +import { HealthController } from './controllers/health.controller'; + +@Module({ + imports: [ApplicationModule], + controllers: [ + AnalyticsController, + PresenceController, + HealthController, + ], +}) +export class ApiModule {} diff --git a/backend/services/presence-service/src/api/controllers/analytics.controller.ts b/backend/services/presence-service/src/api/controllers/analytics.controller.ts new file mode 100644 index 00000000..89bce657 --- /dev/null +++ b/backend/services/presence-service/src/api/controllers/analytics.controller.ts @@ -0,0 +1,36 @@ +import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common'; +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { BatchEventsDto } from '../dto/request/batch-events.dto'; +import { QueryDauDto } from '../dto/request/query-dau.dto'; +import { DauStatsResponseDto } from '../dto/response/dau-stats.dto'; +import { RecordEventsCommand } from '../../application/commands/record-events/record-events.command'; +import { GetDauStatsQuery } from '../../application/queries/get-dau-stats/get-dau-stats.query'; +import { Public } from '../../shared/decorators/public.decorator'; +import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard'; + +@ApiTags('Analytics') +@Controller('analytics') +export class AnalyticsController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + ) {} + + @Post('events') + @Public() + @ApiOperation({ summary: '批量上报事件' }) + async batchEvents(@Body() dto: BatchEventsDto) { + return this.commandBus.execute(new RecordEventsCommand(dto.events)); + } + + @Get('dau') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '查询DAU统计' }) + async getDauStats(@Query() dto: QueryDauDto): Promise { + return this.queryBus.execute( + new GetDauStatsQuery(new Date(dto.startDate), new Date(dto.endDate)), + ); + } +} diff --git a/backend/services/presence-service/src/api/controllers/health.controller.ts b/backend/services/presence-service/src/api/controllers/health.controller.ts new file mode 100644 index 00000000..0f1d2c11 --- /dev/null +++ b/backend/services/presence-service/src/api/controllers/health.controller.ts @@ -0,0 +1,18 @@ +import { Controller, Get } from '@nestjs/common'; +import { ApiTags, ApiOperation } from '@nestjs/swagger'; +import { Public } from '../../shared/decorators/public.decorator'; + +@ApiTags('Health') +@Controller('health') +export class HealthController { + @Get() + @Public() + @ApiOperation({ summary: '健康检查' }) + check() { + return { + status: 'ok', + service: 'presence-service', + timestamp: new Date().toISOString(), + }; + } +} diff --git a/backend/services/presence-service/src/api/controllers/presence.controller.ts b/backend/services/presence-service/src/api/controllers/presence.controller.ts new file mode 100644 index 00000000..93483ee0 --- /dev/null +++ b/backend/services/presence-service/src/api/controllers/presence.controller.ts @@ -0,0 +1,49 @@ +import { Controller, Post, Get, Body, UseGuards } from '@nestjs/common'; +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { HeartbeatDto } from '../dto/request/heartbeat.dto'; +import { OnlineCountResponseDto } from '../dto/response/online-count.dto'; +import { RecordHeartbeatCommand } from '../../application/commands/record-heartbeat/record-heartbeat.command'; +import { GetOnlineCountQuery } from '../../application/queries/get-online-count/get-online-count.query'; +import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard'; +import { CurrentUser } from '../../shared/decorators/current-user.decorator'; + +@ApiTags('Presence') +@Controller('presence') +export class PresenceController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + ) {} + + @Post('heartbeat') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '心跳上报' }) + async heartbeat( + @CurrentUser('userId') userId: bigint, + @Body() dto: HeartbeatDto, + ) { + return this.commandBus.execute( + new RecordHeartbeatCommand( + userId, + dto.installId, + dto.appVersion, + dto.clientTs, + ), + ); + } + + @Get('online-count') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '获取当前在线人数' }) + async getOnlineCount(): Promise { + const result = await this.queryBus.execute(new GetOnlineCountQuery()); + return { + count: result.count, + windowSeconds: result.windowSeconds, + queriedAt: result.queriedAt.toISOString(), + }; + } +} diff --git a/backend/services/presence-service/src/api/dto/request/batch-events.dto.ts b/backend/services/presence-service/src/api/dto/request/batch-events.dto.ts new file mode 100644 index 00000000..f9a3dd2d --- /dev/null +++ b/backend/services/presence-service/src/api/dto/request/batch-events.dto.ts @@ -0,0 +1,35 @@ +import { IsArray, IsString, IsOptional, IsNumber, ValidateNested, IsObject } from 'class-validator'; +import { Type } from 'class-transformer'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class EventItemDto { + @ApiProperty({ description: '事件名称', example: 'app_session_start' }) + @IsString() + eventName: string; + + @ApiPropertyOptional({ description: '用户ID (登录用户)', example: '12345' }) + @IsOptional() + @IsString() + userId?: string; + + @ApiProperty({ description: '安装ID', example: 'uuid-xxx-xxx' }) + @IsString() + installId: string; + + @ApiProperty({ description: '客户端时间戳 (秒)', example: 1732685100 }) + @IsNumber() + clientTs: number; + + @ApiPropertyOptional({ description: '事件属性' }) + @IsOptional() + @IsObject() + properties?: Record; +} + +export class BatchEventsDto { + @ApiProperty({ type: [EventItemDto], description: '事件列表' }) + @IsArray() + @ValidateNested({ each: true }) + @Type(() => EventItemDto) + events: EventItemDto[]; +} diff --git a/backend/services/presence-service/src/api/dto/request/heartbeat.dto.ts b/backend/services/presence-service/src/api/dto/request/heartbeat.dto.ts new file mode 100644 index 00000000..584ebce4 --- /dev/null +++ b/backend/services/presence-service/src/api/dto/request/heartbeat.dto.ts @@ -0,0 +1,16 @@ +import { IsString, IsNumber } from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; + +export class HeartbeatDto { + @ApiProperty({ description: '安装ID', example: 'uuid-xxx-xxx' }) + @IsString() + installId: string; + + @ApiProperty({ description: 'App版本', example: '1.0.0' }) + @IsString() + appVersion: string; + + @ApiProperty({ description: '客户端时间戳 (秒)', example: 1732685100 }) + @IsNumber() + clientTs: number; +} diff --git a/backend/services/presence-service/src/api/dto/request/query-dau.dto.ts b/backend/services/presence-service/src/api/dto/request/query-dau.dto.ts new file mode 100644 index 00000000..63d8114b --- /dev/null +++ b/backend/services/presence-service/src/api/dto/request/query-dau.dto.ts @@ -0,0 +1,12 @@ +import { IsDateString } from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; + +export class QueryDauDto { + @ApiProperty({ description: '开始日期', example: '2025-01-01' }) + @IsDateString() + startDate: string; + + @ApiProperty({ description: '结束日期', example: '2025-01-15' }) + @IsDateString() + endDate: string; +} diff --git a/backend/services/presence-service/src/api/dto/response/dau-stats.dto.ts b/backend/services/presence-service/src/api/dto/response/dau-stats.dto.ts new file mode 100644 index 00000000..f15b400a --- /dev/null +++ b/backend/services/presence-service/src/api/dto/response/dau-stats.dto.ts @@ -0,0 +1,23 @@ +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class DauDayItemDto { + @ApiProperty({ description: '日期', example: '2025-01-15' }) + day: string; + + @ApiProperty({ description: 'DAU', example: 5678 }) + dauCount: number; + + @ApiPropertyOptional({ description: '按省份统计' }) + byProvince?: Record; + + @ApiPropertyOptional({ description: '按城市统计' }) + byCity?: Record; +} + +export class DauStatsResponseDto { + @ApiProperty({ type: [DauDayItemDto], description: 'DAU数据' }) + data: DauDayItemDto[]; + + @ApiProperty({ description: '记录数', example: 15 }) + total: number; +} diff --git a/backend/services/presence-service/src/api/dto/response/online-count.dto.ts b/backend/services/presence-service/src/api/dto/response/online-count.dto.ts new file mode 100644 index 00000000..fac2d072 --- /dev/null +++ b/backend/services/presence-service/src/api/dto/response/online-count.dto.ts @@ -0,0 +1,12 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class OnlineCountResponseDto { + @ApiProperty({ description: '在线人数', example: 1234 }) + count: number; + + @ApiProperty({ description: '时间窗口(秒)', example: 180 }) + windowSeconds: number; + + @ApiProperty({ description: '查询时间', example: '2025-01-15T10:30:00.000Z' }) + queriedAt: string; +} diff --git a/backend/services/presence-service/src/app.module.ts b/backend/services/presence-service/src/app.module.ts new file mode 100644 index 00000000..4cc144e9 --- /dev/null +++ b/backend/services/presence-service/src/app.module.ts @@ -0,0 +1,22 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { ScheduleModule } from '@nestjs/schedule'; +import { ApiModule } from './api/api.module'; +import { ApplicationModule } from './application/application.module'; +import { DomainModule } from './domain/domain.module'; +import { InfrastructureModule } from './infrastructure/infrastructure.module'; + +@Module({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: ['.env.local', '.env'], + }), + ScheduleModule.forRoot(), + DomainModule, + InfrastructureModule, + ApplicationModule, + ApiModule, + ], +}) +export class AppModule {} diff --git a/backend/services/presence-service/src/application/application.module.ts b/backend/services/presence-service/src/application/application.module.ts new file mode 100644 index 00000000..dd54c018 --- /dev/null +++ b/backend/services/presence-service/src/application/application.module.ts @@ -0,0 +1,32 @@ +import { Module } from '@nestjs/common'; +import { CqrsModule } from '@nestjs/cqrs'; +import { DomainModule } from '../domain/domain.module'; +import { InfrastructureModule } from '../infrastructure/infrastructure.module'; +import { RecordEventsHandler } from './commands/record-events/record-events.handler'; +import { RecordHeartbeatHandler } from './commands/record-heartbeat/record-heartbeat.handler'; +import { CalculateDauHandler } from './commands/calculate-dau/calculate-dau.handler'; +import { GetOnlineCountHandler } from './queries/get-online-count/get-online-count.handler'; +import { GetDauStatsHandler } from './queries/get-dau-stats/get-dau-stats.handler'; +import { AnalyticsScheduler } from './schedulers/analytics.scheduler'; + +const CommandHandlers = [ + RecordEventsHandler, + RecordHeartbeatHandler, + CalculateDauHandler, +]; + +const QueryHandlers = [ + GetOnlineCountHandler, + GetDauStatsHandler, +]; + +@Module({ + imports: [CqrsModule, DomainModule, InfrastructureModule], + providers: [ + ...CommandHandlers, + ...QueryHandlers, + AnalyticsScheduler, + ], + exports: [CqrsModule], +}) +export class ApplicationModule {} diff --git a/backend/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts b/backend/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts new file mode 100644 index 00000000..62c5c0a2 --- /dev/null +++ b/backend/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts @@ -0,0 +1,3 @@ +export class CalculateDauCommand { + constructor(public readonly date: Date) {} +} diff --git a/backend/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts b/backend/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts new file mode 100644 index 00000000..bbb46d1a --- /dev/null +++ b/backend/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts @@ -0,0 +1,61 @@ +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { CalculateDauCommand } from './calculate-dau.command'; +import { + IEventLogRepository, + EVENT_LOG_REPOSITORY, +} from '../../../domain/repositories/event-log.repository.interface'; +import { + IDailyActiveStatsRepository, + DAILY_ACTIVE_STATS_REPOSITORY, +} from '../../../domain/repositories/daily-active-stats.repository.interface'; +import { DauCalculationService } from '../../../domain/services/dau-calculation.service'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { + startOfDayInTimezone, + endOfDayInTimezone, +} from '../../../shared/utils/timezone.util'; + +@Injectable() +@CommandHandler(CalculateDauCommand) +export class CalculateDauHandler implements ICommandHandler { + private readonly logger = new Logger(CalculateDauHandler.name); + + constructor( + @Inject(EVENT_LOG_REPOSITORY) + private readonly eventLogRepository: IEventLogRepository, + @Inject(DAILY_ACTIVE_STATS_REPOSITORY) + private readonly dauStatsRepository: IDailyActiveStatsRepository, + private readonly dauCalculationService: DauCalculationService, + ) {} + + async execute(command: CalculateDauCommand): Promise { + const { date } = command; + const timezone = 'Asia/Shanghai'; + + const startOfDay = startOfDayInTimezone(date, timezone); + const endOfDay = endOfDayInTimezone(date, timezone); + + this.logger.log(`Calculating DAU for ${date.toISOString().split('T')[0]}`); + + // 1. 查询去重用户数 + const result = await this.eventLogRepository.queryDau( + EventName.APP_SESSION_START, + startOfDay, + endOfDay, + ); + + // 2. 创建或更新统计聚合 + const existingStats = await this.dauStatsRepository.findByDate(date); + + if (existingStats) { + existingStats.recalculate(result.total, result.byProvince, result.byCity); + await this.dauStatsRepository.upsert(existingStats); + } else { + const stats = this.dauCalculationService.createStatsFromQueryResult(date, result); + await this.dauStatsRepository.upsert(stats); + } + + this.logger.log(`DAU calculated: ${result.total} users`); + } +} diff --git a/backend/services/presence-service/src/application/commands/record-events/record-events.command.ts b/backend/services/presence-service/src/application/commands/record-events/record-events.command.ts new file mode 100644 index 00000000..c59a9781 --- /dev/null +++ b/backend/services/presence-service/src/application/commands/record-events/record-events.command.ts @@ -0,0 +1,11 @@ +export interface EventItemDto { + eventName: string; + userId?: string; + installId: string; + clientTs: number; + properties?: Record; +} + +export class RecordEventsCommand { + constructor(public readonly events: EventItemDto[]) {} +} diff --git a/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts b/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts new file mode 100644 index 00000000..3c4ea236 --- /dev/null +++ b/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts @@ -0,0 +1,97 @@ +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { RecordEventsCommand, EventItemDto } from './record-events.command'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { InstallId } from '../../../domain/value-objects/install-id.vo'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventProperties } from '../../../domain/value-objects/event-properties.vo'; +import { + IEventLogRepository, + EVENT_LOG_REPOSITORY, +} from '../../../domain/repositories/event-log.repository.interface'; +import { RedisService } from '../../../infrastructure/redis/redis.service'; +import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service'; +import { SessionStartedEvent } from '../../../domain/events/session-started.event'; +import { formatToDateKey } from '../../../shared/utils/timezone.util'; + +export interface RecordEventsResult { + accepted: number; + failed: number; + errors?: string[]; +} + +@Injectable() +@CommandHandler(RecordEventsCommand) +export class RecordEventsHandler implements ICommandHandler { + constructor( + @Inject(EVENT_LOG_REPOSITORY) + private readonly eventLogRepository: IEventLogRepository, + private readonly redisService: RedisService, + private readonly eventPublisher: EventPublisherService, + ) {} + + async execute(command: RecordEventsCommand): Promise { + const { events } = command; + const errors: string[] = []; + const validLogs: EventLog[] = []; + + // 1. 验证并转换事件 + for (let i = 0; i < events.length; i++) { + try { + const log = this.toEventLog(events[i]); + validLogs.push(log); + } catch (e) { + errors.push(`Event[${i}]: ${e.message}`); + } + } + + if (validLogs.length === 0) { + return { accepted: 0, failed: events.length, errors }; + } + + // 2. 批量写入数据库 + await this.eventLogRepository.batchInsert(validLogs); + + // 3. 更新实时DAU (HyperLogLog) + const todayKey = formatToDateKey(new Date()); + for (const log of validLogs) { + if (log.eventName.isDauEvent()) { + await this.redisService.pfadd( + `analytics:dau:${todayKey}`, + log.dauIdentifier, + ); + + // 4. 发布领域事件 + await this.eventPublisher.publish( + SessionStartedEvent.EVENT_NAME, + new SessionStartedEvent( + log.userId, + log.installId.value, + log.eventTime, + { + appVersion: log.properties.appVersion, + os: log.properties.os, + osVersion: log.properties.osVersion, + }, + ), + ); + } + } + + return { + accepted: validLogs.length, + failed: events.length - validLogs.length, + errors: errors.length > 0 ? errors : undefined, + }; + } + + private toEventLog(dto: EventItemDto): EventLog { + return EventLog.create({ + userId: dto.userId ? BigInt(dto.userId) : null, + installId: InstallId.fromString(dto.installId), + eventName: EventName.fromString(dto.eventName), + eventTime: new Date(dto.clientTs * 1000), + properties: EventProperties.fromData(dto.properties ?? {}), + }); + } +} diff --git a/backend/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts b/backend/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts new file mode 100644 index 00000000..a4f8c884 --- /dev/null +++ b/backend/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts @@ -0,0 +1,8 @@ +export class RecordHeartbeatCommand { + constructor( + public readonly userId: bigint, + public readonly installId: string, + public readonly appVersion: string, + public readonly clientTs: number, + ) {} +} diff --git a/backend/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts b/backend/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts new file mode 100644 index 00000000..dc942b35 --- /dev/null +++ b/backend/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts @@ -0,0 +1,36 @@ +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Injectable } from '@nestjs/common'; +import { RecordHeartbeatCommand } from './record-heartbeat.command'; +import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository'; +import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service'; +import { HeartbeatReceivedEvent } from '../../../domain/events/heartbeat-received.event'; + +export interface RecordHeartbeatResult { + ok: boolean; + serverTs: number; +} + +@Injectable() +@CommandHandler(RecordHeartbeatCommand) +export class RecordHeartbeatHandler implements ICommandHandler { + constructor( + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly eventPublisher: EventPublisherService, + ) {} + + async execute(command: RecordHeartbeatCommand): Promise { + const { userId, installId, appVersion, clientTs } = command; + const now = Math.floor(Date.now() / 1000); + + // 1. 更新Redis在线状态 + await this.presenceRedisRepository.updateUserPresence(userId.toString(), now); + + // 2. 发布领域事件 + await this.eventPublisher.publish( + HeartbeatReceivedEvent.EVENT_NAME, + new HeartbeatReceivedEvent(userId, installId, new Date()), + ); + + return { ok: true, serverTs: now }; + } +} diff --git a/backend/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts b/backend/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts new file mode 100644 index 00000000..7a80a783 --- /dev/null +++ b/backend/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts @@ -0,0 +1,46 @@ +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { GetDauStatsQuery } from './get-dau-stats.query'; +import { + IDailyActiveStatsRepository, + DAILY_ACTIVE_STATS_REPOSITORY, +} from '../../../domain/repositories/daily-active-stats.repository.interface'; + +export interface DauStatsItem { + day: string; + dauCount: number; + byProvince?: Record; + byCity?: Record; +} + +export interface DauStatsResult { + data: DauStatsItem[]; + total: number; +} + +@Injectable() +@QueryHandler(GetDauStatsQuery) +export class GetDauStatsHandler implements IQueryHandler { + constructor( + @Inject(DAILY_ACTIVE_STATS_REPOSITORY) + private readonly dauStatsRepository: IDailyActiveStatsRepository, + ) {} + + async execute(query: GetDauStatsQuery): Promise { + const { startDate, endDate } = query; + + const statsList = await this.dauStatsRepository.findByDateRange(startDate, endDate); + + const data: DauStatsItem[] = statsList.map((stats) => ({ + day: stats.day.toISOString().split('T')[0], + dauCount: stats.dauCount, + byProvince: Object.fromEntries(stats.dauByProvince), + byCity: Object.fromEntries(stats.dauByCity), + })); + + return { + data, + total: data.length, + }; + } +} diff --git a/backend/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts b/backend/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts new file mode 100644 index 00000000..11930191 --- /dev/null +++ b/backend/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts @@ -0,0 +1,6 @@ +export class GetDauStatsQuery { + constructor( + public readonly startDate: Date, + public readonly endDate: Date, + ) {} +} diff --git a/backend/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts b/backend/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts new file mode 100644 index 00000000..af11f083 --- /dev/null +++ b/backend/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts @@ -0,0 +1,33 @@ +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Injectable } from '@nestjs/common'; +import { GetOnlineCountQuery } from './get-online-count.query'; +import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository'; +import { OnlineDetectionService } from '../../../domain/services/online-detection.service'; + +export interface OnlineCountResult { + count: number; + windowSeconds: number; + queriedAt: Date; +} + +@Injectable() +@QueryHandler(GetOnlineCountQuery) +export class GetOnlineCountHandler implements IQueryHandler { + constructor( + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly onlineDetectionService: OnlineDetectionService, + ) {} + + async execute(query: GetOnlineCountQuery): Promise { + const now = new Date(); + const threshold = this.onlineDetectionService.getOnlineThreshold(now); + + const count = await this.presenceRedisRepository.countOnlineUsers(threshold); + + return { + count, + windowSeconds: this.onlineDetectionService.getWindowSeconds(), + queriedAt: now, + }; + } +} diff --git a/backend/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts b/backend/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts new file mode 100644 index 00000000..35e422ff --- /dev/null +++ b/backend/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts @@ -0,0 +1,3 @@ +export class GetOnlineCountQuery { + constructor() {} +} diff --git a/backend/services/presence-service/src/application/schedulers/analytics.scheduler.ts b/backend/services/presence-service/src/application/schedulers/analytics.scheduler.ts new file mode 100644 index 00000000..45dfae9b --- /dev/null +++ b/backend/services/presence-service/src/application/schedulers/analytics.scheduler.ts @@ -0,0 +1,86 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { CommandBus } from '@nestjs/cqrs'; +import { subDays } from 'date-fns'; +import { CalculateDauCommand } from '../commands/calculate-dau/calculate-dau.command'; +import { PresenceRedisRepository } from '../../infrastructure/redis/presence-redis.repository'; +import { OnlineDetectionService } from '../../domain/services/online-detection.service'; +import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity'; +import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../domain/repositories/online-snapshot.repository.interface'; + +@Injectable() +export class AnalyticsScheduler { + private readonly logger = new Logger(AnalyticsScheduler.name); + + constructor( + private readonly commandBus: CommandBus, + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly onlineDetectionService: OnlineDetectionService, + @Inject(ONLINE_SNAPSHOT_REPOSITORY) + private readonly snapshotRepository: IOnlineSnapshotRepository, + ) {} + + /** + * 每分钟记录在线人数快照 + */ + @Cron(CronExpression.EVERY_MINUTE) + async recordOnlineSnapshot(): Promise { + try { + const now = new Date(); + const threshold = this.onlineDetectionService.getOnlineThreshold(now); + const count = await this.presenceRedisRepository.countOnlineUsers(threshold); + + const snapshot = OnlineSnapshot.create({ + ts: now, + onlineCount: count, + windowSeconds: this.onlineDetectionService.getWindowSeconds(), + }); + + await this.snapshotRepository.insert(snapshot); + this.logger.debug(`Online snapshot recorded: ${count} users`); + } catch (error) { + this.logger.error('Failed to record online snapshot', error); + } + } + + /** + * 每小时清理过期在线数据 + */ + @Cron(CronExpression.EVERY_HOUR) + async cleanupExpiredPresence(): Promise { + try { + const threshold = Math.floor(Date.now() / 1000) - 86400; // 24小时前 + await this.presenceRedisRepository.removeExpiredUsers(threshold); + this.logger.log('Expired presence data cleaned up'); + } catch (error) { + this.logger.error('Failed to cleanup expired presence', error); + } + } + + /** + * 每天凌晨 1:00 计算前一天 DAU (Asia/Shanghai) + */ + @Cron('0 0 1 * * *', { timeZone: 'Asia/Shanghai' }) + async calculateYesterdayDau(): Promise { + try { + const yesterday = subDays(new Date(), 1); + await this.commandBus.execute(new CalculateDauCommand(yesterday)); + this.logger.log('Yesterday DAU calculated'); + } catch (error) { + this.logger.error('Failed to calculate yesterday DAU', error); + } + } + + /** + * 每小时滚动计算当天 DAU (用于实时看板) + */ + @Cron(CronExpression.EVERY_HOUR) + async calculateTodayDauRolling(): Promise { + try { + await this.commandBus.execute(new CalculateDauCommand(new Date())); + this.logger.debug('Today DAU rolling calculation completed'); + } catch (error) { + this.logger.error('Failed to calculate today DAU', error); + } + } +} diff --git a/backend/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts b/backend/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts new file mode 100644 index 00000000..21ecb64e --- /dev/null +++ b/backend/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts @@ -0,0 +1,89 @@ +import { AggregateRoot } from '@nestjs/cqrs'; +import { DauCalculatedEvent } from '../../events/dau-calculated.event'; + +export class DailyActiveStats extends AggregateRoot { + private _day: Date; + private _dauCount: number; + private _dauByProvince: Map; + private _dauByCity: Map; + private _calculatedAt: Date; + private _version: number; + + private constructor() { + super(); + } + + // Getters + get day(): Date { + return this._day; + } + + get dauCount(): number { + return this._dauCount; + } + + get dauByProvince(): Map { + return new Map(this._dauByProvince); + } + + get dauByCity(): Map { + return new Map(this._dauByCity); + } + + get calculatedAt(): Date { + return this._calculatedAt; + } + + get version(): number { + return this._version; + } + + // 工厂方法 + static create(props: { + day: Date; + dauCount: number; + dauByProvince?: Map; + dauByCity?: Map; + }): DailyActiveStats { + const stats = new DailyActiveStats(); + stats._day = props.day; + stats._dauCount = props.dauCount; + stats._dauByProvince = props.dauByProvince ?? new Map(); + stats._dauByCity = props.dauByCity ?? new Map(); + stats._calculatedAt = new Date(); + stats._version = 1; + + stats.apply(new DauCalculatedEvent(stats._day, stats._dauCount)); + return stats; + } + + // 重新计算 + recalculate(newDauCount: number, byProvince?: Map, byCity?: Map): void { + this._dauCount = newDauCount; + if (byProvince) this._dauByProvince = byProvince; + if (byCity) this._dauByCity = byCity; + this._calculatedAt = new Date(); + this._version++; + + this.apply(new DauCalculatedEvent(this._day, this._dauCount)); + } + + // 从持久化恢复 + static reconstitute(props: { + day: Date; + dauCount: number; + dauByProvince: Map; + dauByCity: Map; + calculatedAt: Date; + version: number; + }): DailyActiveStats { + const stats = new DailyActiveStats(); + stats._day = props.day; + stats._dauCount = props.dauCount; + stats._dauByProvince = props.dauByProvince; + stats._dauByCity = props.dauByCity; + stats._calculatedAt = props.calculatedAt; + stats._version = props.version; + return stats; + } +} diff --git a/backend/services/presence-service/src/domain/domain.module.ts b/backend/services/presence-service/src/domain/domain.module.ts new file mode 100644 index 00000000..30d1e4a6 --- /dev/null +++ b/backend/services/presence-service/src/domain/domain.module.ts @@ -0,0 +1,15 @@ +import { Module } from '@nestjs/common'; +import { DauCalculationService } from './services/dau-calculation.service'; +import { OnlineDetectionService } from './services/online-detection.service'; + +@Module({ + providers: [ + DauCalculationService, + OnlineDetectionService, + ], + exports: [ + DauCalculationService, + OnlineDetectionService, + ], +}) +export class DomainModule {} diff --git a/backend/services/presence-service/src/domain/entities/event-log.entity.ts b/backend/services/presence-service/src/domain/entities/event-log.entity.ts new file mode 100644 index 00000000..e8c5fd53 --- /dev/null +++ b/backend/services/presence-service/src/domain/entities/event-log.entity.ts @@ -0,0 +1,92 @@ +import { InstallId } from '../value-objects/install-id.vo'; +import { EventName } from '../value-objects/event-name.vo'; +import { EventProperties } from '../value-objects/event-properties.vo'; + +export class EventLog { + private _id: bigint | null; + private _userId: bigint | null; + private _installId: InstallId; + private _eventName: EventName; + private _eventTime: Date; + private _properties: EventProperties; + private _createdAt: Date; + + private constructor() {} + + // Getters + get id(): bigint | null { + return this._id; + } + + get userId(): bigint | null { + return this._userId; + } + + get installId(): InstallId { + return this._installId; + } + + get eventName(): EventName { + return this._eventName; + } + + get eventTime(): Date { + return this._eventTime; + } + + get properties(): EventProperties { + return this._properties; + } + + get createdAt(): Date { + return this._createdAt; + } + + /** + * 获取用于DAU去重的唯一标识 + * 优先使用 userId,否则使用 installId + */ + get dauIdentifier(): string { + return this._userId?.toString() ?? this._installId.value; + } + + // 工厂方法 + static create(props: { + userId?: bigint | null; + installId: InstallId; + eventName: EventName; + eventTime: Date; + properties?: EventProperties; + }): EventLog { + const log = new EventLog(); + log._id = null; + log._userId = props.userId ?? null; + log._installId = props.installId; + log._eventName = props.eventName; + log._eventTime = props.eventTime; + log._properties = props.properties ?? EventProperties.empty(); + log._createdAt = new Date(); + return log; + } + + // 从持久化恢复 + static reconstitute(props: { + id: bigint; + userId: bigint | null; + installId: InstallId; + eventName: EventName; + eventTime: Date; + properties: EventProperties; + createdAt: Date; + }): EventLog { + const log = new EventLog(); + log._id = props.id; + log._userId = props.userId; + log._installId = props.installId; + log._eventName = props.eventName; + log._eventTime = props.eventTime; + log._properties = props.properties; + log._createdAt = props.createdAt; + return log; + } +} diff --git a/backend/services/presence-service/src/domain/entities/online-snapshot.entity.ts b/backend/services/presence-service/src/domain/entities/online-snapshot.entity.ts new file mode 100644 index 00000000..e2ffc031 --- /dev/null +++ b/backend/services/presence-service/src/domain/entities/online-snapshot.entity.ts @@ -0,0 +1,53 @@ +import { TimeWindow } from '../value-objects/time-window.vo'; + +export class OnlineSnapshot { + private _id: bigint | null; + private _ts: Date; + private _onlineCount: number; + private _windowSeconds: number; + + private constructor() {} + + get id(): bigint | null { + return this._id; + } + + get ts(): Date { + return this._ts; + } + + get onlineCount(): number { + return this._onlineCount; + } + + get windowSeconds(): number { + return this._windowSeconds; + } + + static create(props: { + ts: Date; + onlineCount: number; + windowSeconds?: number; + }): OnlineSnapshot { + const snapshot = new OnlineSnapshot(); + snapshot._id = null; + snapshot._ts = props.ts; + snapshot._onlineCount = props.onlineCount; + snapshot._windowSeconds = props.windowSeconds ?? TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS; + return snapshot; + } + + static reconstitute(props: { + id: bigint; + ts: Date; + onlineCount: number; + windowSeconds: number; + }): OnlineSnapshot { + const snapshot = new OnlineSnapshot(); + snapshot._id = props.id; + snapshot._ts = props.ts; + snapshot._onlineCount = props.onlineCount; + snapshot._windowSeconds = props.windowSeconds; + return snapshot; + } +} diff --git a/backend/services/presence-service/src/domain/events/dau-calculated.event.ts b/backend/services/presence-service/src/domain/events/dau-calculated.event.ts new file mode 100644 index 00000000..287ee7fc --- /dev/null +++ b/backend/services/presence-service/src/domain/events/dau-calculated.event.ts @@ -0,0 +1,8 @@ +export class DauCalculatedEvent { + static readonly EVENT_NAME = 'analytics.dau.calculated'; + + constructor( + public readonly day: Date, + public readonly dauCount: number, + ) {} +} diff --git a/backend/services/presence-service/src/domain/events/heartbeat-received.event.ts b/backend/services/presence-service/src/domain/events/heartbeat-received.event.ts new file mode 100644 index 00000000..60e55c0f --- /dev/null +++ b/backend/services/presence-service/src/domain/events/heartbeat-received.event.ts @@ -0,0 +1,9 @@ +export class HeartbeatReceivedEvent { + static readonly EVENT_NAME = 'presence.heartbeat.received'; + + constructor( + public readonly userId: bigint, + public readonly installId: string, + public readonly occurredAt: Date, + ) {} +} diff --git a/backend/services/presence-service/src/domain/events/session-started.event.ts b/backend/services/presence-service/src/domain/events/session-started.event.ts new file mode 100644 index 00000000..6d1b0130 --- /dev/null +++ b/backend/services/presence-service/src/domain/events/session-started.event.ts @@ -0,0 +1,14 @@ +export class SessionStartedEvent { + static readonly EVENT_NAME = 'analytics.session.started'; + + constructor( + public readonly userId: bigint | null, + public readonly installId: string, + public readonly occurredAt: Date, + public readonly metadata: { + appVersion?: string; + os?: string; + osVersion?: string; + }, + ) {} +} diff --git a/backend/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts b/backend/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts new file mode 100644 index 00000000..b0ef6375 --- /dev/null +++ b/backend/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts @@ -0,0 +1,20 @@ +import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate'; + +export interface IDailyActiveStatsRepository { + /** + * 保存或更新日活统计 + */ + upsert(stats: DailyActiveStats): Promise; + + /** + * 按日期查询 + */ + findByDate(day: Date): Promise; + + /** + * 按日期范围查询 + */ + findByDateRange(startDate: Date, endDate: Date): Promise; +} + +export const DAILY_ACTIVE_STATS_REPOSITORY = Symbol('IDailyActiveStatsRepository'); diff --git a/backend/services/presence-service/src/domain/repositories/event-log.repository.interface.ts b/backend/services/presence-service/src/domain/repositories/event-log.repository.interface.ts new file mode 100644 index 00000000..3ae777bb --- /dev/null +++ b/backend/services/presence-service/src/domain/repositories/event-log.repository.interface.ts @@ -0,0 +1,41 @@ +import { EventLog } from '../entities/event-log.entity'; +import { EventName } from '../value-objects/event-name.vo'; + +export interface DauQueryResult { + total: number; + byProvince: Map; + byCity: Map; +} + +export interface IEventLogRepository { + /** + * 批量插入事件日志 + */ + batchInsert(logs: EventLog[]): Promise; + + /** + * 插入单条事件日志 + */ + insert(log: EventLog): Promise; + + /** + * 查询DAU(去重用户数) + */ + queryDau( + eventName: EventName, + startTime: Date, + endTime: Date, + ): Promise; + + /** + * 按时间范围查询事件 + */ + findByTimeRange( + eventName: EventName, + startTime: Date, + endTime: Date, + limit?: number, + ): Promise; +} + +export const EVENT_LOG_REPOSITORY = Symbol('IEventLogRepository'); diff --git a/backend/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts b/backend/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts new file mode 100644 index 00000000..c36a75c6 --- /dev/null +++ b/backend/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts @@ -0,0 +1,24 @@ +import { OnlineSnapshot } from '../entities/online-snapshot.entity'; + +export interface IOnlineSnapshotRepository { + /** + * 插入快照 + */ + insert(snapshot: OnlineSnapshot): Promise; + + /** + * 按时间范围查询 + */ + findByTimeRange( + startTime: Date, + endTime: Date, + interval?: '1m' | '5m' | '1h', + ): Promise; + + /** + * 获取最新快照 + */ + findLatest(): Promise; +} + +export const ONLINE_SNAPSHOT_REPOSITORY = Symbol('IOnlineSnapshotRepository'); diff --git a/backend/services/presence-service/src/domain/services/dau-calculation.service.ts b/backend/services/presence-service/src/domain/services/dau-calculation.service.ts new file mode 100644 index 00000000..84bc77e8 --- /dev/null +++ b/backend/services/presence-service/src/domain/services/dau-calculation.service.ts @@ -0,0 +1,49 @@ +import { Injectable } from '@nestjs/common'; +import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate'; +import { DauQueryResult } from '../repositories/event-log.repository.interface'; + +@Injectable() +export class DauCalculationService { + /** + * 从查询结果创建日活统计聚合 + */ + createStatsFromQueryResult(day: Date, result: DauQueryResult): DailyActiveStats { + return DailyActiveStats.create({ + day, + dauCount: result.total, + dauByProvince: result.byProvince, + dauByCity: result.byCity, + }); + } + + /** + * 合并多个查询结果(用于增量计算) + */ + mergeQueryResults(results: DauQueryResult[]): DauQueryResult { + const allIdentifiers = new Set(); + const byProvince = new Map>(); + const byCity = new Map>(); + + // 注意:这里简化处理,实际需要原始数据才能正确去重 + // 生产环境应该在数据库层面完成去重 + let total = 0; + const provinceCount = new Map(); + const cityCount = new Map(); + + for (const result of results) { + total = Math.max(total, result.total); + for (const [province, count] of result.byProvince) { + provinceCount.set(province, Math.max(provinceCount.get(province) ?? 0, count)); + } + for (const [city, count] of result.byCity) { + cityCount.set(city, Math.max(cityCount.get(city) ?? 0, count)); + } + } + + return { + total, + byProvince: provinceCount, + byCity: cityCount, + }; + } +} diff --git a/backend/services/presence-service/src/domain/services/online-detection.service.ts b/backend/services/presence-service/src/domain/services/online-detection.service.ts new file mode 100644 index 00000000..e46badeb --- /dev/null +++ b/backend/services/presence-service/src/domain/services/online-detection.service.ts @@ -0,0 +1,33 @@ +import { Injectable } from '@nestjs/common'; +import { TimeWindow } from '../value-objects/time-window.vo'; + +@Injectable() +export class OnlineDetectionService { + private readonly timeWindow: TimeWindow; + + constructor() { + this.timeWindow = TimeWindow.default(); + } + + /** + * 判断用户是否在线 + */ + isOnline(lastHeartbeatTs: number, now: Date = new Date()): boolean { + const threshold = this.timeWindow.getThresholdTimestamp(now); + return lastHeartbeatTs > threshold; + } + + /** + * 获取在线判定阈值时间戳 + */ + getOnlineThreshold(now: Date = new Date()): number { + return this.timeWindow.getThresholdTimestamp(now); + } + + /** + * 获取窗口秒数 + */ + getWindowSeconds(): number { + return this.timeWindow.windowSeconds; + } +} diff --git a/backend/services/presence-service/src/domain/value-objects/device-info.vo.ts b/backend/services/presence-service/src/domain/value-objects/device-info.vo.ts new file mode 100644 index 00000000..51da821d --- /dev/null +++ b/backend/services/presence-service/src/domain/value-objects/device-info.vo.ts @@ -0,0 +1,49 @@ +export class DeviceInfo { + private readonly _os: 'Android' | 'iOS'; + private readonly _osVersion: string; + private readonly _deviceModel?: string; + private readonly _screenResolution?: string; + + private constructor(props: { + os: 'Android' | 'iOS'; + osVersion: string; + deviceModel?: string; + screenResolution?: string; + }) { + this._os = props.os; + this._osVersion = props.osVersion; + this._deviceModel = props.deviceModel; + this._screenResolution = props.screenResolution; + } + + get os(): 'Android' | 'iOS' { + return this._os; + } + + get osVersion(): string { + return this._osVersion; + } + + get deviceModel(): string | undefined { + return this._deviceModel; + } + + get screenResolution(): string | undefined { + return this._screenResolution; + } + + static create(props: { + os: string; + osVersion: string; + deviceModel?: string; + screenResolution?: string; + }): DeviceInfo { + const normalizedOs = props.os.toLowerCase() === 'ios' ? 'iOS' : 'Android'; + return new DeviceInfo({ + os: normalizedOs, + osVersion: props.osVersion, + deviceModel: props.deviceModel, + screenResolution: props.screenResolution, + }); + } +} diff --git a/backend/services/presence-service/src/domain/value-objects/event-name.vo.ts b/backend/services/presence-service/src/domain/value-objects/event-name.vo.ts new file mode 100644 index 00000000..a671e368 --- /dev/null +++ b/backend/services/presence-service/src/domain/value-objects/event-name.vo.ts @@ -0,0 +1,48 @@ +import { DomainException } from '../../shared/exceptions/domain.exception'; + +export class EventName { + // 预定义的事件名 + static readonly APP_SESSION_START = new EventName('app_session_start'); + static readonly PRESENCE_HEARTBEAT = new EventName('presence_heartbeat'); + static readonly APP_SESSION_END = new EventName('app_session_end'); + + private readonly _value: string; + + private constructor(value: string) { + this._value = value; + } + + get value(): string { + return this._value; + } + + static fromString(value: string): EventName { + if (!value || value.trim() === '') { + throw new DomainException('EventName cannot be empty'); + } + const trimmed = value.trim().toLowerCase(); + if (trimmed.length > 64) { + throw new DomainException('EventName cannot exceed 64 characters'); + } + // 验证格式:字母、数字、下划线 + if (!/^[a-z][a-z0-9_]*$/.test(trimmed)) { + throw new DomainException('EventName must start with letter and contain only lowercase letters, numbers, and underscores'); + } + return new EventName(trimmed); + } + + /** + * 是否为DAU统计事件 + */ + isDauEvent(): boolean { + return this._value === EventName.APP_SESSION_START.value; + } + + equals(other: EventName): boolean { + return this._value === other._value; + } + + toString(): string { + return this._value; + } +} diff --git a/backend/services/presence-service/src/domain/value-objects/event-properties.vo.ts b/backend/services/presence-service/src/domain/value-objects/event-properties.vo.ts new file mode 100644 index 00000000..30b80649 --- /dev/null +++ b/backend/services/presence-service/src/domain/value-objects/event-properties.vo.ts @@ -0,0 +1,62 @@ +export interface EventPropertiesData { + os?: string; + osVersion?: string; + appVersion?: string; + networkType?: string; + clientTs?: number; + province?: string; + city?: string; + [key: string]: unknown; +} + +export class EventProperties { + private readonly _data: EventPropertiesData; + + private constructor(data: EventPropertiesData) { + this._data = { ...data }; + } + + get data(): EventPropertiesData { + return { ...this._data }; + } + + get os(): string | undefined { + return this._data.os; + } + + get osVersion(): string | undefined { + return this._data.osVersion; + } + + get appVersion(): string | undefined { + return this._data.appVersion; + } + + get networkType(): string | undefined { + return this._data.networkType; + } + + get clientTs(): number | undefined { + return this._data.clientTs; + } + + get province(): string | undefined { + return this._data.province; + } + + get city(): string | undefined { + return this._data.city; + } + + static empty(): EventProperties { + return new EventProperties({}); + } + + static fromData(data: EventPropertiesData): EventProperties { + return new EventProperties(data); + } + + toJSON(): EventPropertiesData { + return this._data; + } +} diff --git a/backend/services/presence-service/src/domain/value-objects/install-id.vo.ts b/backend/services/presence-service/src/domain/value-objects/install-id.vo.ts new file mode 100644 index 00000000..c61c32bb --- /dev/null +++ b/backend/services/presence-service/src/domain/value-objects/install-id.vo.ts @@ -0,0 +1,37 @@ +import { v4 as uuidv4, validate as uuidValidate } from 'uuid'; +import { DomainException } from '../../shared/exceptions/domain.exception'; + +export class InstallId { + private readonly _value: string; + + private constructor(value: string) { + this._value = value; + } + + get value(): string { + return this._value; + } + + static generate(): InstallId { + return new InstallId(uuidv4()); + } + + static fromString(value: string): InstallId { + if (!value || value.trim() === '') { + throw new DomainException('InstallId cannot be empty'); + } + // 允许非UUID格式,但需要有基本长度 + if (value.length < 8 || value.length > 128) { + throw new DomainException('InstallId length must be between 8 and 128 characters'); + } + return new InstallId(value.trim()); + } + + equals(other: InstallId): boolean { + return this._value === other._value; + } + + toString(): string { + return this._value; + } +} diff --git a/backend/services/presence-service/src/domain/value-objects/time-window.vo.ts b/backend/services/presence-service/src/domain/value-objects/time-window.vo.ts new file mode 100644 index 00000000..a913b53d --- /dev/null +++ b/backend/services/presence-service/src/domain/value-objects/time-window.vo.ts @@ -0,0 +1,32 @@ +export class TimeWindow { + static readonly DEFAULT_ONLINE_WINDOW_SECONDS = 180; // 3分钟 + static readonly DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 60; // 60秒 + + private readonly _windowSeconds: number; + + private constructor(windowSeconds: number) { + this._windowSeconds = windowSeconds; + } + + get windowSeconds(): number { + return this._windowSeconds; + } + + static default(): TimeWindow { + return new TimeWindow(TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS); + } + + static ofSeconds(seconds: number): TimeWindow { + if (seconds <= 0) { + throw new Error('TimeWindow must be positive'); + } + return new TimeWindow(seconds); + } + + /** + * 计算在线阈值时间戳 + */ + getThresholdTimestamp(now: Date = new Date()): number { + return Math.floor(now.getTime() / 1000) - this._windowSeconds; + } +} diff --git a/backend/services/presence-service/src/infrastructure/infrastructure.module.ts b/backend/services/presence-service/src/infrastructure/infrastructure.module.ts new file mode 100644 index 00000000..1676d79e --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/infrastructure.module.ts @@ -0,0 +1,46 @@ +import { Module } from '@nestjs/common'; +import { PrismaService } from './persistence/prisma/prisma.service'; +import { EventLogMapper } from './persistence/mappers/event-log.mapper'; +import { DailyActiveStatsMapper } from './persistence/mappers/daily-active-stats.mapper'; +import { OnlineSnapshotMapper } from './persistence/mappers/online-snapshot.mapper'; +import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl'; +import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl'; +import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl'; +import { RedisModule } from './redis/redis.module'; +import { KafkaModule } from './kafka/kafka.module'; +import { + EVENT_LOG_REPOSITORY, + DAILY_ACTIVE_STATS_REPOSITORY, + ONLINE_SNAPSHOT_REPOSITORY, +} from '../domain/repositories/event-log.repository.interface'; + +@Module({ + imports: [RedisModule, KafkaModule], + providers: [ + PrismaService, + EventLogMapper, + DailyActiveStatsMapper, + OnlineSnapshotMapper, + { + provide: EVENT_LOG_REPOSITORY, + useClass: EventLogRepositoryImpl, + }, + { + provide: DAILY_ACTIVE_STATS_REPOSITORY, + useClass: DailyActiveStatsRepositoryImpl, + }, + { + provide: ONLINE_SNAPSHOT_REPOSITORY, + useClass: OnlineSnapshotRepositoryImpl, + }, + ], + exports: [ + PrismaService, + EVENT_LOG_REPOSITORY, + DAILY_ACTIVE_STATS_REPOSITORY, + ONLINE_SNAPSHOT_REPOSITORY, + RedisModule, + KafkaModule, + ], +}) +export class InfrastructureModule {} diff --git a/backend/services/presence-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/presence-service/src/infrastructure/kafka/event-publisher.service.ts new file mode 100644 index 00000000..31875331 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/kafka/event-publisher.service.ts @@ -0,0 +1,68 @@ +import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Producer, logLevel } from 'kafkajs'; + +@Injectable() +export class EventPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventPublisherService.name); + private kafka: Kafka; + private producer: Producer; + private readonly topic: string; + private readonly enabled: boolean; + + constructor(private readonly configService: ConfigService) { + this.enabled = this.configService.get('KAFKA_ENABLED', 'false') === 'true'; + + if (this.enabled) { + this.kafka = new Kafka({ + clientId: 'presence-service', + brokers: this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','), + logLevel: logLevel.WARN, + }); + this.producer = this.kafka.producer(); + } + + this.topic = this.configService.get('KAFKA_TOPIC_ANALYTICS', 'analytics-events'); + } + + async onModuleInit(): Promise { + if (this.enabled) { + await this.producer.connect(); + this.logger.log('Kafka producer connected'); + } else { + this.logger.log('Kafka is disabled'); + } + } + + async onModuleDestroy(): Promise { + if (this.enabled) { + await this.producer.disconnect(); + this.logger.log('Kafka producer disconnected'); + } + } + + async publish(eventType: string, payload: unknown): Promise { + if (!this.enabled) { + this.logger.debug(`Kafka disabled, skipping event: ${eventType}`); + return; + } + + try { + await this.producer.send({ + topic: this.topic, + messages: [ + { + key: eventType, + value: JSON.stringify({ + eventType, + payload, + occurredAt: new Date().toISOString(), + }), + }, + ], + }); + } catch (error) { + this.logger.error(`Failed to publish event ${eventType}`, error); + } + } +} diff --git a/backend/services/presence-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/presence-service/src/infrastructure/kafka/kafka.module.ts new file mode 100644 index 00000000..98309286 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/kafka/kafka.module.ts @@ -0,0 +1,8 @@ +import { Module } from '@nestjs/common'; +import { EventPublisherService } from './event-publisher.service'; + +@Module({ + providers: [EventPublisherService], + exports: [EventPublisherService], +}) +export class KafkaModule {} diff --git a/backend/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts b/backend/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts new file mode 100644 index 00000000..b8018258 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common'; +import { DailyActiveStats as PrismaDailyActiveStats } from '@prisma/client'; +import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate'; + +@Injectable() +export class DailyActiveStatsMapper { + toDomain(prisma: PrismaDailyActiveStats): DailyActiveStats { + const dauByProvince = new Map( + Object.entries((prisma.dauByProvince as Record) ?? {}), + ); + const dauByCity = new Map( + Object.entries((prisma.dauByCity as Record) ?? {}), + ); + + return DailyActiveStats.reconstitute({ + day: prisma.day, + dauCount: prisma.dauCount, + dauByProvince, + dauByCity, + calculatedAt: prisma.calculatedAt, + version: prisma.version, + }); + } + + toPersistence(domain: DailyActiveStats): PrismaDailyActiveStats { + return { + day: domain.day, + dauCount: domain.dauCount, + dauByProvince: Object.fromEntries(domain.dauByProvince) as any, + dauByCity: Object.fromEntries(domain.dauByCity) as any, + calculatedAt: domain.calculatedAt, + version: domain.version, + }; + } +} diff --git a/backend/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts b/backend/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts new file mode 100644 index 00000000..5ee9d54e --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts @@ -0,0 +1,31 @@ +import { Injectable } from '@nestjs/common'; +import { EventLog as PrismaEventLog } from '@prisma/client'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { InstallId } from '../../../domain/value-objects/install-id.vo'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventProperties, EventPropertiesData } from '../../../domain/value-objects/event-properties.vo'; + +@Injectable() +export class EventLogMapper { + toDomain(prisma: PrismaEventLog): EventLog { + return EventLog.reconstitute({ + id: prisma.id, + userId: prisma.userId, + installId: InstallId.fromString(prisma.installId), + eventName: EventName.fromString(prisma.eventName), + eventTime: prisma.eventTime, + properties: EventProperties.fromData((prisma.properties as EventPropertiesData) ?? {}), + createdAt: prisma.createdAt, + }); + } + + toPersistence(domain: EventLog): Omit { + return { + userId: domain.userId, + installId: domain.installId.value, + eventName: domain.eventName.value, + eventTime: domain.eventTime, + properties: domain.properties.toJSON() as any, + }; + } +} diff --git a/backend/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts b/backend/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts new file mode 100644 index 00000000..5eb70196 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts @@ -0,0 +1,23 @@ +import { Injectable } from '@nestjs/common'; +import { OnlineSnapshot as PrismaOnlineSnapshot } from '@prisma/client'; +import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity'; + +@Injectable() +export class OnlineSnapshotMapper { + toDomain(prisma: PrismaOnlineSnapshot): OnlineSnapshot { + return OnlineSnapshot.reconstitute({ + id: prisma.id, + ts: prisma.ts, + onlineCount: prisma.onlineCount, + windowSeconds: prisma.windowSeconds, + }); + } + + toPersistence(domain: OnlineSnapshot): Omit { + return { + ts: domain.ts, + onlineCount: domain.onlineCount, + windowSeconds: domain.windowSeconds, + }; + } +} diff --git a/backend/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts b/backend/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts new file mode 100644 index 00000000..ad38ac25 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts @@ -0,0 +1,28 @@ +import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; + +@Injectable() +export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PrismaService.name); + + constructor() { + super({ + log: [ + { emit: 'event', level: 'query' }, + { emit: 'event', level: 'error' }, + { emit: 'event', level: 'info' }, + { emit: 'event', level: 'warn' }, + ], + }); + } + + async onModuleInit() { + await this.$connect(); + this.logger.log('Prisma connected'); + } + + async onModuleDestroy() { + await this.$disconnect(); + this.logger.log('Prisma disconnected'); + } +} diff --git a/backend/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts b/backend/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts new file mode 100644 index 00000000..bcb2877a --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts @@ -0,0 +1,44 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IDailyActiveStatsRepository, +} from '../../../domain/repositories/daily-active-stats.repository.interface'; +import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate'; +import { DailyActiveStatsMapper } from '../mappers/daily-active-stats.mapper'; + +@Injectable() +export class DailyActiveStatsRepositoryImpl implements IDailyActiveStatsRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: DailyActiveStatsMapper, + ) {} + + async upsert(stats: DailyActiveStats): Promise { + const data = this.mapper.toPersistence(stats); + await this.prisma.dailyActiveStats.upsert({ + where: { day: stats.day }, + create: data, + update: data, + }); + } + + async findByDate(day: Date): Promise { + const record = await this.prisma.dailyActiveStats.findUnique({ + where: { day }, + }); + return record ? this.mapper.toDomain(record) : null; + } + + async findByDateRange(startDate: Date, endDate: Date): Promise { + const records = await this.prisma.dailyActiveStats.findMany({ + where: { + day: { + gte: startDate, + lte: endDate, + }, + }, + orderBy: { day: 'asc' }, + }); + return records.map((r) => this.mapper.toDomain(r)); + } +} diff --git a/backend/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts b/backend/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts new file mode 100644 index 00000000..4ce3e130 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts @@ -0,0 +1,97 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IEventLogRepository, + DauQueryResult, +} from '../../../domain/repositories/event-log.repository.interface'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventLogMapper } from '../mappers/event-log.mapper'; + +@Injectable() +export class EventLogRepositoryImpl implements IEventLogRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: EventLogMapper, + ) {} + + async batchInsert(logs: EventLog[]): Promise { + const data = logs.map((log) => this.mapper.toPersistence(log)); + await this.prisma.eventLog.createMany({ data }); + } + + async insert(log: EventLog): Promise { + const data = this.mapper.toPersistence(log); + const created = await this.prisma.eventLog.create({ data }); + return this.mapper.toDomain(created); + } + + async queryDau( + eventName: EventName, + startTime: Date, + endTime: Date, + ): Promise { + // 使用原生 SQL 进行去重统计 + const result = await this.prisma.$queryRaw< + { total: bigint; province: string | null; city: string | null; count: bigint }[] + >` + WITH base AS ( + SELECT + COALESCE(user_id::text, install_id) AS identifier, + properties->>'province' AS province, + properties->>'city' AS city + FROM analytics_event_log + WHERE event_name = ${eventName.value} + AND event_time >= ${startTime} + AND event_time < ${endTime} + ), + unique_users AS ( + SELECT DISTINCT identifier, province, city FROM base + ) + SELECT + COUNT(DISTINCT identifier) AS total, + province, + city, + COUNT(*) AS count + FROM unique_users + GROUP BY GROUPING SETS ((), (province), (city)) + `; + + let total = 0; + const byProvince = new Map(); + const byCity = new Map(); + + for (const row of result) { + if (row.province === null && row.city === null) { + total = Number(row.total); + } else if (row.province !== null && row.city === null) { + byProvince.set(row.province, Number(row.count)); + } else if (row.city !== null && row.province === null) { + byCity.set(row.city, Number(row.count)); + } + } + + return { total, byProvince, byCity }; + } + + async findByTimeRange( + eventName: EventName, + startTime: Date, + endTime: Date, + limit?: number, + ): Promise { + const records = await this.prisma.eventLog.findMany({ + where: { + eventName: eventName.value, + eventTime: { + gte: startTime, + lt: endTime, + }, + }, + orderBy: { eventTime: 'desc' }, + take: limit, + }); + + return records.map((r) => this.mapper.toDomain(r)); + } +} diff --git a/backend/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts b/backend/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts new file mode 100644 index 00000000..ee77e7c1 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts @@ -0,0 +1,47 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IOnlineSnapshotRepository, +} from '../../../domain/repositories/online-snapshot.repository.interface'; +import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity'; +import { OnlineSnapshotMapper } from '../mappers/online-snapshot.mapper'; + +@Injectable() +export class OnlineSnapshotRepositoryImpl implements IOnlineSnapshotRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: OnlineSnapshotMapper, + ) {} + + async insert(snapshot: OnlineSnapshot): Promise { + const data = this.mapper.toPersistence(snapshot); + const created = await this.prisma.onlineSnapshot.create({ data }); + return this.mapper.toDomain(created); + } + + async findByTimeRange( + startTime: Date, + endTime: Date, + interval?: '1m' | '5m' | '1h', + ): Promise { + // 简化实现:不做聚合,返回所有记录 + // 生产环境可以使用 PostgreSQL 的 time_bucket 或按间隔采样 + const records = await this.prisma.onlineSnapshot.findMany({ + where: { + ts: { + gte: startTime, + lte: endTime, + }, + }, + orderBy: { ts: 'asc' }, + }); + return records.map((r) => this.mapper.toDomain(r)); + } + + async findLatest(): Promise { + const record = await this.prisma.onlineSnapshot.findFirst({ + orderBy: { ts: 'desc' }, + }); + return record ? this.mapper.toDomain(record) : null; + } +} diff --git a/backend/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts b/backend/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts new file mode 100644 index 00000000..d7e7478b --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts @@ -0,0 +1,74 @@ +import { Injectable } from '@nestjs/common'; +import { RedisService } from './redis.service'; + +@Injectable() +export class PresenceRedisRepository { + private readonly ONLINE_USERS_KEY = 'presence:online_users'; + + constructor(private readonly redisService: RedisService) {} + + /** + * 更新用户在线状态 + */ + async updateUserPresence(userId: string, timestamp: number): Promise { + await this.redisService.zadd(this.ONLINE_USERS_KEY, timestamp, userId); + } + + /** + * 统计在线用户数 + */ + async countOnlineUsers(thresholdTimestamp: number): Promise { + return this.redisService.zcount( + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + ); + } + + /** + * 获取在线用户列表 + */ + async getOnlineUsers(thresholdTimestamp: number, limit?: number): Promise { + const args: [string, number | string, number | string] = [ + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + ]; + + if (limit) { + return this.redisService.zrangebyscore( + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + 'LIMIT', + 0, + limit, + ); + } + + return this.redisService.zrangebyscore( + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + ); + } + + /** + * 移除过期用户 + */ + async removeExpiredUsers(thresholdTimestamp: number): Promise { + return this.redisService.zremrangebyscore( + this.ONLINE_USERS_KEY, + '-inf', + thresholdTimestamp, + ); + } + + /** + * 获取用户最后心跳时间 + */ + async getUserLastHeartbeat(userId: string): Promise { + const score = await this.redisService.zscore(this.ONLINE_USERS_KEY, userId); + return score ? Number(score) : null; + } +} diff --git a/backend/services/presence-service/src/infrastructure/redis/redis.module.ts b/backend/services/presence-service/src/infrastructure/redis/redis.module.ts new file mode 100644 index 00000000..1f276788 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/redis/redis.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; +import { RedisService } from './redis.service'; +import { PresenceRedisRepository } from './presence-redis.repository'; + +@Module({ + providers: [RedisService, PresenceRedisRepository], + exports: [RedisService, PresenceRedisRepository], +}) +export class RedisModule {} diff --git a/backend/services/presence-service/src/infrastructure/redis/redis.service.ts b/backend/services/presence-service/src/infrastructure/redis/redis.service.ts new file mode 100644 index 00000000..f7604488 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/redis/redis.service.ts @@ -0,0 +1,65 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; + +@Injectable() +export class RedisService implements OnModuleDestroy { + private readonly client: Redis; + + constructor(private readonly configService: ConfigService) { + this.client = new Redis({ + host: this.configService.get('REDIS_HOST', 'localhost'), + port: this.configService.get('REDIS_PORT', 6379), + password: this.configService.get('REDIS_PASSWORD'), + db: this.configService.get('REDIS_DB', 0), + }); + } + + async onModuleDestroy(): Promise { + await this.client.quit(); + } + + // ZSET 操作 + async zadd(key: string, score: number, member: string): Promise { + return this.client.zadd(key, score, member); + } + + async zcount(key: string, min: number | string, max: number | string): Promise { + return this.client.zcount(key, min, max); + } + + async zrangebyscore( + key: string, + min: number | string, + max: number | string, + ...args: (string | number)[] + ): Promise { + return this.client.zrangebyscore(key, min, max, ...args); + } + + async zremrangebyscore(key: string, min: number | string, max: number | string): Promise { + return this.client.zremrangebyscore(key, min, max); + } + + async zscore(key: string, member: string): Promise { + return this.client.zscore(key, member); + } + + // HyperLogLog 操作 + async pfadd(key: string, ...elements: string[]): Promise { + return this.client.pfadd(key, ...elements); + } + + async pfcount(...keys: string[]): Promise { + return this.client.pfcount(...keys); + } + + // 通用操作 + async expire(key: string, seconds: number): Promise { + return this.client.expire(key, seconds); + } + + async del(...keys: string[]): Promise { + return this.client.del(...keys); + } +} diff --git a/backend/services/presence-service/src/main.ts b/backend/services/presence-service/src/main.ts new file mode 100644 index 00000000..11be61b1 --- /dev/null +++ b/backend/services/presence-service/src/main.ts @@ -0,0 +1,41 @@ +import { NestFactory } from '@nestjs/core'; +import { ValidationPipe, Logger } from '@nestjs/common'; +import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { AppModule } from './app.module'; + +async function bootstrap() { + const logger = new Logger('Bootstrap'); + const app = await NestFactory.create(AppModule); + + // 全局管道 + app.useGlobalPipes( + new ValidationPipe({ + whitelist: true, + transform: true, + forbidNonWhitelisted: true, + }), + ); + + // API 前缀 + const apiPrefix = process.env.API_PREFIX || 'api/v1'; + app.setGlobalPrefix(apiPrefix); + + // Swagger 文档 + const config = new DocumentBuilder() + .setTitle('Presence & Analytics Service API') + .setDescription('用户活跃度与在线状态服务') + .setVersion('1.0') + .addBearerAuth() + .build(); + const document = SwaggerModule.createDocument(app, config); + SwaggerModule.setup(`${apiPrefix}/docs`, app, document); + + // 启动服务 + const port = parseInt(process.env.APP_PORT || '3001', 10); + await app.listen(port); + + logger.log(`Presence Service is running on: http://localhost:${port}/${apiPrefix}`); + logger.log(`Swagger docs: http://localhost:${port}/${apiPrefix}/docs`); +} + +bootstrap(); diff --git a/backend/services/presence-service/src/shared/decorators/current-user.decorator.ts b/backend/services/presence-service/src/shared/decorators/current-user.decorator.ts new file mode 100644 index 00000000..758616d6 --- /dev/null +++ b/backend/services/presence-service/src/shared/decorators/current-user.decorator.ts @@ -0,0 +1,10 @@ +import { createParamDecorator, ExecutionContext } from '@nestjs/common'; + +export const CurrentUser = createParamDecorator( + (data: string | undefined, ctx: ExecutionContext) => { + const request = ctx.switchToHttp().getRequest(); + const user = request.user; + + return data ? user?.[data] : user; + }, +); diff --git a/backend/services/presence-service/src/shared/decorators/public.decorator.ts b/backend/services/presence-service/src/shared/decorators/public.decorator.ts new file mode 100644 index 00000000..b3845e12 --- /dev/null +++ b/backend/services/presence-service/src/shared/decorators/public.decorator.ts @@ -0,0 +1,4 @@ +import { SetMetadata } from '@nestjs/common'; + +export const IS_PUBLIC_KEY = 'isPublic'; +export const Public = () => SetMetadata(IS_PUBLIC_KEY, true); diff --git a/backend/services/presence-service/src/shared/exceptions/application.exception.ts b/backend/services/presence-service/src/shared/exceptions/application.exception.ts new file mode 100644 index 00000000..7f74d646 --- /dev/null +++ b/backend/services/presence-service/src/shared/exceptions/application.exception.ts @@ -0,0 +1,6 @@ +export class ApplicationException extends Error { + constructor(message: string) { + super(message); + this.name = 'ApplicationException'; + } +} diff --git a/backend/services/presence-service/src/shared/exceptions/domain.exception.ts b/backend/services/presence-service/src/shared/exceptions/domain.exception.ts new file mode 100644 index 00000000..76ead53c --- /dev/null +++ b/backend/services/presence-service/src/shared/exceptions/domain.exception.ts @@ -0,0 +1,6 @@ +export class DomainException extends Error { + constructor(message: string) { + super(message); + this.name = 'DomainException'; + } +} diff --git a/backend/services/presence-service/src/shared/guards/jwt-auth.guard.ts b/backend/services/presence-service/src/shared/guards/jwt-auth.guard.ts new file mode 100644 index 00000000..296f1121 --- /dev/null +++ b/backend/services/presence-service/src/shared/guards/jwt-auth.guard.ts @@ -0,0 +1,28 @@ +import { Injectable, CanActivate, ExecutionContext, UnauthorizedException } from '@nestjs/common'; +import { Reflector } from '@nestjs/core'; +import { IS_PUBLIC_KEY } from '../decorators/public.decorator'; + +@Injectable() +export class JwtAuthGuard implements CanActivate { + constructor(private reflector: Reflector) {} + + canActivate(context: ExecutionContext): boolean { + const isPublic = this.reflector.getAllAndOverride(IS_PUBLIC_KEY, [ + context.getHandler(), + context.getClass(), + ]); + + if (isPublic) { + return true; + } + + const request = context.switchToHttp().getRequest(); + const user = request.user; + + if (!user || !user.userId) { + throw new UnauthorizedException('未授权访问'); + } + + return true; + } +} diff --git a/backend/services/presence-service/src/shared/utils/timezone.util.ts b/backend/services/presence-service/src/shared/utils/timezone.util.ts new file mode 100644 index 00000000..e605fc0b --- /dev/null +++ b/backend/services/presence-service/src/shared/utils/timezone.util.ts @@ -0,0 +1,30 @@ +import { format, startOfDay, endOfDay } from 'date-fns'; +import { toZonedTime, fromZonedTime } from 'date-fns-tz'; + +const DEFAULT_TIMEZONE = 'Asia/Shanghai'; + +/** + * 获取指定时区的一天开始时间 + */ +export function startOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date { + const zonedDate = toZonedTime(date, timezone); + const start = startOfDay(zonedDate); + return fromZonedTime(start, timezone); +} + +/** + * 获取指定时区的一天结束时间 + */ +export function endOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date { + const zonedDate = toZonedTime(date, timezone); + const end = endOfDay(zonedDate); + return fromZonedTime(end, timezone); +} + +/** + * 格式化为日期Key (YYYY-MM-DD) + */ +export function formatToDateKey(date: Date, timezone: string = DEFAULT_TIMEZONE): string { + const zonedDate = toZonedTime(date, timezone); + return format(zonedDate, 'yyyy-MM-dd'); +}