docs(telemetry): 补充事件上报格式设计决策(Amplitude 风格顶层设备字段)

新增第三部分说明设备字段为何放顶层而非 JSONB properties:
- 对比 JSONB 与独立列在亿级数据下的查询性能差异
- 说明 toServerJson() 的提取原理(本地存储与服务端格式分离)
- 列出对应的数据库列定义和索引
- 各部分编号顺延

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-05 09:52:02 -08:00
parent 6bca65e434
commit b9cfa67835
1 changed files with 412 additions and 0 deletions

View File

@ -0,0 +1,412 @@
# Flutter + NestJS 实时在线统计 & DAU 系统移植指南
基于 RWADurian 项目提炼,适用于任何需要**实时在线人数**和**日活用户DAU**统计的 Flutter + NestJS 项目。
---
## 系统架构总览
```
Flutter App
└─ TelemetryService单例
├─ SessionManager → 监听前台/后台切换
├─ HeartbeatService → 前台时每60s发一次心跳
└─ TelemetryUploader → 批量上传行为事件
NestJS presence-service
├─ POST /presence/heartbeat → 记录在线时间戳到 Redis Sorted Set
├─ GET /presence/online-count → 实时在线人数
├─ GET /presence/online-history → 历史在线人数曲线
├─ GET /analytics/dau → DAU 查询
└─ POST /analytics/events → 批量行为事件上报(可选)
Redis Sorted Set: presence:online_users
key=userId, score=最后心跳Unix时间戳
→ ZCOUNT(now-180s, +inf) = 当前在线人数
PostgreSQL (rwa_presence schema)
online_snapshots: 每分钟快照在线人数
daily_active_stats: 每日DAU汇总
event_logs: 行为事件记录(可选)
```
---
## 第一部分后端presence-service
### 哪些代码完全通用(直接复制,零修改)
```
backend/services/presence-service/
├── src/
│ ├── infrastructure/redis/ ← Redis Sorted Set 全套操作,与业务无关
│ ├── domain/services/ ← 在线判定逻辑(窗口时间)
│ ├── application/schedulers/ ← 定时任务快照、DAU计算、清理
│ ├── application/queries/ ← 查询在线数、历史、DAU
│ └── application/commands/record-heartbeat/ ← 心跳处理
```
### 哪些需要按项目调整
#### 1. 环境变量(`docker-compose.yml` 或 `.env`
```yaml
# 必须改的
DATABASE_URL: postgresql://user:pass@postgres:5432/your_db_presence
JWT_SECRET: 与你的 identity-service 共用同一个密钥 # ← 关键!
# 可选调整
PRESENCE_WINDOW_SECONDS: 180 # 多少秒无心跳算离线默认3分钟
SNAPSHOT_INTERVAL_SECONDS: 60 # 快照频率默认1分钟
REDIS_DB: 10 # Redis DB编号与其他服务隔离
APP_PORT: 3011 # 服务端口
```
#### 2. JWT 验证(`src/shared/guards/jwt-auth.guard.ts`
该文件从 JWT 解码出 userId需确认字段名与你的 token payload 一致:
```typescript
// 检查你的 JWT payload 里用户ID的字段名
// RWADurian 用的是 userSerialNum (e.g. "D25121400005")
// 如果你的项目用 sub 或 userId需修改 current-user.decorator.ts
```
```typescript
// src/shared/decorators/current-user.decorator.ts
// 确认这里取的字段名与你的 token payload 匹配
export const CurrentUser = createParamDecorator(
(field: string, ctx: ExecutionContext) => {
const user = ctx.switchToHttp().getRequest().user;
return field ? user?.[field] : user;
},
);
// controller 里用法:
// @CurrentUser('userId') userId: string
// 改成你 token payload 里的实际字段名
```
#### 3. Prisma schema 数据库名
```prisma
// prisma/schema.prisma
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
// DATABASE_URL 里的数据库名改成你的项目名,例如:
// postgresql://user:pass@localhost:5432/myapp_presence
}
```
#### 4. Kong API 网关路由(如果用 Kong
```yaml
# api-gateway/kong.yml 添加:
- name: presence-service
url: http://presence-service:3011
routes:
- name: presence-api
paths:
- /api/v1/presence
- name: presence-analytics
paths:
- /api/v1/analytics
```
如果不用 Kong用 Nginx 或直接暴露端口同理。
---
## 第二部分前端Flutter
### 哪些代码完全通用(直接复制整个目录)
```
lib/core/telemetry/
├── telemetry_service.dart ← 主入口,单例
├── models/
│ ├── telemetry_event.dart ← 事件模型
│ ├── telemetry_config.dart ← 远程配置模型
│ └── device_context.dart ← 设备信息模型
├── collectors/
│ └── device_info_collector.dart ← 收集设备/系统信息
├── storage/
│ └── telemetry_storage.dart ← SharedPreferences 本地队列
├── uploader/
│ └── telemetry_uploader.dart ← 批量上传事件
├── session/
│ ├── session_manager.dart ← 前台/后台生命周期监听
│ └── session_events.dart ← 事件名常量
└── presence/
├── heartbeat_service.dart ← 心跳定时器
└── presence_config.dart ← 心跳配置
```
这些文件与业务零耦合,整个目录直接复制到新项目的 `lib/core/telemetry/` 即可。
### 必须安装的 Flutter 依赖
```yaml
# pubspec.yaml
dependencies:
dio: ^5.4.3 # HTTP 客户端(心跳和上传)
shared_preferences: ^2.2.3 # 本地队列存储
uuid: ^4.3.3 # 生成 installId 和 eventId
device_info_plus: ^10.1.0 # 获取设备信息
package_info_plus: ^8.0.0 # 获取 App 版本
```
### 需要按项目修改的3个接入点
#### 接入点1启动时初始化在首屏 或 splash_page 调用)
```dart
// 在你的 splash_page.dart 或 bootstrap.dart 里调用
// 需要 BuildContext用于获取屏幕尺寸等设备信息
await TelemetryService().initialize(
apiBaseUrl: 'https://your-api.example.com', // ← 改成你的 API 地址(不含 /api/v1
context: context,
userId: currentUserId, // 已登录则传,未登录传 null
);
```
#### 接入点2登录成功后注入 token在你的 auth/login 处理代码里)
```dart
// 登录成功,保存 token 之后,立即调用:
if (TelemetryService().isInitialized) {
TelemetryService().setUserId(response.userId); // ← 改成你的用户ID字段
TelemetryService().setAccessToken(response.accessToken); // ← 改成你的 token 字段
}
```
#### 接入点3退出登录时清除
```dart
// 退出登录时调用:
if (TelemetryService().isInitialized) {
TelemetryService().clearUserId();
TelemetryService().clearAccessToken();
}
```
### 可选:账号切换时更新 token
如果你的 App 支持多账号切换:
```dart
// 账号切换完成SecureStorage 已恢复新账号数据后:
if (TelemetryService().isInitialized) {
TelemetryService().setUserId(newUserId);
// 从 SecureStorage 读出恢复后的 token
final token = await secureStorage.read(key: 'access_token');
TelemetryService().setAccessToken(token);
}
```
---
## 第三部分事件上报格式Amplitude 风格)
### 设备字段放顶层,不放 properties
```json
// POST /api/v1/analytics/events 的单条事件格式
{
"eventName": "page_view",
"userId": "D25121400005",
"installId": "uuid-v4-xxx",
"clientTs": 1709644800,
// 设备字段:顶层独立列(可走数据库索引)
"deviceBrand": "Xiaomi",
"deviceModel": "Redmi Note 12",
"deviceOs": "13",
"appVersion": "1.2.0",
"locale": "zh_CN",
// properties仅保留事件专属数据
"properties": {
"page": "trading",
"eventId": "uuid-v4-xxx",
"type": "pageView",
"sessionId": "uuid-v4-xxx"
}
}
```
### 为什么不放 properties
| | 放进 JSONB properties | 顶层独立列 |
|--|--|--|
| 按设备品牌分组 | `properties->>'deviceBrand'`,无法走索引 | `GROUP BY device_brand`B-tree 索引直接命中 |
| 亿级数据查询 | 全表扫描(慢) | 毫秒级 |
| 适用规模 | < 百万行 | 千万/亿级 |
### 实现原理
前端本地队列Hive仍将设备字段存在 `properties` 内,保持本地格式简单;**上传时 `toServerJson()` 自动将它们提取为顶层字段**,后端按顶层字段写入独立数据库列。
```dart
// telemetry_event.dart - toServerJson() 的核心逻辑
final props = Map<String, dynamic>.from(properties ?? {});
final deviceBrand = props.remove('device_brand'); // 从 props 里取出
// ...
return {
'deviceBrand': deviceBrand, // 放顶层
'properties': { ...props }, // 剩余事件专属数据
};
```
### 对应后端数据库列
```sql
-- analytics_event_log 表的设备列(均有索引)
device_brand VARCHAR(64) -- 索引:按品牌统计设备分布
device_model VARCHAR(64)
device_os VARCHAR(32)
app_version VARCHAR(32) -- 索引:按版本统计留存/覆盖率
locale VARCHAR(16)
```
---
## 第四部分:心跳接口规格
前端发送的心跳请求格式(固定,不需要修改):
```
POST /api/v1/presence/heartbeat
Authorization: Bearer <JWT>
{
"installId": "uuid-v4-设备唯一标识",
"appVersion": "1.0.0",
"clientTs": 1709644800 // Unix 时间戳(秒)
}
Response: { "ok": true, "serverTs": 1709644800 }
```
后端从 JWT 解码出 userId不需要前端传。
---
## 第五部分:查询接口(给管理后台用)
```
# 当前实时在线人数
GET /api/v1/presence/online-count
Authorization: Bearer <admin-token>
→ { "count": 128, "windowSeconds": 180, "queriedAt": "2026-03-05T15:00:00Z" }
# 历史在线人数(时间段 + 间隔)
GET /api/v1/presence/online-history?startTime=2026-03-05T00:00:00Z&endTime=2026-03-05T23:59:59Z&interval=5m
Authorization: Bearer <admin-token>
# DAU 统计
GET /api/v1/analytics/dau?startDate=2026-03-01&endDate=2026-03-05
Authorization: Bearer <admin-token>
# 行为事件上报(无需认证,批量)
POST /api/v1/analytics/events
{ "events": [ { "eventName": "page_view", "installId": "...", "clientTs": 123, ... } ] }
```
---
## 第六部分DAU 计算逻辑
DAU 不依赖心跳而是依赖行为事件session_start
```
App 进入前台
→ SessionManager._startNewSession()
→ TelemetryService.logEvent('app_session_start', type: session)
→ TelemetryUploader 批量上传到 POST /analytics/events
→ presence-service 记录到 event_logs 表
每天凌晨1点Asia/Shanghai
→ AnalyticsScheduler.calculateYesterdayDau()
→ 统计昨天有 app_session_start 事件的去重 userId/installId 数
→ 写入 daily_active_stats 表
```
因此,**DAU 对未登录用户也有效**(用 installId 去重)。
---
## 第七部分:在线人数 vs DAU 的区别
| | 实时在线人数 | DAU |
|--|--|--|
| 数据来源 | 心跳每60s | 会话开始事件app_session_start |
| 存储 | Redis Sorted Set内存 | PostgreSQL持久化 |
| 统计周期 | 实时180s窗口 | 按自然日 |
| 未登录用户 | 不统计(心跳需要 JWT | 统计(用 installId 去重) |
| 精度 | ±60s | 按天 |
---
## 第八部分:完整接入 Checklist
### 后端
- [ ] 复制 `presence-service/` 整个目录到新项目
- [ ] 修改 `DATABASE_URL`(数据库名改为新项目专用)
- [ ] 确认 `JWT_SECRET` 与 auth 服务共用同一个
- [ ] 确认 `current-user.decorator.ts` 里取的 userId 字段名正确
- [ ] 配置 API 网关路由(`/api/v1/presence` 和 `/api/v1/analytics`
- [ ] 部署并确认容器启动、Prisma migration 自动执行
### 前端
- [ ] 复制 `lib/core/telemetry/` 整个目录到新项目
- [ ] 安装依赖:`dio`, `shared_preferences`, `uuid`, `device_info_plus`, `package_info_plus`
- [ ] 在 splash/首屏调用 `TelemetryService().initialize(apiBaseUrl: '...')`
- [ ] 登录成功后调用 `setUserId()` + `setAccessToken()`
- [ ] 退出登录时调用 `clearUserId()` + `clearAccessToken()`
### 验证
- [ ] 登录后等待60s查看后端日志是否有心跳记录
- [ ] 调用 `GET /api/v1/presence/online-count`count 应该 ≥ 1
- [ ] 次日查看 `GET /api/v1/analytics/dau`,应有昨日数据
---
## 常见问题
**Q: 心跳失败会影响 App 吗?**
A: 不会。心跳完全异步,失败只打 debug 日志等下一个60s周期重试。presence-service 宕机期间 App 正常使用。
**Q: 为什么在线判定窗口是180s不是60s**
A: 心跳每60s发一次考虑网络抖动用3倍窗口180s避免频繁出入"在线"状态。如需更严格,把 `PRESENCE_WINDOW_SECONDS` 改小即可。
**Q: 未登录用户算在线吗?**
A: 默认不算(`PresenceConfig.requiresAuth = true`)。若需统计未登录用户,把 `requiresAuth` 改为 `false`,同时后端心跳接口需去掉 `@UseGuards(JwtAuthGuard)`
**Q: Redis 断了怎么办?**
A: 在线人数数据会丢失,但已写入 PostgreSQL 的快照不受影响。Redis 恢复后重新开始积累数据。
**Q: DAU 和实时在线用同一个 Redis key 吗?**
A: 不是。在线人数用 Redis`presence:online_users`DAU 用 PostgreSQL 的 `event_logs` 表计算,两套数据互不干扰。
**Q: 心跳接口加了 JWT 校验,未登录用户怎么处理?**
A: `HeartbeatService` 在发心跳前会检查 `getUserId?.call() == null`,未登录直接跳过,不发请求,不报错。
---
## 源码位置RWADurian 项目)
| 组件 | 路径 |
|------|------|
| 后端服务 | `backend/services/presence-service/` |
| Flutter 遥测模块 | `frontend/mobile-app/lib/core/telemetry/` |
| 接入示例(初始化) | `frontend/mobile-app/lib/bootstrap.dart` 第132行 |
| 接入示例(登录) | `frontend/mobile-app/lib/core/services/account_service.dart` `_saveAccountData()` |
| 接入示例(退出) | `frontend/mobile-app/lib/core/services/multi_account_service.dart` `deleteAccount()` |
| Kong 路由配置 | `backend/api-gateway/kong.yml` |
| Grafana 看板 | `backend/api-gateway/grafana/provisioning/dashboards/presence-dashboard.json` |