rwadurian/docs/telemetry-presence-guide.md

13 KiB
Raw Blame History

Flutter + NestJS 实时在线统计 & DAU 系统移植指南

基于 RWADurian 项目提炼,适用于任何需要实时在线人数和**日活用户DAU**统计的 Flutter + NestJS 项目。


系统架构总览

Flutter App
  └─ TelemetryService单例
       ├─ SessionManager      → 监听前台/后台切换
       ├─ HeartbeatService    → 前台时每60s发一次心跳
       └─ TelemetryUploader   → 批量上传行为事件

NestJS presence-service
  ├─ POST /presence/heartbeat     → 记录在线时间戳到 Redis Sorted Set
  ├─ GET  /presence/online-count  → 实时在线人数
  ├─ GET  /presence/online-history → 历史在线人数曲线
  ├─ GET  /analytics/dau          → DAU 查询
  └─ POST /analytics/events       → 批量行为事件上报(可选)

Redis Sorted Set: presence:online_users
  key=userId, score=最后心跳Unix时间戳
  → ZCOUNT(now-180s, +inf) = 当前在线人数

PostgreSQL (rwa_presence schema)
  online_snapshots: 每分钟快照在线人数
  daily_active_stats: 每日DAU汇总
  event_logs: 行为事件记录(可选)

第一部分后端presence-service

哪些代码完全通用(直接复制,零修改)

backend/services/presence-service/
├── src/
│   ├── infrastructure/redis/        ← Redis Sorted Set 全套操作,与业务无关
│   ├── domain/services/             ← 在线判定逻辑(窗口时间)
│   ├── application/schedulers/      ← 定时任务快照、DAU计算、清理
│   ├── application/queries/         ← 查询在线数、历史、DAU
│   └── application/commands/record-heartbeat/ ← 心跳处理

哪些需要按项目调整

1. 环境变量(docker-compose.yml.env

# 必须改的
DATABASE_URL: postgresql://user:pass@postgres:5432/your_db_presence
JWT_SECRET: 与你的 identity-service 共用同一个密钥  # ← 关键!

# 可选调整
PRESENCE_WINDOW_SECONDS: 180   # 多少秒无心跳算离线默认3分钟
SNAPSHOT_INTERVAL_SECONDS: 60  # 快照频率默认1分钟
REDIS_DB: 10                   # Redis DB编号与其他服务隔离
APP_PORT: 3011                 # 服务端口

2. JWT 验证(src/shared/guards/jwt-auth.guard.ts

该文件从 JWT 解码出 userId需确认字段名与你的 token payload 一致:

// 检查你的 JWT payload 里用户ID的字段名
// RWADurian 用的是 userSerialNum (e.g. "D25121400005")
// 如果你的项目用 sub 或 userId需修改 current-user.decorator.ts
// src/shared/decorators/current-user.decorator.ts
// 确认这里取的字段名与你的 token payload 匹配
export const CurrentUser = createParamDecorator(
  (field: string, ctx: ExecutionContext) => {
    const user = ctx.switchToHttp().getRequest().user;
    return field ? user?.[field] : user;
  },
);

// controller 里用法:
// @CurrentUser('userId') userId: string
// 改成你 token payload 里的实际字段名

3. Prisma schema 数据库名

// prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
  // DATABASE_URL 里的数据库名改成你的项目名,例如:
  // postgresql://user:pass@localhost:5432/myapp_presence
}

4. Kong API 网关路由(如果用 Kong

# api-gateway/kong.yml 添加:
- name: presence-service
  url: http://presence-service:3011
  routes:
    - name: presence-api
      paths:
        - /api/v1/presence
    - name: presence-analytics
      paths:
        - /api/v1/analytics

如果不用 Kong用 Nginx 或直接暴露端口同理。


第二部分前端Flutter

哪些代码完全通用(直接复制整个目录)

lib/core/telemetry/
├── telemetry_service.dart      ← 主入口,单例
├── models/
│   ├── telemetry_event.dart    ← 事件模型
│   ├── telemetry_config.dart   ← 远程配置模型
│   └── device_context.dart     ← 设备信息模型
├── collectors/
│   └── device_info_collector.dart  ← 收集设备/系统信息
├── storage/
│   └── telemetry_storage.dart  ← SharedPreferences 本地队列
├── uploader/
│   └── telemetry_uploader.dart ← 批量上传事件
├── session/
│   ├── session_manager.dart    ← 前台/后台生命周期监听
│   └── session_events.dart     ← 事件名常量
└── presence/
    ├── heartbeat_service.dart  ← 心跳定时器
    └── presence_config.dart    ← 心跳配置

这些文件与业务零耦合,整个目录直接复制到新项目的 lib/core/telemetry/ 即可。

必须安装的 Flutter 依赖

# pubspec.yaml
dependencies:
  dio: ^5.4.3          # HTTP 客户端(心跳和上传)
  shared_preferences: ^2.2.3  # 本地队列存储
  uuid: ^4.3.3         # 生成 installId 和 eventId
  device_info_plus: ^10.1.0   # 获取设备信息
  package_info_plus: ^8.0.0   # 获取 App 版本

需要按项目修改的3个接入点

接入点1启动时初始化在首屏 或 splash_page 调用)

// 在你的 splash_page.dart 或 bootstrap.dart 里调用
// 需要 BuildContext用于获取屏幕尺寸等设备信息

await TelemetryService().initialize(
  apiBaseUrl: 'https://your-api.example.com',  // ← 改成你的 API 地址(不含 /api/v1
  context: context,
  userId: currentUserId,  // 已登录则传,未登录传 null
);

接入点2登录成功后注入 token在你的 auth/login 处理代码里)

// 登录成功,保存 token 之后,立即调用:
if (TelemetryService().isInitialized) {
  TelemetryService().setUserId(response.userId);       // ← 改成你的用户ID字段
  TelemetryService().setAccessToken(response.accessToken);  // ← 改成你的 token 字段
}

接入点3退出登录时清除

// 退出登录时调用:
if (TelemetryService().isInitialized) {
  TelemetryService().clearUserId();
  TelemetryService().clearAccessToken();
}

可选:账号切换时更新 token

如果你的 App 支持多账号切换:

// 账号切换完成SecureStorage 已恢复新账号数据后:
if (TelemetryService().isInitialized) {
  TelemetryService().setUserId(newUserId);
  // 从 SecureStorage 读出恢复后的 token
  final token = await secureStorage.read(key: 'access_token');
  TelemetryService().setAccessToken(token);
}

第三部分事件上报格式Amplitude 风格)

设备字段放顶层,不放 properties

// POST /api/v1/analytics/events 的单条事件格式
{
  "eventName": "page_view",
  "userId": "D25121400005",
  "installId": "uuid-v4-xxx",
  "clientTs": 1709644800,

  // 设备字段:顶层独立列(可走数据库索引)
  "deviceBrand": "Xiaomi",
  "deviceModel": "Redmi Note 12",
  "deviceOs": "13",
  "appVersion": "1.2.0",
  "locale": "zh_CN",

  // properties仅保留事件专属数据
  "properties": {
    "page": "trading",
    "eventId": "uuid-v4-xxx",
    "type": "pageView",
    "sessionId": "uuid-v4-xxx"
  }
}

为什么不放 properties

放进 JSONB properties 顶层独立列
按设备品牌分组 properties->>'deviceBrand',无法走索引 GROUP BY device_brandB-tree 索引直接命中
亿级数据查询 全表扫描(慢) 毫秒级
适用规模 < 百万行 千万/亿级

实现原理

前端本地队列Hive仍将设备字段存在 properties 内,保持本地格式简单;上传时 toServerJson() 自动将它们提取为顶层字段,后端按顶层字段写入独立数据库列。

// telemetry_event.dart - toServerJson() 的核心逻辑
final props = Map<String, dynamic>.from(properties ?? {});
final deviceBrand = props.remove('device_brand');  // 从 props 里取出
// ...
return {
  'deviceBrand': deviceBrand,  // 放顶层
  'properties': { ...props },  // 剩余事件专属数据
};

对应后端数据库列

-- analytics_event_log 表的设备列(均有索引)
device_brand  VARCHAR(64)  -- 索引:按品牌统计设备分布
device_model  VARCHAR(64)
device_os     VARCHAR(32)
app_version   VARCHAR(32)  -- 索引:按版本统计留存/覆盖率
locale        VARCHAR(16)

第四部分:心跳接口规格

前端发送的心跳请求格式(固定,不需要修改):

POST /api/v1/presence/heartbeat
Authorization: Bearer <JWT>

{
  "installId": "uuid-v4-设备唯一标识",
  "appVersion": "1.0.0",
  "clientTs": 1709644800   // Unix 时间戳(秒)
}

Response: { "ok": true, "serverTs": 1709644800 }

后端从 JWT 解码出 userId不需要前端传。


第五部分:查询接口(给管理后台用)

# 当前实时在线人数
GET /api/v1/presence/online-count
Authorization: Bearer <admin-token>
→ { "count": 128, "windowSeconds": 180, "queriedAt": "2026-03-05T15:00:00Z" }

# 历史在线人数(时间段 + 间隔)
GET /api/v1/presence/online-history?startTime=2026-03-05T00:00:00Z&endTime=2026-03-05T23:59:59Z&interval=5m
Authorization: Bearer <admin-token>

# DAU 统计
GET /api/v1/analytics/dau?startDate=2026-03-01&endDate=2026-03-05
Authorization: Bearer <admin-token>

# 行为事件上报(无需认证,批量)
POST /api/v1/analytics/events
{ "events": [ { "eventName": "page_view", "installId": "...", "clientTs": 123, ... } ] }

第六部分DAU 计算逻辑

DAU 不依赖心跳而是依赖行为事件session_start

App 进入前台
  → SessionManager._startNewSession()
  → TelemetryService.logEvent('app_session_start', type: session)
  → TelemetryUploader 批量上传到 POST /analytics/events
  → presence-service 记录到 event_logs 表

每天凌晨1点Asia/Shanghai
  → AnalyticsScheduler.calculateYesterdayDau()
  → 统计昨天有 app_session_start 事件的去重 userId/installId 数
  → 写入 daily_active_stats 表

因此,DAU 对未登录用户也有效(用 installId 去重)。


第七部分:在线人数 vs DAU 的区别

实时在线人数 DAU
数据来源 心跳每60s 会话开始事件app_session_start
存储 Redis Sorted Set内存 PostgreSQL持久化
统计周期 实时180s窗口 按自然日
未登录用户 不统计(心跳需要 JWT 统计(用 installId 去重)
精度 ±60s 按天

第八部分:完整接入 Checklist

后端

  • 复制 presence-service/ 整个目录到新项目
  • 修改 DATABASE_URL(数据库名改为新项目专用)
  • 确认 JWT_SECRET 与 auth 服务共用同一个
  • 确认 current-user.decorator.ts 里取的 userId 字段名正确
  • 配置 API 网关路由(/api/v1/presence/api/v1/analytics
  • 部署并确认容器启动、Prisma migration 自动执行

前端

  • 复制 lib/core/telemetry/ 整个目录到新项目
  • 安装依赖:dio, shared_preferences, uuid, device_info_plus, package_info_plus
  • 在 splash/首屏调用 TelemetryService().initialize(apiBaseUrl: '...')
  • 登录成功后调用 setUserId() + setAccessToken()
  • 退出登录时调用 clearUserId() + clearAccessToken()

验证

  • 登录后等待60s查看后端日志是否有心跳记录
  • 调用 GET /api/v1/presence/online-countcount 应该 ≥ 1
  • 次日查看 GET /api/v1/analytics/dau,应有昨日数据

常见问题

Q: 心跳失败会影响 App 吗? A: 不会。心跳完全异步,失败只打 debug 日志等下一个60s周期重试。presence-service 宕机期间 App 正常使用。

Q: 为什么在线判定窗口是180s不是60s A: 心跳每60s发一次考虑网络抖动用3倍窗口180s避免频繁出入"在线"状态。如需更严格,把 PRESENCE_WINDOW_SECONDS 改小即可。

Q: 未登录用户算在线吗? A: 默认不算(PresenceConfig.requiresAuth = true)。若需统计未登录用户,把 requiresAuth 改为 false,同时后端心跳接口需去掉 @UseGuards(JwtAuthGuard)

Q: Redis 断了怎么办? A: 在线人数数据会丢失,但已写入 PostgreSQL 的快照不受影响。Redis 恢复后重新开始积累数据。

Q: DAU 和实时在线用同一个 Redis key 吗? A: 不是。在线人数用 Redispresence:online_usersDAU 用 PostgreSQL 的 event_logs 表计算,两套数据互不干扰。

Q: 心跳接口加了 JWT 校验,未登录用户怎么处理? A: HeartbeatService 在发心跳前会检查 getUserId?.call() == null,未登录直接跳过,不发请求,不报错。


源码位置RWADurian 项目)

组件 路径
后端服务 backend/services/presence-service/
Flutter 遥测模块 frontend/mobile-app/lib/core/telemetry/
接入示例(初始化) frontend/mobile-app/lib/bootstrap.dart 第132行
接入示例(登录) frontend/mobile-app/lib/core/services/account_service.dart _saveAccountData()
接入示例(退出) frontend/mobile-app/lib/core/services/multi_account_service.dart deleteAccount()
Kong 路由配置 backend/api-gateway/kong.yml
Grafana 看板 backend/api-gateway/grafana/provisioning/dashboards/presence-dashboard.json