From 996bc231f29d7c93d6b4fe6a3907328dc5e59a5a Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 3 Dec 2025 03:51:47 +0800 Subject: [PATCH] . --- .../analytics-presence-service-dev-guide.md | 2218 +++++++++++++++++ 1 file changed, 2218 insertions(+) create mode 100644 backend/services/presence-service/analytics-presence-service-dev-guide.md diff --git a/backend/services/presence-service/analytics-presence-service-dev-guide.md b/backend/services/presence-service/analytics-presence-service-dev-guide.md new file mode 100644 index 00000000..61222188 --- /dev/null +++ b/backend/services/presence-service/analytics-presence-service-dev-guide.md @@ -0,0 +1,2218 @@ +# Analytics & Presence Service + +RWA 用户活跃度与在线状态上下文微服务 - 基于 DDD 架构的 NestJS 实现 + +## 技术栈 + +- **框架**: NestJS + TypeScript +- **ORM**: Prisma +- **消息队列**: Kafka +- **缓存**: Redis (ioredis) +- **定时任务**: @nestjs/schedule + +## 项目结构 + +``` +analytics-presence-service/ +├── src/ +│ ├── api/ # 表现层 +│ │ ├── controllers/ +│ │ │ ├── analytics.controller.ts +│ │ │ └── presence.controller.ts +│ │ ├── dto/ +│ │ │ ├── request/ +│ │ │ │ ├── batch-events.dto.ts +│ │ │ │ ├── heartbeat.dto.ts +│ │ │ │ └── query-dau.dto.ts +│ │ │ └── response/ +│ │ │ ├── dau-stats.dto.ts +│ │ │ ├── online-count.dto.ts +│ │ │ └── online-history.dto.ts +│ │ ├── validators/ +│ │ │ └── event.validator.ts +│ │ └── api.module.ts +│ │ +│ ├── application/ # 应用层 +│ │ ├── commands/ +│ │ │ ├── record-events/ +│ │ │ │ ├── record-events.command.ts +│ │ │ │ └── record-events.handler.ts +│ │ │ ├── record-heartbeat/ +│ │ │ │ ├── record-heartbeat.command.ts +│ │ │ │ └── record-heartbeat.handler.ts +│ │ │ └── calculate-dau/ +│ │ │ ├── calculate-dau.command.ts +│ │ │ └── calculate-dau.handler.ts +│ │ ├── queries/ +│ │ │ ├── get-dau-stats/ +│ │ │ │ ├── get-dau-stats.query.ts +│ │ │ │ └── get-dau-stats.handler.ts +│ │ │ ├── get-online-count/ +│ │ │ │ ├── get-online-count.query.ts +│ │ │ │ └── get-online-count.handler.ts +│ │ │ └── get-online-history/ +│ │ │ ├── get-online-history.query.ts +│ │ │ └── get-online-history.handler.ts +│ │ ├── services/ +│ │ │ └── analytics-application.service.ts +│ │ ├── schedulers/ +│ │ │ └── analytics.scheduler.ts +│ │ └── application.module.ts +│ │ +│ ├── domain/ # 领域层 +│ │ ├── aggregates/ +│ │ │ └── daily-active-stats/ +│ │ │ ├── daily-active-stats.aggregate.ts +│ │ │ ├── daily-active-stats.factory.ts +│ │ │ └── daily-active-stats.spec.ts +│ │ ├── entities/ +│ │ │ ├── event-log.entity.ts +│ │ │ └── online-snapshot.entity.ts +│ │ ├── value-objects/ +│ │ │ ├── install-id.vo.ts +│ │ │ ├── event-name.vo.ts +│ │ │ ├── event-properties.vo.ts +│ │ │ ├── device-info.vo.ts +│ │ │ └── time-window.vo.ts +│ │ ├── events/ +│ │ │ ├── session-started.event.ts +│ │ │ ├── heartbeat-received.event.ts +│ │ │ └── dau-calculated.event.ts +│ │ ├── repositories/ +│ │ │ ├── event-log.repository.interface.ts +│ │ │ ├── daily-active-stats.repository.interface.ts +│ │ │ └── online-snapshot.repository.interface.ts +│ │ ├── services/ +│ │ │ ├── dau-calculation.service.ts +│ │ │ └── online-detection.service.ts +│ │ └── domain.module.ts +│ │ +│ ├── infrastructure/ # 基础设施层 +│ │ ├── persistence/ +│ │ │ ├── prisma/ +│ │ │ │ ├── schema.prisma +│ │ │ │ └── prisma.service.ts +│ │ │ ├── entities/ +│ │ │ │ ├── event-log.entity.ts +│ │ │ │ ├── daily-active-stats.entity.ts +│ │ │ │ └── online-snapshot.entity.ts +│ │ │ ├── mappers/ +│ │ │ │ ├── event-log.mapper.ts +│ │ │ │ ├── daily-active-stats.mapper.ts +│ │ │ │ └── online-snapshot.mapper.ts +│ │ │ └── repositories/ +│ │ │ ├── event-log.repository.impl.ts +│ │ │ ├── daily-active-stats.repository.impl.ts +│ │ │ └── online-snapshot.repository.impl.ts +│ │ ├── redis/ +│ │ │ ├── redis.module.ts +│ │ │ ├── redis.service.ts +│ │ │ └── presence-redis.repository.ts +│ │ ├── kafka/ +│ │ │ ├── kafka.module.ts +│ │ │ ├── event-publisher.service.ts +│ │ │ └── event-consumer.service.ts +│ │ └── infrastructure.module.ts +│ │ +│ ├── shared/ # 共享层 +│ │ ├── decorators/ +│ │ │ ├── current-user.decorator.ts +│ │ │ └── public.decorator.ts +│ │ ├── guards/ +│ │ │ ├── jwt-auth.guard.ts +│ │ │ └── optional-auth.guard.ts +│ │ ├── filters/ +│ │ │ └── domain-exception.filter.ts +│ │ ├── interceptors/ +│ │ │ └── transform.interceptor.ts +│ │ ├── exceptions/ +│ │ │ ├── domain.exception.ts +│ │ │ └── application.exception.ts +│ │ └── utils/ +│ │ └── timezone.util.ts +│ │ +│ ├── config/ +│ │ ├── app.config.ts +│ │ ├── database.config.ts +│ │ ├── redis.config.ts +│ │ ├── jwt.config.ts +│ │ └── kafka.config.ts +│ │ +│ ├── app.module.ts +│ └── main.ts +│ +├── test/ +│ ├── unit/ +│ ├── integration/ +│ └── e2e/ +│ +├── database/ +│ └── migrations/ +│ +├── prisma/ +│ └── schema.prisma +│ +├── .env.example +├── .env.development +├── .env.production +├── Dockerfile +├── docker-compose.yml +├── package.json +├── tsconfig.json +└── README.md +``` + +## 核心功能 + +- ✅ 用户行为事件批量上报与存储 +- ✅ 日活 DAU 统计(按自然日去重) +- ✅ 实时在线人数统计(3分钟窗口) +- ✅ 心跳机制(前台60秒间隔) +- ✅ 按省/市维度的活跃用户分析 +- ✅ 在线人数历史趋势查询 +- ✅ 与 Identity Context 的用户标识集成 +- ✅ Kafka 事件发布(供其他服务订阅) + +--- + +## 一、领域层设计 + +### 1.1 聚合根:DailyActiveStats + +```typescript +// src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts + +import { AggregateRoot } from '@nestjs/cqrs'; +import { DauCalculatedEvent } from '../../events/dau-calculated.event'; + +export class DailyActiveStats extends AggregateRoot { + private _day: Date; + private _dauCount: number; + private _dauByProvince: Map; + private _dauByCity: Map; + private _calculatedAt: Date; + private _version: number; + + private constructor() { + super(); + } + + // Getters + get day(): Date { + return this._day; + } + + get dauCount(): number { + return this._dauCount; + } + + get dauByProvince(): Map { + return new Map(this._dauByProvince); + } + + get dauByCity(): Map { + return new Map(this._dauByCity); + } + + get calculatedAt(): Date { + return this._calculatedAt; + } + + get version(): number { + return this._version; + } + + // 工厂方法 + static create(props: { + day: Date; + dauCount: number; + dauByProvince?: Map; + dauByCity?: Map; + }): DailyActiveStats { + const stats = new DailyActiveStats(); + stats._day = props.day; + stats._dauCount = props.dauCount; + stats._dauByProvince = props.dauByProvince ?? new Map(); + stats._dauByCity = props.dauByCity ?? new Map(); + stats._calculatedAt = new Date(); + stats._version = 1; + + stats.apply(new DauCalculatedEvent(stats._day, stats._dauCount)); + return stats; + } + + // 重新计算 + recalculate(newDauCount: number, byProvince?: Map, byCity?: Map): void { + this._dauCount = newDauCount; + if (byProvince) this._dauByProvince = byProvince; + if (byCity) this._dauByCity = byCity; + this._calculatedAt = new Date(); + this._version++; + + this.apply(new DauCalculatedEvent(this._day, this._dauCount)); + } + + // 从持久化恢复 + static reconstitute(props: { + day: Date; + dauCount: number; + dauByProvince: Map; + dauByCity: Map; + calculatedAt: Date; + version: number; + }): DailyActiveStats { + const stats = new DailyActiveStats(); + stats._day = props.day; + stats._dauCount = props.dauCount; + stats._dauByProvince = props.dauByProvince; + stats._dauByCity = props.dauByCity; + stats._calculatedAt = props.calculatedAt; + stats._version = props.version; + return stats; + } +} +``` + +### 1.2 实体:EventLog + +```typescript +// src/domain/entities/event-log.entity.ts + +import { InstallId } from '../value-objects/install-id.vo'; +import { EventName } from '../value-objects/event-name.vo'; +import { EventProperties } from '../value-objects/event-properties.vo'; + +export class EventLog { + private _id: bigint | null; + private _userId: bigint | null; + private _installId: InstallId; + private _eventName: EventName; + private _eventTime: Date; + private _properties: EventProperties; + private _createdAt: Date; + + private constructor() {} + + // Getters + get id(): bigint | null { + return this._id; + } + + get userId(): bigint | null { + return this._userId; + } + + get installId(): InstallId { + return this._installId; + } + + get eventName(): EventName { + return this._eventName; + } + + get eventTime(): Date { + return this._eventTime; + } + + get properties(): EventProperties { + return this._properties; + } + + get createdAt(): Date { + return this._createdAt; + } + + /** + * 获取用于DAU去重的唯一标识 + * 优先使用 userId,否则使用 installId + */ + get dauIdentifier(): string { + return this._userId?.toString() ?? this._installId.value; + } + + // 工厂方法 + static create(props: { + userId?: bigint | null; + installId: InstallId; + eventName: EventName; + eventTime: Date; + properties?: EventProperties; + }): EventLog { + const log = new EventLog(); + log._id = null; + log._userId = props.userId ?? null; + log._installId = props.installId; + log._eventName = props.eventName; + log._eventTime = props.eventTime; + log._properties = props.properties ?? EventProperties.empty(); + log._createdAt = new Date(); + return log; + } + + // 从持久化恢复 + static reconstitute(props: { + id: bigint; + userId: bigint | null; + installId: InstallId; + eventName: EventName; + eventTime: Date; + properties: EventProperties; + createdAt: Date; + }): EventLog { + const log = new EventLog(); + log._id = props.id; + log._userId = props.userId; + log._installId = props.installId; + log._eventName = props.eventName; + log._eventTime = props.eventTime; + log._properties = props.properties; + log._createdAt = props.createdAt; + return log; + } +} +``` + +### 1.3 实体:OnlineSnapshot + +```typescript +// src/domain/entities/online-snapshot.entity.ts + +import { TimeWindow } from '../value-objects/time-window.vo'; + +export class OnlineSnapshot { + private _id: bigint | null; + private _ts: Date; + private _onlineCount: number; + private _windowSeconds: number; + + private constructor() {} + + get id(): bigint | null { + return this._id; + } + + get ts(): Date { + return this._ts; + } + + get onlineCount(): number { + return this._onlineCount; + } + + get windowSeconds(): number { + return this._windowSeconds; + } + + static create(props: { + ts: Date; + onlineCount: number; + windowSeconds?: number; + }): OnlineSnapshot { + const snapshot = new OnlineSnapshot(); + snapshot._id = null; + snapshot._ts = props.ts; + snapshot._onlineCount = props.onlineCount; + snapshot._windowSeconds = props.windowSeconds ?? TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS; + return snapshot; + } + + static reconstitute(props: { + id: bigint; + ts: Date; + onlineCount: number; + windowSeconds: number; + }): OnlineSnapshot { + const snapshot = new OnlineSnapshot(); + snapshot._id = props.id; + snapshot._ts = props.ts; + snapshot._onlineCount = props.onlineCount; + snapshot._windowSeconds = props.windowSeconds; + return snapshot; + } +} +``` + +### 1.4 值对象 + +```typescript +// src/domain/value-objects/install-id.vo.ts + +import { v4 as uuidv4, validate as uuidValidate } from 'uuid'; +import { DomainException } from '../../shared/exceptions/domain.exception'; + +export class InstallId { + private readonly _value: string; + + private constructor(value: string) { + this._value = value; + } + + get value(): string { + return this._value; + } + + static generate(): InstallId { + return new InstallId(uuidv4()); + } + + static fromString(value: string): InstallId { + if (!value || value.trim() === '') { + throw new DomainException('InstallId cannot be empty'); + } + // 允许非UUID格式,但需要有基本长度 + if (value.length < 8 || value.length > 128) { + throw new DomainException('InstallId length must be between 8 and 128 characters'); + } + return new InstallId(value.trim()); + } + + equals(other: InstallId): boolean { + return this._value === other._value; + } + + toString(): string { + return this._value; + } +} +``` + +```typescript +// src/domain/value-objects/event-name.vo.ts + +import { DomainException } from '../../shared/exceptions/domain.exception'; + +export class EventName { + // 预定义的事件名 + static readonly APP_SESSION_START = new EventName('app_session_start'); + static readonly PRESENCE_HEARTBEAT = new EventName('presence_heartbeat'); + static readonly APP_SESSION_END = new EventName('app_session_end'); + + private readonly _value: string; + + private constructor(value: string) { + this._value = value; + } + + get value(): string { + return this._value; + } + + static fromString(value: string): EventName { + if (!value || value.trim() === '') { + throw new DomainException('EventName cannot be empty'); + } + const trimmed = value.trim().toLowerCase(); + if (trimmed.length > 64) { + throw new DomainException('EventName cannot exceed 64 characters'); + } + // 验证格式:字母、数字、下划线 + if (!/^[a-z][a-z0-9_]*$/.test(trimmed)) { + throw new DomainException('EventName must start with letter and contain only lowercase letters, numbers, and underscores'); + } + return new EventName(trimmed); + } + + /** + * 是否为DAU统计事件 + */ + isDauEvent(): boolean { + return this._value === EventName.APP_SESSION_START.value; + } + + equals(other: EventName): boolean { + return this._value === other._value; + } + + toString(): string { + return this._value; + } +} +``` + +```typescript +// src/domain/value-objects/event-properties.vo.ts + +export interface EventPropertiesData { + os?: string; + osVersion?: string; + appVersion?: string; + networkType?: string; + clientTs?: number; + province?: string; + city?: string; + [key: string]: unknown; +} + +export class EventProperties { + private readonly _data: EventPropertiesData; + + private constructor(data: EventPropertiesData) { + this._data = { ...data }; + } + + get data(): EventPropertiesData { + return { ...this._data }; + } + + get os(): string | undefined { + return this._data.os; + } + + get osVersion(): string | undefined { + return this._data.osVersion; + } + + get appVersion(): string | undefined { + return this._data.appVersion; + } + + get networkType(): string | undefined { + return this._data.networkType; + } + + get clientTs(): number | undefined { + return this._data.clientTs; + } + + get province(): string | undefined { + return this._data.province; + } + + get city(): string | undefined { + return this._data.city; + } + + static empty(): EventProperties { + return new EventProperties({}); + } + + static fromData(data: EventPropertiesData): EventProperties { + return new EventProperties(data); + } + + toJSON(): EventPropertiesData { + return this._data; + } +} +``` + +```typescript +// src/domain/value-objects/device-info.vo.ts + +export class DeviceInfo { + private readonly _os: 'Android' | 'iOS'; + private readonly _osVersion: string; + private readonly _deviceModel?: string; + private readonly _screenResolution?: string; + + private constructor(props: { + os: 'Android' | 'iOS'; + osVersion: string; + deviceModel?: string; + screenResolution?: string; + }) { + this._os = props.os; + this._osVersion = props.osVersion; + this._deviceModel = props.deviceModel; + this._screenResolution = props.screenResolution; + } + + get os(): 'Android' | 'iOS' { + return this._os; + } + + get osVersion(): string { + return this._osVersion; + } + + get deviceModel(): string | undefined { + return this._deviceModel; + } + + get screenResolution(): string | undefined { + return this._screenResolution; + } + + static create(props: { + os: string; + osVersion: string; + deviceModel?: string; + screenResolution?: string; + }): DeviceInfo { + const normalizedOs = props.os.toLowerCase() === 'ios' ? 'iOS' : 'Android'; + return new DeviceInfo({ + os: normalizedOs, + osVersion: props.osVersion, + deviceModel: props.deviceModel, + screenResolution: props.screenResolution, + }); + } +} +``` + +```typescript +// src/domain/value-objects/time-window.vo.ts + +export class TimeWindow { + static readonly DEFAULT_ONLINE_WINDOW_SECONDS = 180; // 3分钟 + static readonly DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 60; // 60秒 + + private readonly _windowSeconds: number; + + private constructor(windowSeconds: number) { + this._windowSeconds = windowSeconds; + } + + get windowSeconds(): number { + return this._windowSeconds; + } + + static default(): TimeWindow { + return new TimeWindow(TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS); + } + + static ofSeconds(seconds: number): TimeWindow { + if (seconds <= 0) { + throw new Error('TimeWindow must be positive'); + } + return new TimeWindow(seconds); + } + + /** + * 计算在线阈值时间戳 + */ + getThresholdTimestamp(now: Date = new Date()): number { + return Math.floor(now.getTime() / 1000) - this._windowSeconds; + } +} +``` + +### 1.5 领域事件 + +```typescript +// src/domain/events/session-started.event.ts + +export class SessionStartedEvent { + static readonly EVENT_NAME = 'analytics.session.started'; + + constructor( + public readonly userId: bigint | null, + public readonly installId: string, + public readonly occurredAt: Date, + public readonly metadata: { + appVersion?: string; + os?: string; + osVersion?: string; + }, + ) {} +} +``` + +```typescript +// src/domain/events/heartbeat-received.event.ts + +export class HeartbeatReceivedEvent { + static readonly EVENT_NAME = 'presence.heartbeat.received'; + + constructor( + public readonly userId: bigint, + public readonly installId: string, + public readonly occurredAt: Date, + ) {} +} +``` + +```typescript +// src/domain/events/dau-calculated.event.ts + +export class DauCalculatedEvent { + static readonly EVENT_NAME = 'analytics.dau.calculated'; + + constructor( + public readonly day: Date, + public readonly dauCount: number, + ) {} +} +``` + +### 1.6 仓储接口 + +```typescript +// src/domain/repositories/event-log.repository.interface.ts + +import { EventLog } from '../entities/event-log.entity'; +import { EventName } from '../value-objects/event-name.vo'; + +export interface DauQueryResult { + total: number; + byProvince: Map; + byCity: Map; +} + +export interface IEventLogRepository { + /** + * 批量插入事件日志 + */ + batchInsert(logs: EventLog[]): Promise; + + /** + * 插入单条事件日志 + */ + insert(log: EventLog): Promise; + + /** + * 查询DAU(去重用户数) + */ + queryDau( + eventName: EventName, + startTime: Date, + endTime: Date, + ): Promise; + + /** + * 按时间范围查询事件 + */ + findByTimeRange( + eventName: EventName, + startTime: Date, + endTime: Date, + limit?: number, + ): Promise; +} + +export const EVENT_LOG_REPOSITORY = Symbol('IEventLogRepository'); +``` + +```typescript +// src/domain/repositories/daily-active-stats.repository.interface.ts + +import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate'; + +export interface IDailyActiveStatsRepository { + /** + * 保存或更新日活统计 + */ + upsert(stats: DailyActiveStats): Promise; + + /** + * 按日期查询 + */ + findByDate(day: Date): Promise; + + /** + * 按日期范围查询 + */ + findByDateRange(startDate: Date, endDate: Date): Promise; +} + +export const DAILY_ACTIVE_STATS_REPOSITORY = Symbol('IDailyActiveStatsRepository'); +``` + +```typescript +// src/domain/repositories/online-snapshot.repository.interface.ts + +import { OnlineSnapshot } from '../entities/online-snapshot.entity'; + +export interface IOnlineSnapshotRepository { + /** + * 插入快照 + */ + insert(snapshot: OnlineSnapshot): Promise; + + /** + * 按时间范围查询 + */ + findByTimeRange( + startTime: Date, + endTime: Date, + interval?: '1m' | '5m' | '1h', + ): Promise; + + /** + * 获取最新快照 + */ + findLatest(): Promise; +} + +export const ONLINE_SNAPSHOT_REPOSITORY = Symbol('IOnlineSnapshotRepository'); +``` + +### 1.7 领域服务 + +```typescript +// src/domain/services/dau-calculation.service.ts + +import { Injectable } from '@nestjs/common'; +import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate'; +import { DauQueryResult } from '../repositories/event-log.repository.interface'; + +@Injectable() +export class DauCalculationService { + /** + * 从查询结果创建日活统计聚合 + */ + createStatsFromQueryResult(day: Date, result: DauQueryResult): DailyActiveStats { + return DailyActiveStats.create({ + day, + dauCount: result.total, + dauByProvince: result.byProvince, + dauByCity: result.byCity, + }); + } + + /** + * 合并多个查询结果(用于增量计算) + */ + mergeQueryResults(results: DauQueryResult[]): DauQueryResult { + const allIdentifiers = new Set(); + const byProvince = new Map>(); + const byCity = new Map>(); + + // 注意:这里简化处理,实际需要原始数据才能正确去重 + // 生产环境应该在数据库层面完成去重 + let total = 0; + const provinceCount = new Map(); + const cityCount = new Map(); + + for (const result of results) { + total = Math.max(total, result.total); + for (const [province, count] of result.byProvince) { + provinceCount.set(province, Math.max(provinceCount.get(province) ?? 0, count)); + } + for (const [city, count] of result.byCity) { + cityCount.set(city, Math.max(cityCount.get(city) ?? 0, count)); + } + } + + return { + total, + byProvince: provinceCount, + byCity: cityCount, + }; + } +} +``` + +```typescript +// src/domain/services/online-detection.service.ts + +import { Injectable } from '@nestjs/common'; +import { TimeWindow } from '../value-objects/time-window.vo'; + +@Injectable() +export class OnlineDetectionService { + private readonly timeWindow: TimeWindow; + + constructor() { + this.timeWindow = TimeWindow.default(); + } + + /** + * 判断用户是否在线 + */ + isOnline(lastHeartbeatTs: number, now: Date = new Date()): boolean { + const threshold = this.timeWindow.getThresholdTimestamp(now); + return lastHeartbeatTs > threshold; + } + + /** + * 获取在线判定阈值时间戳 + */ + getOnlineThreshold(now: Date = new Date()): number { + return this.timeWindow.getThresholdTimestamp(now); + } + + /** + * 获取窗口秒数 + */ + getWindowSeconds(): number { + return this.timeWindow.windowSeconds; + } +} +``` + +--- + +## 二、应用层设计 + +### 2.1 命令:RecordEvents + +```typescript +// src/application/commands/record-events/record-events.command.ts + +export interface EventItemDto { + eventName: string; + userId?: string; + installId: string; + clientTs: number; + properties?: Record; +} + +export class RecordEventsCommand { + constructor(public readonly events: EventItemDto[]) {} +} +``` + +```typescript +// src/application/commands/record-events/record-events.handler.ts + +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { RecordEventsCommand, EventItemDto } from './record-events.command'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { InstallId } from '../../../domain/value-objects/install-id.vo'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventProperties } from '../../../domain/value-objects/event-properties.vo'; +import { + IEventLogRepository, + EVENT_LOG_REPOSITORY, +} from '../../../domain/repositories/event-log.repository.interface'; +import { RedisService } from '../../../infrastructure/redis/redis.service'; +import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service'; +import { SessionStartedEvent } from '../../../domain/events/session-started.event'; +import { formatToDateKey } from '../../../shared/utils/timezone.util'; + +export interface RecordEventsResult { + accepted: number; + failed: number; + errors?: string[]; +} + +@Injectable() +@CommandHandler(RecordEventsCommand) +export class RecordEventsHandler implements ICommandHandler { + constructor( + @Inject(EVENT_LOG_REPOSITORY) + private readonly eventLogRepository: IEventLogRepository, + private readonly redisService: RedisService, + private readonly eventPublisher: EventPublisherService, + ) {} + + async execute(command: RecordEventsCommand): Promise { + const { events } = command; + const errors: string[] = []; + const validLogs: EventLog[] = []; + + // 1. 验证并转换事件 + for (let i = 0; i < events.length; i++) { + try { + const log = this.toEventLog(events[i]); + validLogs.push(log); + } catch (e) { + errors.push(`Event[${i}]: ${e.message}`); + } + } + + if (validLogs.length === 0) { + return { accepted: 0, failed: events.length, errors }; + } + + // 2. 批量写入数据库 + await this.eventLogRepository.batchInsert(validLogs); + + // 3. 更新实时DAU (HyperLogLog) + const todayKey = formatToDateKey(new Date()); + for (const log of validLogs) { + if (log.eventName.isDauEvent()) { + await this.redisService.pfadd( + `analytics:dau:${todayKey}`, + log.dauIdentifier, + ); + + // 4. 发布领域事件 + await this.eventPublisher.publish( + SessionStartedEvent.EVENT_NAME, + new SessionStartedEvent( + log.userId, + log.installId.value, + log.eventTime, + { + appVersion: log.properties.appVersion, + os: log.properties.os, + osVersion: log.properties.osVersion, + }, + ), + ); + } + } + + return { + accepted: validLogs.length, + failed: events.length - validLogs.length, + errors: errors.length > 0 ? errors : undefined, + }; + } + + private toEventLog(dto: EventItemDto): EventLog { + return EventLog.create({ + userId: dto.userId ? BigInt(dto.userId) : null, + installId: InstallId.fromString(dto.installId), + eventName: EventName.fromString(dto.eventName), + eventTime: new Date(dto.clientTs * 1000), + properties: EventProperties.fromData(dto.properties ?? {}), + }); + } +} +``` + +### 2.2 命令:RecordHeartbeat + +```typescript +// src/application/commands/record-heartbeat/record-heartbeat.command.ts + +export class RecordHeartbeatCommand { + constructor( + public readonly userId: bigint, + public readonly installId: string, + public readonly appVersion: string, + public readonly clientTs: number, + ) {} +} +``` + +```typescript +// src/application/commands/record-heartbeat/record-heartbeat.handler.ts + +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Injectable } from '@nestjs/common'; +import { RecordHeartbeatCommand } from './record-heartbeat.command'; +import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository'; +import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service'; +import { HeartbeatReceivedEvent } from '../../../domain/events/heartbeat-received.event'; + +export interface RecordHeartbeatResult { + ok: boolean; + serverTs: number; +} + +@Injectable() +@CommandHandler(RecordHeartbeatCommand) +export class RecordHeartbeatHandler implements ICommandHandler { + constructor( + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly eventPublisher: EventPublisherService, + ) {} + + async execute(command: RecordHeartbeatCommand): Promise { + const { userId, installId, appVersion, clientTs } = command; + const now = Math.floor(Date.now() / 1000); + + // 1. 更新Redis在线状态 + await this.presenceRedisRepository.updateUserPresence(userId.toString(), now); + + // 2. 发布领域事件 + await this.eventPublisher.publish( + HeartbeatReceivedEvent.EVENT_NAME, + new HeartbeatReceivedEvent(userId, installId, new Date()), + ); + + return { ok: true, serverTs: now }; + } +} +``` + +### 2.3 命令:CalculateDau + +```typescript +// src/application/commands/calculate-dau/calculate-dau.command.ts + +export class CalculateDauCommand { + constructor(public readonly date: Date) {} +} +``` + +```typescript +// src/application/commands/calculate-dau/calculate-dau.handler.ts + +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { CalculateDauCommand } from './calculate-dau.command'; +import { + IEventLogRepository, + EVENT_LOG_REPOSITORY, +} from '../../../domain/repositories/event-log.repository.interface'; +import { + IDailyActiveStatsRepository, + DAILY_ACTIVE_STATS_REPOSITORY, +} from '../../../domain/repositories/daily-active-stats.repository.interface'; +import { DauCalculationService } from '../../../domain/services/dau-calculation.service'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { + startOfDayInTimezone, + endOfDayInTimezone, +} from '../../../shared/utils/timezone.util'; + +@Injectable() +@CommandHandler(CalculateDauCommand) +export class CalculateDauHandler implements ICommandHandler { + private readonly logger = new Logger(CalculateDauHandler.name); + + constructor( + @Inject(EVENT_LOG_REPOSITORY) + private readonly eventLogRepository: IEventLogRepository, + @Inject(DAILY_ACTIVE_STATS_REPOSITORY) + private readonly dauStatsRepository: IDailyActiveStatsRepository, + private readonly dauCalculationService: DauCalculationService, + ) {} + + async execute(command: CalculateDauCommand): Promise { + const { date } = command; + const timezone = 'Asia/Shanghai'; + + const startOfDay = startOfDayInTimezone(date, timezone); + const endOfDay = endOfDayInTimezone(date, timezone); + + this.logger.log(`Calculating DAU for ${date.toISOString().split('T')[0]}`); + + // 1. 查询去重用户数 + const result = await this.eventLogRepository.queryDau( + EventName.APP_SESSION_START, + startOfDay, + endOfDay, + ); + + // 2. 创建或更新统计聚合 + const existingStats = await this.dauStatsRepository.findByDate(date); + + if (existingStats) { + existingStats.recalculate(result.total, result.byProvince, result.byCity); + await this.dauStatsRepository.upsert(existingStats); + } else { + const stats = this.dauCalculationService.createStatsFromQueryResult(date, result); + await this.dauStatsRepository.upsert(stats); + } + + this.logger.log(`DAU calculated: ${result.total} users`); + } +} +``` + +### 2.4 查询:GetOnlineCount + +```typescript +// src/application/queries/get-online-count/get-online-count.query.ts + +export class GetOnlineCountQuery { + constructor() {} +} +``` + +```typescript +// src/application/queries/get-online-count/get-online-count.handler.ts + +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Injectable } from '@nestjs/common'; +import { GetOnlineCountQuery } from './get-online-count.query'; +import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository'; +import { OnlineDetectionService } from '../../../domain/services/online-detection.service'; + +export interface OnlineCountResult { + count: number; + windowSeconds: number; + queriedAt: Date; +} + +@Injectable() +@QueryHandler(GetOnlineCountQuery) +export class GetOnlineCountHandler implements IQueryHandler { + constructor( + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly onlineDetectionService: OnlineDetectionService, + ) {} + + async execute(query: GetOnlineCountQuery): Promise { + const now = new Date(); + const threshold = this.onlineDetectionService.getOnlineThreshold(now); + + const count = await this.presenceRedisRepository.countOnlineUsers(threshold); + + return { + count, + windowSeconds: this.onlineDetectionService.getWindowSeconds(), + queriedAt: now, + }; + } +} +``` + +### 2.5 查询:GetDauStats + +```typescript +// src/application/queries/get-dau-stats/get-dau-stats.query.ts + +export class GetDauStatsQuery { + constructor( + public readonly startDate: Date, + public readonly endDate: Date, + ) {} +} +``` + +```typescript +// src/application/queries/get-dau-stats/get-dau-stats.handler.ts + +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { GetDauStatsQuery } from './get-dau-stats.query'; +import { + IDailyActiveStatsRepository, + DAILY_ACTIVE_STATS_REPOSITORY, +} from '../../../domain/repositories/daily-active-stats.repository.interface'; + +export interface DauStatsItem { + day: string; + dauCount: number; + byProvince?: Record; + byCity?: Record; +} + +export interface DauStatsResult { + data: DauStatsItem[]; + total: number; +} + +@Injectable() +@QueryHandler(GetDauStatsQuery) +export class GetDauStatsHandler implements IQueryHandler { + constructor( + @Inject(DAILY_ACTIVE_STATS_REPOSITORY) + private readonly dauStatsRepository: IDailyActiveStatsRepository, + ) {} + + async execute(query: GetDauStatsQuery): Promise { + const { startDate, endDate } = query; + + const statsList = await this.dauStatsRepository.findByDateRange(startDate, endDate); + + const data: DauStatsItem[] = statsList.map((stats) => ({ + day: stats.day.toISOString().split('T')[0], + dauCount: stats.dauCount, + byProvince: Object.fromEntries(stats.dauByProvince), + byCity: Object.fromEntries(stats.dauByCity), + })); + + return { + data, + total: data.length, + }; + } +} +``` + +### 2.6 定时任务 + +```typescript +// src/application/schedulers/analytics.scheduler.ts + +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { CommandBus } from '@nestjs/cqrs'; +import { subDays } from 'date-fns'; +import { CalculateDauCommand } from '../commands/calculate-dau/calculate-dau.command'; +import { PresenceRedisRepository } from '../../infrastructure/redis/presence-redis.repository'; +import { OnlineDetectionService } from '../../domain/services/online-detection.service'; +import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity'; +import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../domain/repositories/online-snapshot.repository.interface'; +import { Inject } from '@nestjs/common'; + +@Injectable() +export class AnalyticsScheduler { + private readonly logger = new Logger(AnalyticsScheduler.name); + + constructor( + private readonly commandBus: CommandBus, + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly onlineDetectionService: OnlineDetectionService, + @Inject(ONLINE_SNAPSHOT_REPOSITORY) + private readonly snapshotRepository: IOnlineSnapshotRepository, + ) {} + + /** + * 每分钟记录在线人数快照 + */ + @Cron(CronExpression.EVERY_MINUTE) + async recordOnlineSnapshot(): Promise { + try { + const now = new Date(); + const threshold = this.onlineDetectionService.getOnlineThreshold(now); + const count = await this.presenceRedisRepository.countOnlineUsers(threshold); + + const snapshot = OnlineSnapshot.create({ + ts: now, + onlineCount: count, + windowSeconds: this.onlineDetectionService.getWindowSeconds(), + }); + + await this.snapshotRepository.insert(snapshot); + this.logger.debug(`Online snapshot recorded: ${count} users`); + } catch (error) { + this.logger.error('Failed to record online snapshot', error); + } + } + + /** + * 每小时清理过期在线数据 + */ + @Cron(CronExpression.EVERY_HOUR) + async cleanupExpiredPresence(): Promise { + try { + const threshold = Math.floor(Date.now() / 1000) - 86400; // 24小时前 + await this.presenceRedisRepository.removeExpiredUsers(threshold); + this.logger.log('Expired presence data cleaned up'); + } catch (error) { + this.logger.error('Failed to cleanup expired presence', error); + } + } + + /** + * 每天凌晨 1:00 计算前一天 DAU (Asia/Shanghai) + */ + @Cron('0 0 1 * * *', { timeZone: 'Asia/Shanghai' }) + async calculateYesterdayDau(): Promise { + try { + const yesterday = subDays(new Date(), 1); + await this.commandBus.execute(new CalculateDauCommand(yesterday)); + this.logger.log('Yesterday DAU calculated'); + } catch (error) { + this.logger.error('Failed to calculate yesterday DAU', error); + } + } + + /** + * 每小时滚动计算当天 DAU (用于实时看板) + */ + @Cron(CronExpression.EVERY_HOUR) + async calculateTodayDauRolling(): Promise { + try { + await this.commandBus.execute(new CalculateDauCommand(new Date())); + this.logger.debug('Today DAU rolling calculation completed'); + } catch (error) { + this.logger.error('Failed to calculate today DAU', error); + } + } +} +``` + +--- + +## 三、基础设施层设计 + +### 3.1 Prisma Schema + +```prisma +// prisma/schema.prisma + +generator client { + provider = "prisma-client-js" +} + +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +// 事件日志表 (append-only) +model EventLog { + id BigInt @id @default(autoincrement()) + userId BigInt? @map("user_id") + installId String @map("install_id") @db.VarChar(64) + eventName String @map("event_name") @db.VarChar(64) + eventTime DateTime @map("event_time") @db.Timestamptz() + properties Json? @db.JsonB + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz() + + @@index([eventTime], name: "idx_event_log_event_time") + @@index([eventName], name: "idx_event_log_event_name") + @@index([eventName, eventTime], name: "idx_event_log_event_name_time") + @@map("analytics_event_log") +} + +// 日活统计表 +model DailyActiveStats { + day DateTime @id @map("day") @db.Date + dauCount Int @map("dau_count") + dauByProvince Json? @map("dau_by_province") @db.JsonB + dauByCity Json? @map("dau_by_city") @db.JsonB + calculatedAt DateTime @map("calculated_at") @db.Timestamptz() + version Int @default(1) + + @@map("analytics_daily_active_users") +} + +// 在线人数快照表 +model OnlineSnapshot { + id BigInt @id @default(autoincrement()) + ts DateTime @unique @db.Timestamptz() + onlineCount Int @map("online_count") + windowSeconds Int @default(180) @map("window_seconds") + + @@index([ts(sort: Desc)], name: "idx_online_snapshots_ts") + @@map("analytics_online_snapshots") +} +``` + +### 3.2 Redis 仓储实现 + +```typescript +// src/infrastructure/redis/presence-redis.repository.ts + +import { Injectable } from '@nestjs/common'; +import { RedisService } from './redis.service'; + +@Injectable() +export class PresenceRedisRepository { + private readonly ONLINE_USERS_KEY = 'presence:online_users'; + + constructor(private readonly redisService: RedisService) {} + + /** + * 更新用户在线状态 + */ + async updateUserPresence(userId: string, timestamp: number): Promise { + await this.redisService.zadd(this.ONLINE_USERS_KEY, timestamp, userId); + } + + /** + * 统计在线用户数 + */ + async countOnlineUsers(thresholdTimestamp: number): Promise { + return this.redisService.zcount( + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + ); + } + + /** + * 获取在线用户列表 + */ + async getOnlineUsers(thresholdTimestamp: number, limit?: number): Promise { + const args: [string, number | string, number | string] = [ + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + ]; + + if (limit) { + return this.redisService.zrangebyscore( + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + 'LIMIT', + 0, + limit, + ); + } + + return this.redisService.zrangebyscore( + this.ONLINE_USERS_KEY, + thresholdTimestamp, + '+inf', + ); + } + + /** + * 移除过期用户 + */ + async removeExpiredUsers(thresholdTimestamp: number): Promise { + return this.redisService.zremrangebyscore( + this.ONLINE_USERS_KEY, + '-inf', + thresholdTimestamp, + ); + } + + /** + * 获取用户最后心跳时间 + */ + async getUserLastHeartbeat(userId: string): Promise { + const score = await this.redisService.zscore(this.ONLINE_USERS_KEY, userId); + return score ? Number(score) : null; + } +} +``` + +```typescript +// src/infrastructure/redis/redis.service.ts + +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; + +@Injectable() +export class RedisService implements OnModuleDestroy { + private readonly client: Redis; + + constructor(private readonly configService: ConfigService) { + this.client = new Redis({ + host: this.configService.get('REDIS_HOST', 'localhost'), + port: this.configService.get('REDIS_PORT', 6379), + password: this.configService.get('REDIS_PASSWORD'), + db: this.configService.get('REDIS_DB', 0), + }); + } + + async onModuleDestroy(): Promise { + await this.client.quit(); + } + + // ZSET 操作 + async zadd(key: string, score: number, member: string): Promise { + return this.client.zadd(key, score, member); + } + + async zcount(key: string, min: number | string, max: number | string): Promise { + return this.client.zcount(key, min, max); + } + + async zrangebyscore( + key: string, + min: number | string, + max: number | string, + ...args: (string | number)[] + ): Promise { + return this.client.zrangebyscore(key, min, max, ...args); + } + + async zremrangebyscore(key: string, min: number | string, max: number | string): Promise { + return this.client.zremrangebyscore(key, min, max); + } + + async zscore(key: string, member: string): Promise { + return this.client.zscore(key, member); + } + + // HyperLogLog 操作 + async pfadd(key: string, ...elements: string[]): Promise { + return this.client.pfadd(key, ...elements); + } + + async pfcount(...keys: string[]): Promise { + return this.client.pfcount(...keys); + } + + // 通用操作 + async expire(key: string, seconds: number): Promise { + return this.client.expire(key, seconds); + } + + async del(...keys: string[]): Promise { + return this.client.del(...keys); + } +} +``` + +### 3.3 仓储实现 + +```typescript +// src/infrastructure/persistence/repositories/event-log.repository.impl.ts + +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IEventLogRepository, + DauQueryResult, +} from '../../../domain/repositories/event-log.repository.interface'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventLogMapper } from '../mappers/event-log.mapper'; + +@Injectable() +export class EventLogRepositoryImpl implements IEventLogRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: EventLogMapper, + ) {} + + async batchInsert(logs: EventLog[]): Promise { + const data = logs.map((log) => this.mapper.toPersistence(log)); + await this.prisma.eventLog.createMany({ data }); + } + + async insert(log: EventLog): Promise { + const data = this.mapper.toPersistence(log); + const created = await this.prisma.eventLog.create({ data }); + return this.mapper.toDomain(created); + } + + async queryDau( + eventName: EventName, + startTime: Date, + endTime: Date, + ): Promise { + // 使用原生 SQL 进行去重统计 + const result = await this.prisma.$queryRaw< + { total: bigint; province: string | null; city: string | null; count: bigint }[] + >` + WITH base AS ( + SELECT + COALESCE(user_id::text, install_id) AS identifier, + properties->>'province' AS province, + properties->>'city' AS city + FROM analytics_event_log + WHERE event_name = ${eventName.value} + AND event_time >= ${startTime} + AND event_time < ${endTime} + ), + unique_users AS ( + SELECT DISTINCT identifier, province, city FROM base + ) + SELECT + COUNT(DISTINCT identifier) AS total, + province, + city, + COUNT(*) AS count + FROM unique_users + GROUP BY GROUPING SETS ((), (province), (city)) + `; + + let total = 0; + const byProvince = new Map(); + const byCity = new Map(); + + for (const row of result) { + if (row.province === null && row.city === null) { + total = Number(row.total); + } else if (row.province !== null && row.city === null) { + byProvince.set(row.province, Number(row.count)); + } else if (row.city !== null && row.province === null) { + byCity.set(row.city, Number(row.count)); + } + } + + return { total, byProvince, byCity }; + } + + async findByTimeRange( + eventName: EventName, + startTime: Date, + endTime: Date, + limit?: number, + ): Promise { + const records = await this.prisma.eventLog.findMany({ + where: { + eventName: eventName.value, + eventTime: { + gte: startTime, + lt: endTime, + }, + }, + orderBy: { eventTime: 'desc' }, + take: limit, + }); + + return records.map((r) => this.mapper.toDomain(r)); + } +} +``` + +### 3.4 Kafka 事件发布 + +```typescript +// src/infrastructure/kafka/event-publisher.service.ts + +import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Producer, logLevel } from 'kafkajs'; + +@Injectable() +export class EventPublisherService implements OnModuleInit, OnModuleDestroy { + private kafka: Kafka; + private producer: Producer; + private readonly topic: string; + + constructor(private readonly configService: ConfigService) { + this.kafka = new Kafka({ + clientId: 'analytics-presence-service', + brokers: this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','), + logLevel: logLevel.WARN, + }); + this.producer = this.kafka.producer(); + this.topic = this.configService.get('KAFKA_TOPIC_ANALYTICS', 'analytics-events'); + } + + async onModuleInit(): Promise { + await this.producer.connect(); + } + + async onModuleDestroy(): Promise { + await this.producer.disconnect(); + } + + async publish(eventType: string, payload: unknown): Promise { + await this.producer.send({ + topic: this.topic, + messages: [ + { + key: eventType, + value: JSON.stringify({ + eventType, + payload, + occurredAt: new Date().toISOString(), + }), + }, + ], + }); + } +} +``` + +--- + +## 四、表现层设计 + +### 4.1 DTO 定义 + +```typescript +// src/api/dto/request/batch-events.dto.ts + +import { IsArray, IsString, IsOptional, IsNumber, ValidateNested, IsObject } from 'class-validator'; +import { Type } from 'class-transformer'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class EventItemDto { + @ApiProperty({ description: '事件名称', example: 'app_session_start' }) + @IsString() + eventName: string; + + @ApiPropertyOptional({ description: '用户ID (登录用户)', example: '12345' }) + @IsOptional() + @IsString() + userId?: string; + + @ApiProperty({ description: '安装ID', example: 'uuid-xxx-xxx' }) + @IsString() + installId: string; + + @ApiProperty({ description: '客户端时间戳 (秒)', example: 1732685100 }) + @IsNumber() + clientTs: number; + + @ApiPropertyOptional({ description: '事件属性' }) + @IsOptional() + @IsObject() + properties?: Record; +} + +export class BatchEventsDto { + @ApiProperty({ type: [EventItemDto], description: '事件列表' }) + @IsArray() + @ValidateNested({ each: true }) + @Type(() => EventItemDto) + events: EventItemDto[]; +} +``` + +```typescript +// src/api/dto/request/heartbeat.dto.ts + +import { IsString, IsNumber } from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; + +export class HeartbeatDto { + @ApiProperty({ description: '安装ID', example: 'uuid-xxx-xxx' }) + @IsString() + installId: string; + + @ApiProperty({ description: 'App版本', example: '1.0.0' }) + @IsString() + appVersion: string; + + @ApiProperty({ description: '客户端时间戳 (秒)', example: 1732685100 }) + @IsNumber() + clientTs: number; +} +``` + +```typescript +// src/api/dto/request/query-dau.dto.ts + +import { IsDateString, IsOptional } from 'class-validator'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class QueryDauDto { + @ApiProperty({ description: '开始日期', example: '2025-01-01' }) + @IsDateString() + startDate: string; + + @ApiProperty({ description: '结束日期', example: '2025-01-15' }) + @IsDateString() + endDate: string; +} +``` + +```typescript +// src/api/dto/response/online-count.dto.ts + +import { ApiProperty } from '@nestjs/swagger'; + +export class OnlineCountResponseDto { + @ApiProperty({ description: '在线人数', example: 1234 }) + count: number; + + @ApiProperty({ description: '时间窗口(秒)', example: 180 }) + windowSeconds: number; + + @ApiProperty({ description: '查询时间', example: '2025-01-15T10:30:00.000Z' }) + queriedAt: string; +} +``` + +```typescript +// src/api/dto/response/dau-stats.dto.ts + +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class DauDayItemDto { + @ApiProperty({ description: '日期', example: '2025-01-15' }) + day: string; + + @ApiProperty({ description: 'DAU', example: 5678 }) + dauCount: number; + + @ApiPropertyOptional({ description: '按省份统计' }) + byProvince?: Record; + + @ApiPropertyOptional({ description: '按城市统计' }) + byCity?: Record; +} + +export class DauStatsResponseDto { + @ApiProperty({ type: [DauDayItemDto], description: 'DAU数据' }) + data: DauDayItemDto[]; + + @ApiProperty({ description: '记录数', example: 15 }) + total: number; +} +``` + +### 4.2 控制器 + +```typescript +// src/api/controllers/analytics.controller.ts + +import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common'; +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { BatchEventsDto } from '../dto/request/batch-events.dto'; +import { QueryDauDto } from '../dto/request/query-dau.dto'; +import { DauStatsResponseDto } from '../dto/response/dau-stats.dto'; +import { RecordEventsCommand } from '../../application/commands/record-events/record-events.command'; +import { GetDauStatsQuery } from '../../application/queries/get-dau-stats/get-dau-stats.query'; +import { Public } from '../../shared/decorators/public.decorator'; +import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard'; + +@ApiTags('Analytics') +@Controller('analytics') +export class AnalyticsController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + ) {} + + @Post('events') + @Public() + @ApiOperation({ summary: '批量上报事件' }) + async batchEvents(@Body() dto: BatchEventsDto) { + return this.commandBus.execute(new RecordEventsCommand(dto.events)); + } + + @Get('dau') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '查询DAU统计' }) + async getDauStats(@Query() dto: QueryDauDto): Promise { + return this.queryBus.execute( + new GetDauStatsQuery(new Date(dto.startDate), new Date(dto.endDate)), + ); + } +} +``` + +```typescript +// src/api/controllers/presence.controller.ts + +import { Controller, Post, Get, Body, UseGuards, Req } from '@nestjs/common'; +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { HeartbeatDto } from '../dto/request/heartbeat.dto'; +import { OnlineCountResponseDto } from '../dto/response/online-count.dto'; +import { RecordHeartbeatCommand } from '../../application/commands/record-heartbeat/record-heartbeat.command'; +import { GetOnlineCountQuery } from '../../application/queries/get-online-count/get-online-count.query'; +import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard'; +import { CurrentUser } from '../../shared/decorators/current-user.decorator'; + +@ApiTags('Presence') +@Controller('presence') +export class PresenceController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + ) {} + + @Post('heartbeat') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '心跳上报' }) + async heartbeat( + @CurrentUser('userId') userId: bigint, + @Body() dto: HeartbeatDto, + ) { + return this.commandBus.execute( + new RecordHeartbeatCommand( + userId, + dto.installId, + dto.appVersion, + dto.clientTs, + ), + ); + } + + @Get('online-count') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '获取当前在线人数' }) + async getOnlineCount(): Promise { + const result = await this.queryBus.execute(new GetOnlineCountQuery()); + return { + count: result.count, + windowSeconds: result.windowSeconds, + queriedAt: result.queriedAt.toISOString(), + }; + } +} +``` + +--- + +## 五、共享层工具 + +```typescript +// src/shared/utils/timezone.util.ts + +import { format, startOfDay, endOfDay } from 'date-fns'; +import { toZonedTime, fromZonedTime } from 'date-fns-tz'; + +const DEFAULT_TIMEZONE = 'Asia/Shanghai'; + +/** + * 获取指定时区的一天开始时间 + */ +export function startOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date { + const zonedDate = toZonedTime(date, timezone); + const start = startOfDay(zonedDate); + return fromZonedTime(start, timezone); +} + +/** + * 获取指定时区的一天结束时间 + */ +export function endOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date { + const zonedDate = toZonedTime(date, timezone); + const end = endOfDay(zonedDate); + return fromZonedTime(end, timezone); +} + +/** + * 格式化为日期Key (YYYY-MM-DD) + */ +export function formatToDateKey(date: Date, timezone: string = DEFAULT_TIMEZONE): string { + const zonedDate = toZonedTime(date, timezone); + return format(zonedDate, 'yyyy-MM-dd'); +} +``` + +--- + +## 六、环境配置 + +```bash +# .env.example + +# 应用配置 +NODE_ENV=development +PORT=3001 + +# 数据库 +DATABASE_URL=postgresql://user:password@localhost:5432/rwa_analytics?schema=public + +# Redis +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 + +# JWT (与 Identity Service 共用) +JWT_SECRET=your-jwt-secret +JWT_EXPIRES_IN=7d + +# Kafka +KAFKA_BROKERS=localhost:9092 +KAFKA_TOPIC_ANALYTICS=analytics-events + +# 时区 +TZ=Asia/Shanghai +``` + +--- + +## 七、领域不变式 + +1. **InstallId 不可为空** - 每个事件必须携带 InstallId +2. **DAU 去重逻辑** - 优先使用 userId,其次使用 installId +3. **时区统一** - 所有 DAU 计算使用 Asia/Shanghai 时区 +4. **心跳仅限登录用户** - 未登录用户不参与在线统计 +5. **事件日志 append-only** - 不允许修改或删除事件记录 +6. **在线判定窗口** - 默认 180 秒(3分钟) +7. **心跳间隔** - 客户端前台状态下每 60 秒上报一次 + +--- + +## 八、与其他上下文的集成 + +### 8.1 消费 Identity Context 事件 + +```typescript +// src/infrastructure/kafka/event-consumer.service.ts + +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { CommandBus } from '@nestjs/cqrs'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +import { ConfigService } from '@nestjs/config'; + +@Injectable() +export class EventConsumerService implements OnModuleInit { + private consumer: Consumer; + + constructor( + private readonly configService: ConfigService, + private readonly commandBus: CommandBus, + ) { + const kafka = new Kafka({ + clientId: 'analytics-presence-consumer', + brokers: this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','), + }); + this.consumer = kafka.consumer({ groupId: 'analytics-presence-group' }); + } + + async onModuleInit(): Promise { + await this.consumer.connect(); + + // 订阅 Identity Context 的用户创建事件 + await this.consumer.subscribe({ topic: 'identity-events', fromBeginning: false }); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { message } = payload; + const key = message.key?.toString(); + const value = message.value?.toString(); + + if (!value) return; + + const event = JSON.parse(value); + + // 处理用户创建事件 - 可用于初始化用户统计 + if (key === 'user.account.created') { + // 可选:记录注册事件 + } + } +} +``` + +### 8.2 为 Reporting Context 提供数据 + +```typescript +// src/api/controllers/internal.controller.ts + +import { Controller, Get, Query, UseGuards } from '@nestjs/common'; +import { QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation } from '@nestjs/swagger'; +import { InternalGuard } from '../../shared/guards/internal.guard'; +import { GetDauStatsQuery } from '../../application/queries/get-dau-stats/get-dau-stats.query'; + +@ApiTags('Internal') +@Controller('internal/analytics') +@UseGuards(InternalGuard) +export class InternalController { + constructor(private readonly queryBus: QueryBus) {} + + @Get('dau') + @ApiOperation({ summary: '内部接口: 获取DAU数据' }) + async getDauForReporting( + @Query('startDate') startDate: string, + @Query('endDate') endDate: string, + ) { + return this.queryBus.execute( + new GetDauStatsQuery(new Date(startDate), new Date(endDate)), + ); + } +} +``` + +--- + +## 九、快速开始 + +### 1. 安装依赖 + +```bash +npm install +``` + +### 2. 配置环境变量 + +```bash +cp .env.example .env +# 编辑 .env 文件 +``` + +### 3. 初始化数据库 + +```bash +npm run prisma:generate +npm run prisma:migrate +``` + +### 4. 启动服务 + +```bash +# 开发模式 +npm run start:dev + +# 生产模式 +npm run build +npm run start:prod +``` + +### 5. Docker 部署 + +```bash +docker-compose up -d +``` + +--- + +## API 文档 + +启动服务后访问: http://localhost:3001/api/docs + +## 主要 API + +| 方法 | 路径 | 说明 | 认证 | +|------|------|------|------| +| POST | /analytics/events | 批量上报事件 | 可选 | +| GET | /analytics/dau | 查询DAU统计 | 必需 | +| POST | /presence/heartbeat | 心跳上报 | 必需 | +| GET | /presence/online-count | 获取在线人数 | 必需 | + +--- + +## License + +Proprietary