13 KiB
Flutter + NestJS 实时在线统计 & DAU 系统移植指南
基于 RWADurian 项目提炼,适用于任何需要实时在线人数和**日活用户(DAU)**统计的 Flutter + NestJS 项目。
系统架构总览
Flutter App
└─ TelemetryService(单例)
├─ SessionManager → 监听前台/后台切换
├─ HeartbeatService → 前台时每60s发一次心跳
└─ TelemetryUploader → 批量上传行为事件
NestJS presence-service
├─ POST /presence/heartbeat → 记录在线时间戳到 Redis Sorted Set
├─ GET /presence/online-count → 实时在线人数
├─ GET /presence/online-history → 历史在线人数曲线
├─ GET /analytics/dau → DAU 查询
└─ POST /analytics/events → 批量行为事件上报(可选)
Redis Sorted Set: presence:online_users
key=userId, score=最后心跳Unix时间戳
→ ZCOUNT(now-180s, +inf) = 当前在线人数
PostgreSQL (rwa_presence schema)
online_snapshots: 每分钟快照在线人数
daily_active_stats: 每日DAU汇总
event_logs: 行为事件记录(可选)
第一部分:后端(presence-service)
哪些代码完全通用(直接复制,零修改)
backend/services/presence-service/
├── src/
│ ├── infrastructure/redis/ ← Redis Sorted Set 全套操作,与业务无关
│ ├── domain/services/ ← 在线判定逻辑(窗口时间)
│ ├── application/schedulers/ ← 定时任务(快照、DAU计算、清理)
│ ├── application/queries/ ← 查询在线数、历史、DAU
│ └── application/commands/record-heartbeat/ ← 心跳处理
哪些需要按项目调整
1. 环境变量(docker-compose.yml 或 .env)
# 必须改的
DATABASE_URL: postgresql://user:pass@postgres:5432/your_db_presence
JWT_SECRET: 与你的 identity-service 共用同一个密钥 # ← 关键!
# 可选调整
PRESENCE_WINDOW_SECONDS: 180 # 多少秒无心跳算离线,默认3分钟
SNAPSHOT_INTERVAL_SECONDS: 60 # 快照频率,默认1分钟
REDIS_DB: 10 # Redis DB编号,与其他服务隔离
APP_PORT: 3011 # 服务端口
2. JWT 验证(src/shared/guards/jwt-auth.guard.ts)
该文件从 JWT 解码出 userId,需确认字段名与你的 token payload 一致:
// 检查你的 JWT payload 里用户ID的字段名
// RWADurian 用的是 userSerialNum (e.g. "D25121400005")
// 如果你的项目用 sub 或 userId,需修改 current-user.decorator.ts
// src/shared/decorators/current-user.decorator.ts
// 确认这里取的字段名与你的 token payload 匹配
export const CurrentUser = createParamDecorator(
(field: string, ctx: ExecutionContext) => {
const user = ctx.switchToHttp().getRequest().user;
return field ? user?.[field] : user;
},
);
// controller 里用法:
// @CurrentUser('userId') userId: string
// 改成你 token payload 里的实际字段名
3. Prisma schema 数据库名
// prisma/schema.prisma
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
// DATABASE_URL 里的数据库名改成你的项目名,例如:
// postgresql://user:pass@localhost:5432/myapp_presence
}
4. Kong API 网关路由(如果用 Kong)
# api-gateway/kong.yml 添加:
- name: presence-service
url: http://presence-service:3011
routes:
- name: presence-api
paths:
- /api/v1/presence
- name: presence-analytics
paths:
- /api/v1/analytics
如果不用 Kong,用 Nginx 或直接暴露端口同理。
第二部分:前端(Flutter)
哪些代码完全通用(直接复制整个目录)
lib/core/telemetry/
├── telemetry_service.dart ← 主入口,单例
├── models/
│ ├── telemetry_event.dart ← 事件模型
│ ├── telemetry_config.dart ← 远程配置模型
│ └── device_context.dart ← 设备信息模型
├── collectors/
│ └── device_info_collector.dart ← 收集设备/系统信息
├── storage/
│ └── telemetry_storage.dart ← SharedPreferences 本地队列
├── uploader/
│ └── telemetry_uploader.dart ← 批量上传事件
├── session/
│ ├── session_manager.dart ← 前台/后台生命周期监听
│ └── session_events.dart ← 事件名常量
└── presence/
├── heartbeat_service.dart ← 心跳定时器
└── presence_config.dart ← 心跳配置
这些文件与业务零耦合,整个目录直接复制到新项目的 lib/core/telemetry/ 即可。
必须安装的 Flutter 依赖
# pubspec.yaml
dependencies:
dio: ^5.4.3 # HTTP 客户端(心跳和上传)
shared_preferences: ^2.2.3 # 本地队列存储
uuid: ^4.3.3 # 生成 installId 和 eventId
device_info_plus: ^10.1.0 # 获取设备信息
package_info_plus: ^8.0.0 # 获取 App 版本
需要按项目修改的3个接入点
接入点1:启动时初始化(在首屏 或 splash_page 调用)
// 在你的 splash_page.dart 或 bootstrap.dart 里调用
// 需要 BuildContext(用于获取屏幕尺寸等设备信息)
await TelemetryService().initialize(
apiBaseUrl: 'https://your-api.example.com', // ← 改成你的 API 地址(不含 /api/v1)
context: context,
userId: currentUserId, // 已登录则传,未登录传 null
);
接入点2:登录成功后注入 token(在你的 auth/login 处理代码里)
// 登录成功,保存 token 之后,立即调用:
if (TelemetryService().isInitialized) {
TelemetryService().setUserId(response.userId); // ← 改成你的用户ID字段
TelemetryService().setAccessToken(response.accessToken); // ← 改成你的 token 字段
}
接入点3:退出登录时清除
// 退出登录时调用:
if (TelemetryService().isInitialized) {
TelemetryService().clearUserId();
TelemetryService().clearAccessToken();
}
可选:账号切换时更新 token
如果你的 App 支持多账号切换:
// 账号切换完成,SecureStorage 已恢复新账号数据后:
if (TelemetryService().isInitialized) {
TelemetryService().setUserId(newUserId);
// 从 SecureStorage 读出恢复后的 token
final token = await secureStorage.read(key: 'access_token');
TelemetryService().setAccessToken(token);
}
第三部分:事件上报格式(Amplitude 风格)
设备字段放顶层,不放 properties
// POST /api/v1/analytics/events 的单条事件格式
{
"eventName": "page_view",
"userId": "D25121400005",
"installId": "uuid-v4-xxx",
"clientTs": 1709644800,
// 设备字段:顶层独立列(可走数据库索引)
"deviceBrand": "Xiaomi",
"deviceModel": "Redmi Note 12",
"deviceOs": "13",
"appVersion": "1.2.0",
"locale": "zh_CN",
// properties:仅保留事件专属数据
"properties": {
"page": "trading",
"eventId": "uuid-v4-xxx",
"type": "pageView",
"sessionId": "uuid-v4-xxx"
}
}
为什么不放 properties?
| 放进 JSONB properties | 顶层独立列 | |
|---|---|---|
| 按设备品牌分组 | properties->>'deviceBrand',无法走索引 |
GROUP BY device_brand,B-tree 索引直接命中 |
| 亿级数据查询 | 全表扫描(慢) | 毫秒级 |
| 适用规模 | < 百万行 | 千万/亿级 |
实现原理
前端本地队列(Hive)仍将设备字段存在 properties 内,保持本地格式简单;上传时 toServerJson() 自动将它们提取为顶层字段,后端按顶层字段写入独立数据库列。
// telemetry_event.dart - toServerJson() 的核心逻辑
final props = Map<String, dynamic>.from(properties ?? {});
final deviceBrand = props.remove('device_brand'); // 从 props 里取出
// ...
return {
'deviceBrand': deviceBrand, // 放顶层
'properties': { ...props }, // 剩余事件专属数据
};
对应后端数据库列
-- analytics_event_log 表的设备列(均有索引)
device_brand VARCHAR(64) -- 索引:按品牌统计设备分布
device_model VARCHAR(64)
device_os VARCHAR(32)
app_version VARCHAR(32) -- 索引:按版本统计留存/覆盖率
locale VARCHAR(16)
第四部分:心跳接口规格
前端发送的心跳请求格式(固定,不需要修改):
POST /api/v1/presence/heartbeat
Authorization: Bearer <JWT>
{
"installId": "uuid-v4-设备唯一标识",
"appVersion": "1.0.0",
"clientTs": 1709644800 // Unix 时间戳(秒)
}
Response: { "ok": true, "serverTs": 1709644800 }
后端从 JWT 解码出 userId,不需要前端传。
第五部分:查询接口(给管理后台用)
# 当前实时在线人数
GET /api/v1/presence/online-count
Authorization: Bearer <admin-token>
→ { "count": 128, "windowSeconds": 180, "queriedAt": "2026-03-05T15:00:00Z" }
# 历史在线人数(时间段 + 间隔)
GET /api/v1/presence/online-history?startTime=2026-03-05T00:00:00Z&endTime=2026-03-05T23:59:59Z&interval=5m
Authorization: Bearer <admin-token>
# DAU 统计
GET /api/v1/analytics/dau?startDate=2026-03-01&endDate=2026-03-05
Authorization: Bearer <admin-token>
# 行为事件上报(无需认证,批量)
POST /api/v1/analytics/events
{ "events": [ { "eventName": "page_view", "installId": "...", "clientTs": 123, ... } ] }
第六部分:DAU 计算逻辑
DAU 不依赖心跳,而是依赖行为事件(session_start):
App 进入前台
→ SessionManager._startNewSession()
→ TelemetryService.logEvent('app_session_start', type: session)
→ TelemetryUploader 批量上传到 POST /analytics/events
→ presence-service 记录到 event_logs 表
每天凌晨1点(Asia/Shanghai)
→ AnalyticsScheduler.calculateYesterdayDau()
→ 统计昨天有 app_session_start 事件的去重 userId/installId 数
→ 写入 daily_active_stats 表
因此,DAU 对未登录用户也有效(用 installId 去重)。
第七部分:在线人数 vs DAU 的区别
| 实时在线人数 | DAU | |
|---|---|---|
| 数据来源 | 心跳(每60s) | 会话开始事件(app_session_start) |
| 存储 | Redis Sorted Set(内存,快) | PostgreSQL(持久化) |
| 统计周期 | 实时(180s窗口) | 按自然日 |
| 未登录用户 | 不统计(心跳需要 JWT) | 统计(用 installId 去重) |
| 精度 | ±60s | 按天 |
第八部分:完整接入 Checklist
后端
- 复制
presence-service/整个目录到新项目 - 修改
DATABASE_URL(数据库名改为新项目专用) - 确认
JWT_SECRET与 auth 服务共用同一个 - 确认
current-user.decorator.ts里取的 userId 字段名正确 - 配置 API 网关路由(
/api/v1/presence和/api/v1/analytics) - 部署并确认容器启动、Prisma migration 自动执行
前端
- 复制
lib/core/telemetry/整个目录到新项目 - 安装依赖:
dio,shared_preferences,uuid,device_info_plus,package_info_plus - 在 splash/首屏调用
TelemetryService().initialize(apiBaseUrl: '...') - 登录成功后调用
setUserId()+setAccessToken() - 退出登录时调用
clearUserId()+clearAccessToken()
验证
- 登录后等待60s,查看后端日志是否有心跳记录
- 调用
GET /api/v1/presence/online-count,count 应该 ≥ 1 - 次日查看
GET /api/v1/analytics/dau,应有昨日数据
常见问题
Q: 心跳失败会影响 App 吗? A: 不会。心跳完全异步,失败只打 debug 日志,等下一个60s周期重试。presence-service 宕机期间 App 正常使用。
Q: 为什么在线判定窗口是180s,不是60s?
A: 心跳每60s发一次,考虑网络抖动,用3倍窗口(180s)避免频繁出入"在线"状态。如需更严格,把 PRESENCE_WINDOW_SECONDS 改小即可。
Q: 未登录用户算在线吗?
A: 默认不算(PresenceConfig.requiresAuth = true)。若需统计未登录用户,把 requiresAuth 改为 false,同时后端心跳接口需去掉 @UseGuards(JwtAuthGuard)。
Q: Redis 断了怎么办? A: 在线人数数据会丢失,但已写入 PostgreSQL 的快照不受影响。Redis 恢复后重新开始积累数据。
Q: DAU 和实时在线用同一个 Redis key 吗?
A: 不是。在线人数用 Redis(presence:online_users),DAU 用 PostgreSQL 的 event_logs 表计算,两套数据互不干扰。
Q: 心跳接口加了 JWT 校验,未登录用户怎么处理?
A: HeartbeatService 在发心跳前会检查 getUserId?.call() == null,未登录直接跳过,不发请求,不报错。
源码位置(RWADurian 项目)
| 组件 | 路径 |
|---|---|
| 后端服务 | backend/services/presence-service/ |
| Flutter 遥测模块 | frontend/mobile-app/lib/core/telemetry/ |
| 接入示例(初始化) | frontend/mobile-app/lib/bootstrap.dart 第132行 |
| 接入示例(登录) | frontend/mobile-app/lib/core/services/account_service.dart _saveAccountData() |
| 接入示例(退出) | frontend/mobile-app/lib/core/services/multi_account_service.dart deleteAccount() |
| Kong 路由配置 | backend/api-gateway/kong.yml |
| Grafana 看板 | backend/api-gateway/grafana/provisioning/dashboards/presence-dashboard.json |