feat(presence-service): 添加 Prometheus 指标监控和 Grafana Dashboard
Prometheus 指标端点 (/api/v1/metrics): - presence_online_users_total: 实时在线人数 - presence_dau_total: 今日 DAU - presence_heartbeat_total: 心跳计数 - presence_events_received_total: 事件上报计数 - presence_session_start_total: 会话开始计数 - presence_heartbeat_duration_seconds: 心跳处理延迟 - presence_event_batch_duration_seconds: 事件批处理延迟 Grafana Dashboard: - 核心指标概览 (在线人数、DAU、心跳、事件) - 趋势图表 (在线人数趋势、心跳/事件速率) - 事件分布 (饼图、按小时趋势) - 性能指标 (P50/P95/P99 延迟) - 服务资源 (内存、CPU) 配置更新: - prometheus.yml 添加 presence-service 抓取配置 - package.json 添加 prom-client 依赖 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
114a9e611c
commit
9656a6f4c4
File diff suppressed because it is too large
Load Diff
|
|
@ -1,5 +1,5 @@
|
|||
# =============================================================================
|
||||
# Prometheus 配置 - Kong API Gateway 监控
|
||||
# Prometheus 配置 - Kong API Gateway + RWA Services 监控
|
||||
# =============================================================================
|
||||
|
||||
global:
|
||||
|
|
@ -17,3 +17,21 @@ scrape_configs:
|
|||
- job_name: 'prometheus'
|
||||
static_configs:
|
||||
- targets: ['localhost:9090']
|
||||
|
||||
# ==========================================================================
|
||||
# RWA Presence Service - 用户活跃度与在线状态监控
|
||||
# ==========================================================================
|
||||
- job_name: 'presence-service'
|
||||
static_configs:
|
||||
# 生产环境: 使用内网 IP 或 Docker 网络名称
|
||||
# - targets: ['presence-service:3011']
|
||||
# 开发环境: 使用 host.docker.internal 访问宿主机服务
|
||||
- targets: ['host.docker.internal:3011']
|
||||
metrics_path: /api/v1/metrics
|
||||
scrape_interval: 15s
|
||||
scrape_timeout: 10s
|
||||
# 添加标签便于区分
|
||||
relabel_configs:
|
||||
- source_labels: [__address__]
|
||||
target_label: instance
|
||||
replacement: 'presence-service'
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@
|
|||
"date-fns-tz": "^2.0.0",
|
||||
"ioredis": "^5.3.2",
|
||||
"kafkajs": "^2.2.4",
|
||||
"prom-client": "^15.1.0",
|
||||
"reflect-metadata": "^0.1.13",
|
||||
"rxjs": "^7.8.1",
|
||||
"uuid": "^9.0.1"
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import {
|
|||
} from '../../../domain/repositories/event-log.repository.interface';
|
||||
import { RedisService } from '../../../infrastructure/redis/redis.service';
|
||||
import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service';
|
||||
import { MetricsService } from '../../../infrastructure/metrics/metrics.service';
|
||||
import { SessionStartedEvent } from '../../../domain/events/session-started.event';
|
||||
import { formatToDateKey } from '../../../shared/utils/timezone.util';
|
||||
|
||||
|
|
@ -28,9 +29,11 @@ export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand>
|
|||
private readonly eventLogRepository: IEventLogRepository,
|
||||
private readonly redisService: RedisService,
|
||||
private readonly eventPublisher: EventPublisherService,
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
async execute(command: RecordEventsCommand): Promise<RecordEventsResult> {
|
||||
const startTime = Date.now();
|
||||
const { events } = command;
|
||||
const errors: string[] = [];
|
||||
const validLogs: EventLog[] = [];
|
||||
|
|
@ -78,6 +81,13 @@ export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand>
|
|||
}
|
||||
}
|
||||
|
||||
// 5. 记录 Prometheus 指标
|
||||
for (const log of validLogs) {
|
||||
this.metricsService.recordEvent(log.eventName.value);
|
||||
}
|
||||
const durationSeconds = (Date.now() - startTime) / 1000;
|
||||
this.metricsService.recordEventBatchDuration(durationSeconds);
|
||||
|
||||
return {
|
||||
accepted: validLogs.length,
|
||||
failed: events.length - validLogs.length,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { Injectable } from '@nestjs/common';
|
|||
import { RecordHeartbeatCommand } from './record-heartbeat.command';
|
||||
import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository';
|
||||
import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service';
|
||||
import { MetricsService } from '../../../infrastructure/metrics/metrics.service';
|
||||
import { HeartbeatReceivedEvent } from '../../../domain/events/heartbeat-received.event';
|
||||
|
||||
export interface RecordHeartbeatResult {
|
||||
|
|
@ -16,9 +17,11 @@ export class RecordHeartbeatHandler implements ICommandHandler<RecordHeartbeatCo
|
|||
constructor(
|
||||
private readonly presenceRedisRepository: PresenceRedisRepository,
|
||||
private readonly eventPublisher: EventPublisherService,
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
async execute(command: RecordHeartbeatCommand): Promise<RecordHeartbeatResult> {
|
||||
const startTime = Date.now();
|
||||
const { userId, installId, appVersion, clientTs } = command;
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
|
||||
|
|
@ -31,6 +34,10 @@ export class RecordHeartbeatHandler implements ICommandHandler<RecordHeartbeatCo
|
|||
new HeartbeatReceivedEvent(userId, installId, new Date()),
|
||||
);
|
||||
|
||||
// 3. 记录 Prometheus 指标
|
||||
const durationSeconds = (Date.now() - startTime) / 1000;
|
||||
this.metricsService.recordHeartbeat(appVersion, durationSeconds);
|
||||
|
||||
return { ok: true, serverTs: now };
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,12 +8,13 @@ import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily
|
|||
import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl';
|
||||
import { RedisModule } from './redis/redis.module';
|
||||
import { KafkaModule } from './kafka/kafka.module';
|
||||
import { MetricsModule } from './metrics/metrics.module';
|
||||
import { EVENT_LOG_REPOSITORY } from '../domain/repositories/event-log.repository.interface';
|
||||
import { DAILY_ACTIVE_STATS_REPOSITORY } from '../domain/repositories/daily-active-stats.repository.interface';
|
||||
import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapshot.repository.interface';
|
||||
|
||||
@Module({
|
||||
imports: [RedisModule, KafkaModule],
|
||||
imports: [RedisModule, KafkaModule, MetricsModule],
|
||||
providers: [
|
||||
PrismaService,
|
||||
EventLogMapper,
|
||||
|
|
@ -39,6 +40,7 @@ import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapsh
|
|||
ONLINE_SNAPSHOT_REPOSITORY,
|
||||
RedisModule,
|
||||
KafkaModule,
|
||||
MetricsModule,
|
||||
],
|
||||
})
|
||||
export class InfrastructureModule {}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
export * from './metrics.module';
|
||||
export * from './metrics.service';
|
||||
export * from './metrics.controller';
|
||||
export * from './metrics-collector.service';
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
import { Injectable, Logger, Inject } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { MetricsService } from './metrics.service';
|
||||
import { PresenceRedisRepository } from '../redis/presence-redis.repository';
|
||||
import {
|
||||
DAILY_ACTIVE_STATS_REPOSITORY,
|
||||
IDailyActiveStatsRepository,
|
||||
} from '../../domain/repositories/daily-active-stats.repository.interface';
|
||||
import {
|
||||
EVENT_LOG_REPOSITORY,
|
||||
IEventLogRepository,
|
||||
} from '../../domain/repositories/event-log.repository.interface';
|
||||
import { format } from 'date-fns';
|
||||
|
||||
@Injectable()
|
||||
export class MetricsCollectorService {
|
||||
private readonly logger = new Logger(MetricsCollectorService.name);
|
||||
private readonly presenceWindowSeconds: number;
|
||||
|
||||
constructor(
|
||||
private readonly metricsService: MetricsService,
|
||||
private readonly presenceRedisRepository: PresenceRedisRepository,
|
||||
@Inject(DAILY_ACTIVE_STATS_REPOSITORY)
|
||||
private readonly dauRepository: IDailyActiveStatsRepository,
|
||||
@Inject(EVENT_LOG_REPOSITORY)
|
||||
private readonly eventLogRepository: IEventLogRepository,
|
||||
) {
|
||||
this.presenceWindowSeconds = parseInt(
|
||||
process.env.PRESENCE_WINDOW_SECONDS || '180',
|
||||
10,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 每 15 秒更新在线人数指标
|
||||
*/
|
||||
@Cron('*/15 * * * * *')
|
||||
async collectOnlineUsersMetric(): Promise<void> {
|
||||
try {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const threshold = now - this.presenceWindowSeconds;
|
||||
const count = await this.presenceRedisRepository.countOnlineUsers(threshold);
|
||||
|
||||
this.metricsService.setOnlineUsers(count);
|
||||
this.logger.debug(`Online users metric updated: ${count}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to collect online users metric', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 每 5 分钟更新 DAU 指标
|
||||
*/
|
||||
@Cron(CronExpression.EVERY_5_MINUTES)
|
||||
async collectDauMetric(): Promise<void> {
|
||||
try {
|
||||
const today = new Date();
|
||||
const dateStr = format(today, 'yyyy-MM-dd');
|
||||
|
||||
const stats = await this.dauRepository.findByDay(today);
|
||||
if (stats) {
|
||||
this.metricsService.setDau(dateStr, stats.dauCount);
|
||||
this.logger.debug(`DAU metric updated: ${dateStr} = ${stats.dauCount}`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to collect DAU metric', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动时立即收集一次
|
||||
*/
|
||||
async onModuleInit(): Promise<void> {
|
||||
await this.collectOnlineUsersMetric();
|
||||
await this.collectDauMetric();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
import { Controller, Get, Header, Res } from '@nestjs/common';
|
||||
import { ApiTags, ApiOperation, ApiExcludeEndpoint } from '@nestjs/swagger';
|
||||
import { Response } from 'express';
|
||||
import { MetricsService } from './metrics.service';
|
||||
|
||||
@ApiTags('Metrics')
|
||||
@Controller('metrics')
|
||||
export class MetricsController {
|
||||
constructor(private readonly metricsService: MetricsService) {}
|
||||
|
||||
@Get()
|
||||
@ApiExcludeEndpoint() // 不在 Swagger 文档中显示
|
||||
@ApiOperation({ summary: 'Prometheus 指标端点' })
|
||||
async getMetrics(@Res() res: Response): Promise<void> {
|
||||
const metrics = await this.metricsService.getMetrics();
|
||||
res.header('Content-Type', this.metricsService.getContentType());
|
||||
res.send(metrics);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
import { Module, Global } from '@nestjs/common';
|
||||
import { MetricsService } from './metrics.service';
|
||||
import { MetricsController } from './metrics.controller';
|
||||
import { MetricsCollectorService } from './metrics-collector.service';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
controllers: [MetricsController],
|
||||
providers: [MetricsService, MetricsCollectorService],
|
||||
exports: [MetricsService],
|
||||
})
|
||||
export class MetricsModule {}
|
||||
|
|
@ -0,0 +1,161 @@
|
|||
import { Injectable, OnModuleInit } from '@nestjs/common';
|
||||
import { Registry, Gauge, Counter, Histogram, collectDefaultMetrics } from 'prom-client';
|
||||
|
||||
@Injectable()
|
||||
export class MetricsService implements OnModuleInit {
|
||||
private readonly registry: Registry;
|
||||
|
||||
// ============ Gauges (当前值) ============
|
||||
|
||||
/** 当前在线人数 */
|
||||
public readonly onlineUsersGauge: Gauge<string>;
|
||||
|
||||
/** 今日 DAU */
|
||||
public readonly dauGauge: Gauge<string>;
|
||||
|
||||
/** 事件队列待处理数 */
|
||||
public readonly eventQueueSizeGauge: Gauge<string>;
|
||||
|
||||
// ============ Counters (累计值) ============
|
||||
|
||||
/** 心跳总数 */
|
||||
public readonly heartbeatTotal: Counter<string>;
|
||||
|
||||
/** 事件上报总数 */
|
||||
public readonly eventsReceivedTotal: Counter<string>;
|
||||
|
||||
/** 会话开始总数 */
|
||||
public readonly sessionStartTotal: Counter<string>;
|
||||
|
||||
/** 会话结束总数 */
|
||||
public readonly sessionEndTotal: Counter<string>;
|
||||
|
||||
// ============ Histograms (分布) ============
|
||||
|
||||
/** 心跳处理时间 */
|
||||
public readonly heartbeatDuration: Histogram<string>;
|
||||
|
||||
/** 事件批量上传处理时间 */
|
||||
public readonly eventBatchDuration: Histogram<string>;
|
||||
|
||||
constructor() {
|
||||
this.registry = new Registry();
|
||||
|
||||
// 默认指标 (CPU, 内存, GC 等)
|
||||
collectDefaultMetrics({ register: this.registry, prefix: 'presence_' });
|
||||
|
||||
// ============ 定义 Gauges ============
|
||||
|
||||
this.onlineUsersGauge = new Gauge({
|
||||
name: 'presence_online_users_total',
|
||||
help: 'Current number of online users',
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
this.dauGauge = new Gauge({
|
||||
name: 'presence_dau_total',
|
||||
help: 'Daily Active Users count',
|
||||
labelNames: ['date'],
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
this.eventQueueSizeGauge = new Gauge({
|
||||
name: 'presence_event_queue_size',
|
||||
help: 'Number of events pending in queue',
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
// ============ 定义 Counters ============
|
||||
|
||||
this.heartbeatTotal = new Counter({
|
||||
name: 'presence_heartbeat_total',
|
||||
help: 'Total number of heartbeats received',
|
||||
labelNames: ['app_version'],
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
this.eventsReceivedTotal = new Counter({
|
||||
name: 'presence_events_received_total',
|
||||
help: 'Total number of telemetry events received',
|
||||
labelNames: ['event_name'],
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
this.sessionStartTotal = new Counter({
|
||||
name: 'presence_session_start_total',
|
||||
help: 'Total number of session starts (for DAU)',
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
this.sessionEndTotal = new Counter({
|
||||
name: 'presence_session_end_total',
|
||||
help: 'Total number of session ends',
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
// ============ 定义 Histograms ============
|
||||
|
||||
this.heartbeatDuration = new Histogram({
|
||||
name: 'presence_heartbeat_duration_seconds',
|
||||
help: 'Heartbeat processing duration in seconds',
|
||||
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
|
||||
registers: [this.registry],
|
||||
});
|
||||
|
||||
this.eventBatchDuration = new Histogram({
|
||||
name: 'presence_event_batch_duration_seconds',
|
||||
help: 'Event batch processing duration in seconds',
|
||||
buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5],
|
||||
registers: [this.registry],
|
||||
});
|
||||
}
|
||||
|
||||
onModuleInit() {
|
||||
// 初始化时可以加载一些初始值
|
||||
}
|
||||
|
||||
/** 获取 Prometheus 格式的指标 */
|
||||
async getMetrics(): Promise<string> {
|
||||
return this.registry.metrics();
|
||||
}
|
||||
|
||||
/** 获取 Content-Type */
|
||||
getContentType(): string {
|
||||
return this.registry.contentType;
|
||||
}
|
||||
|
||||
// ============ 便捷方法 ============
|
||||
|
||||
/** 更新在线人数 */
|
||||
setOnlineUsers(count: number): void {
|
||||
this.onlineUsersGauge.set(count);
|
||||
}
|
||||
|
||||
/** 更新 DAU */
|
||||
setDau(date: string, count: number): void {
|
||||
this.dauGauge.labels(date).set(count);
|
||||
}
|
||||
|
||||
/** 记录心跳 */
|
||||
recordHeartbeat(appVersion: string, durationSeconds: number): void {
|
||||
this.heartbeatTotal.labels(appVersion).inc();
|
||||
this.heartbeatDuration.observe(durationSeconds);
|
||||
}
|
||||
|
||||
/** 记录事件 */
|
||||
recordEvent(eventName: string): void {
|
||||
this.eventsReceivedTotal.labels(eventName).inc();
|
||||
|
||||
// 特殊处理会话事件
|
||||
if (eventName === 'app_session_start') {
|
||||
this.sessionStartTotal.inc();
|
||||
} else if (eventName === 'app_session_end') {
|
||||
this.sessionEndTotal.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/** 记录批量事件处理时间 */
|
||||
recordEventBatchDuration(durationSeconds: number): void {
|
||||
this.eventBatchDuration.observe(durationSeconds);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue