feat(telemetry): add presence-service + Flutter telemetry module

## Backend — packages/services/presence-service (新微服务)

完整的 DDD + Clean Architecture 实现,移植自 RWADurian presence-service,
针对 IT0 架构做了以下适配:

### 核心功能
- 心跳接口: POST /api/v1/presence/heartbeat(JWT 验证,60s 间隔)
  → Redis Sorted Set `presence:online_users` 记录在线时间戳
  → 默认 5 分钟窗口判断在线(PRESENCE_WINDOW_SECONDS=300)
- 事件上报: POST /api/v1/analytics/events(批量,最多 50 条)
  → 写入 presence_event_log 表 + 更新 presence_device_profile
  → Redis HyperLogLog `presence:dau:{date}` 实时 DAU 估算
- 查询接口(需 AdminGuard):
  - GET /api/v1/analytics/online-count  — 实时在线人数
  - GET /api/v1/analytics/online-history — 历史在线快照
  - GET /api/v1/analytics/dau — DAU 统计

### IT0 适配要点
- JWT payload: `sub` = UUID userId(非 RWADurian 的 userSerialNum)
  → JwtAuthGuard: request.user = { userId: payload.sub, roles, tenantId }
- AdminGuard: 改为检查 `roles.includes('admin')`(非 type==='admin')
- 移除 Kafka EventPublisherService(IT0 无 Kafka)
- 移除 Prometheus MetricsService(IT0 无 Prometheus)
- 表前缀改为 `presence_`(避免与其他服务冲突)
- userId 字段 VarChar(36)(UUID 格式,非原来的 VarChar(20))
- Redis DB=10 隔离(独立 key 空间)

### 数据库表(public schema)
- presence_event_log       — 事件流水(append-only)
- presence_device_profile  — 设备快照(upsert,每台设备一行)
- presence_daily_active_users — DAU 日统计
- presence_online_snapshots   — 在线人数每分钟快照

### 定时任务(@nestjs/schedule)
- 每分钟: 采集在线人数快照 → presence_online_snapshots
- 每天 01:05 (UTC+8): 计算前一天 DAU → presence_daily_active_users

---

## Flutter — it0_app/lib/core/telemetry (新模块)

### 文件结构
- telemetry_service.dart      — 单例入口,统筹所有组件
- models/telemetry_event.dart — 事件模型,toServerJson() 将设备字段提升为顶层列
- models/device_context.dart  — 设备上下文(Android/iOS 信息)
- models/telemetry_config.dart — 远程配置(采样率/开关,支持远端同步)
- collectors/device_info_collector.dart — 采集 device_info_plus 设备信息
- storage/telemetry_storage.dart  — SharedPreferences 队列(最多 500 条)
- uploader/telemetry_uploader.dart — 批量上传到 /api/v1/analytics/events
- session/session_manager.dart    — WidgetsBindingObserver 监听前后台切换
- session/session_events.dart     — 会话事件常量
- presence/heartbeat_service.dart — 定时心跳 POST /api/v1/presence/heartbeat
- presence/presence_config.dart   — 心跳配置(间隔/requiresAuth)
- telemetry.dart                  — barrel 导出

### 集成点
- app_router.dart _tryRestore(): TelemetryService().initialize() 在 auth 之前
- auth_provider.dart login/loginWithOtp: setUserId + setAccessToken + resumeAfterLogin
- auth_provider.dart tryRestoreSession: 恢复 userId + accessToken
- auth_provider.dart logout: pauseForLogout + clearUserId + clearAccessToken

### 新增依赖
- device_info_plus: ^10.1.0
- equatable: ^2.0.5

---

## 基础设施

### Dockerfile.service
- 在 builder 和 production 阶段均添加 presence-service/package.json 的 COPY

### docker-compose.yml
- 新增 presence-service 容器(端口 3011/13011)
  - DATABASE_URL: postgresql://... (Prisma 所需连接串格式)
  - REDIS_HOST/PORT/DB: 10(presence 独立 Redis DB)
  - APP_PORT=3011, JWT_SECRET, PRESENCE_WINDOW_SECONDS=300
- api-gateway depends_on 新增 presence-service

### kong.yml (dbless 声明式)
- 新增 presence-service 服务(http://presence-service:3011)
  - presence-routes: /api/v1/presence
  - analytics-routes: /api/v1/analytics
- 对整个 presence-service 启用 JWT 插件(Kong 层鉴权)

### DB 迁移
- packages/shared/database/src/migrations/010-create-presence-tables.sql
  — 4 张 presence_ 前缀表 + 完整索引(IF NOT EXISTS 幂等)
- run-migrations.ts: runSharedSchema() 中新增执行 010-create-presence-tables.sql

---

## 部署步骤(服务器)

1. git pull
2. 执行 presence 表迁移(首次):
   docker exec it0-postgres psql -U it0 -d it0 \
     -f /path/to/010-create-presence-tables.sql
   或通过 migration runner:
   cd /home/ceshi/it0 && node packages/shared/database/dist/run-migrations.js
3. 重建并启动 presence-service:
   docker compose build presence-service api-gateway
   docker compose up -d presence-service api-gateway

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-07 17:44:01 -08:00
parent d56486a4aa
commit 8d2fd3335a
88 changed files with 3412 additions and 0 deletions

View File

@ -26,6 +26,7 @@ COPY packages/services/comm-service/package.json packages/services/comm-service/
COPY packages/services/audit-service/package.json packages/services/audit-service/
COPY packages/services/billing-service/package.json packages/services/billing-service/
COPY packages/services/version-service/package.json packages/services/version-service/
COPY packages/services/presence-service/package.json packages/services/presence-service/
# Install all dependencies (cached unless package.json changes)
RUN pnpm install --frozen-lockfile
@ -64,6 +65,7 @@ COPY --from=builder /app/packages/services/comm-service/package.json packages/se
COPY --from=builder /app/packages/services/audit-service/package.json packages/services/audit-service/
COPY --from=builder /app/packages/services/billing-service/package.json packages/services/billing-service/
COPY --from=builder /app/packages/services/version-service/package.json packages/services/version-service/
COPY --from=builder /app/packages/services/presence-service/package.json packages/services/presence-service/
# Install production dependencies only
RUN pnpm install --frozen-lockfile --prod

View File

@ -67,6 +67,8 @@ services:
condition: service_healthy
version-service:
condition: service_healthy
presence-service:
condition: service_healthy
healthcheck:
test: ["CMD", "kong", "health"]
interval: 10s
@ -396,6 +398,39 @@ services:
networks:
- it0-network
presence-service:
build:
context: ../..
dockerfile: Dockerfile.service
args:
SERVICE_NAME: presence-service
SERVICE_PORT: 3011
container_name: it0-presence-service
restart: unless-stopped
ports:
- "13011:3011"
environment:
- DATABASE_URL=postgresql://${POSTGRES_USER:-it0}:${POSTGRES_PASSWORD:-it0_dev}@postgres:5432/${POSTGRES_DB:-it0}
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=10
- APP_PORT=3011
- JWT_SECRET=${JWT_SECRET:-dev-jwt-secret}
- PRESENCE_WINDOW_SECONDS=300
healthcheck:
test: ["CMD-SHELL", "node -e \"require('http').get('http://localhost:3011/',r=>{process.exit(r.statusCode<500?0:1)}).on('error',()=>process.exit(1))\""]
interval: 30s
timeout: 5s
retries: 3
start_period: 15s
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
networks:
- it0-network
# ===== LiveKit Infrastructure =====
# NOTE: livekit-server, voice-agent, voice-service use host networking
# to eliminate docker-proxy overhead for real-time audio (WebRTC UDP).

View File

@ -6,6 +6,8 @@ import '../updater/update_service.dart';
import '../network/connectivity_provider.dart';
import '../widgets/offline_banner.dart';
import '../widgets/floating_robot_fab.dart';
import '../config/app_config.dart';
import '../telemetry/telemetry.dart';
import '../../features/auth/data/providers/auth_provider.dart';
import '../../features/auth/presentation/pages/login_page.dart';
import '../../features/home/presentation/pages/home_page.dart';
@ -198,6 +200,12 @@ class _SplashPageState extends ConsumerState<_SplashPage> {
}
Future<void> _tryRestore() async {
final config = ref.read(appConfigProvider);
await TelemetryService().initialize(
apiBaseUrl: config.apiBaseUrl,
context: context,
);
final auth = ref.read(authStateProvider.notifier);
final restored = await auth.tryRestoreSession();
if (!mounted) return;

View File

@ -0,0 +1,102 @@
import 'dart:io';
import 'package:device_info_plus/device_info_plus.dart';
import 'package:package_info_plus/package_info_plus.dart';
import 'package:flutter/material.dart';
import 'package:flutter/foundation.dart';
import '../models/device_context.dart';
class DeviceInfoCollector {
static DeviceInfoCollector? _instance;
DeviceInfoCollector._();
factory DeviceInfoCollector() {
_instance ??= DeviceInfoCollector._();
return _instance!;
}
DeviceContext? _cachedContext;
Future<DeviceContext> collect(BuildContext context) async {
if (_cachedContext != null) return _cachedContext!;
final deviceInfo = DeviceInfoPlugin();
final packageInfo = await PackageInfo.fromPlatform();
final mediaQuery = MediaQuery.of(context);
DeviceContext result;
if (Platform.isAndroid) {
final androidInfo = await deviceInfo.androidInfo;
result = DeviceContext(
platform: 'android',
brand: androidInfo.brand,
model: androidInfo.model,
manufacturer: androidInfo.manufacturer,
isPhysicalDevice: androidInfo.isPhysicalDevice,
osVersion: androidInfo.version.release,
sdkInt: androidInfo.version.sdkInt,
androidId: androidInfo.id,
screen: _collectScreenInfo(mediaQuery),
appName: packageInfo.appName,
packageName: packageInfo.packageName,
appVersion: packageInfo.version,
buildNumber: packageInfo.buildNumber,
buildMode: _getBuildMode(),
locale: Platform.localeName,
timezone: DateTime.now().timeZoneName,
isDarkMode: mediaQuery.platformBrightness == Brightness.dark,
networkType: 'unknown',
collectedAt: DateTime.now(),
);
} else if (Platform.isIOS) {
final iosInfo = await deviceInfo.iosInfo;
result = DeviceContext(
platform: 'ios',
brand: 'Apple',
model: iosInfo.model,
manufacturer: 'Apple',
isPhysicalDevice: iosInfo.isPhysicalDevice,
osVersion: iosInfo.systemVersion,
sdkInt: 0,
androidId: iosInfo.identifierForVendor ?? '',
screen: _collectScreenInfo(mediaQuery),
appName: packageInfo.appName,
packageName: packageInfo.packageName,
appVersion: packageInfo.version,
buildNumber: packageInfo.buildNumber,
buildMode: _getBuildMode(),
locale: Platform.localeName,
timezone: DateTime.now().timeZoneName,
isDarkMode: mediaQuery.platformBrightness == Brightness.dark,
networkType: 'unknown',
collectedAt: DateTime.now(),
);
} else {
throw UnsupportedError('Unsupported platform');
}
_cachedContext = result;
return result;
}
ScreenInfo _collectScreenInfo(MediaQueryData mediaQuery) {
final size = mediaQuery.size;
final density = mediaQuery.devicePixelRatio;
return ScreenInfo(
widthPx: size.width * density,
heightPx: size.height * density,
density: density,
widthDp: size.width,
heightDp: size.height,
hasNotch: mediaQuery.padding.top > 24,
);
}
String _getBuildMode() {
if (kReleaseMode) return 'release';
if (kProfileMode) return 'profile';
return 'debug';
}
void clearCache() => _cachedContext = null;
DeviceContext? get cachedContext => _cachedContext;
}

View File

@ -0,0 +1,135 @@
import 'package:equatable/equatable.dart';
class ScreenInfo extends Equatable {
final double widthPx;
final double heightPx;
final double density;
final double widthDp;
final double heightDp;
final bool hasNotch;
const ScreenInfo({
required this.widthPx,
required this.heightPx,
required this.density,
required this.widthDp,
required this.heightDp,
required this.hasNotch,
});
factory ScreenInfo.fromJson(Map<String, dynamic> json) {
return ScreenInfo(
widthPx: (json['widthPx'] as num).toDouble(),
heightPx: (json['heightPx'] as num).toDouble(),
density: (json['density'] as num).toDouble(),
widthDp: (json['widthDp'] as num).toDouble(),
heightDp: (json['heightDp'] as num).toDouble(),
hasNotch: json['hasNotch'] as bool? ?? false,
);
}
Map<String, dynamic> toJson() => {
'widthPx': widthPx,
'heightPx': heightPx,
'density': density,
'widthDp': widthDp,
'heightDp': heightDp,
'hasNotch': hasNotch,
};
@override
List<Object?> get props => [widthPx, heightPx, density, widthDp, heightDp, hasNotch];
}
class DeviceContext extends Equatable {
final String platform;
final String brand;
final String model;
final String manufacturer;
final bool isPhysicalDevice;
final String osVersion;
final int sdkInt;
final String androidId;
final ScreenInfo screen;
final String appName;
final String packageName;
final String appVersion;
final String buildNumber;
final String buildMode;
final String locale;
final String timezone;
final bool isDarkMode;
final String networkType;
final DateTime collectedAt;
const DeviceContext({
required this.platform,
required this.brand,
required this.model,
required this.manufacturer,
required this.isPhysicalDevice,
required this.osVersion,
required this.sdkInt,
required this.androidId,
required this.screen,
required this.appName,
required this.packageName,
required this.appVersion,
required this.buildNumber,
required this.buildMode,
required this.locale,
required this.timezone,
required this.isDarkMode,
required this.networkType,
required this.collectedAt,
});
factory DeviceContext.fromJson(Map<String, dynamic> json) {
return DeviceContext(
platform: json['platform'] as String,
brand: json['brand'] as String,
model: json['model'] as String,
manufacturer: json['manufacturer'] as String,
isPhysicalDevice: json['isPhysicalDevice'] as bool,
osVersion: json['osVersion'] as String,
sdkInt: json['sdkInt'] as int,
androidId: json['androidId'] as String,
screen: ScreenInfo.fromJson(json['screen'] as Map<String, dynamic>),
appName: json['appName'] as String,
packageName: json['packageName'] as String,
appVersion: json['appVersion'] as String,
buildNumber: json['buildNumber'] as String,
buildMode: json['buildMode'] as String,
locale: json['locale'] as String,
timezone: json['timezone'] as String,
isDarkMode: json['isDarkMode'] as bool,
networkType: json['networkType'] as String,
collectedAt: DateTime.parse(json['collectedAt'] as String),
);
}
Map<String, dynamic> toJson() => {
'platform': platform,
'brand': brand,
'model': model,
'manufacturer': manufacturer,
'isPhysicalDevice': isPhysicalDevice,
'osVersion': osVersion,
'sdkInt': sdkInt,
'androidId': androidId,
'screen': screen.toJson(),
'appName': appName,
'packageName': packageName,
'appVersion': appVersion,
'buildNumber': buildNumber,
'buildMode': buildMode,
'locale': locale,
'timezone': timezone,
'isDarkMode': isDarkMode,
'networkType': networkType,
'collectedAt': collectedAt.toIso8601String(),
};
@override
List<Object?> get props => [platform, brand, model, manufacturer, isPhysicalDevice, osVersion, sdkInt, androidId, screen, appName, packageName, appVersion, buildNumber, buildMode, locale, timezone, isDarkMode, networkType, collectedAt];
}

View File

@ -0,0 +1,114 @@
import 'package:dio/dio.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:flutter/foundation.dart';
import 'telemetry_event.dart';
import '../presence/presence_config.dart';
class TelemetryConfig {
bool globalEnabled = true;
bool errorReportEnabled = true;
bool performanceEnabled = true;
bool userActionEnabled = true;
bool pageViewEnabled = true;
bool sessionEnabled = true;
double samplingRate = 0.1;
List<String> disabledEvents = [];
String configVersion = '1.0.0';
bool userOptIn = true;
PresenceConfig? presenceConfig;
static final TelemetryConfig _instance = TelemetryConfig._();
TelemetryConfig._();
factory TelemetryConfig() => _instance;
Future<void> syncFromRemote(String apiBaseUrl) async {
try {
final dio = Dio(BaseOptions(
connectTimeout: const Duration(seconds: 5),
receiveTimeout: const Duration(seconds: 5),
));
final response = await dio.get('$apiBaseUrl/telemetry/config');
final data = response.data as Map<String, dynamic>;
globalEnabled = data['global_enabled'] ?? true;
errorReportEnabled = data['error_report_enabled'] ?? true;
performanceEnabled = data['performance_enabled'] ?? true;
userActionEnabled = data['user_action_enabled'] ?? true;
pageViewEnabled = data['page_view_enabled'] ?? true;
sessionEnabled = data['session_enabled'] ?? true;
samplingRate = (data['sampling_rate'] ?? 0.1).toDouble();
disabledEvents = List<String>.from(data['disabled_events'] ?? []);
configVersion = data['version'] ?? '1.0.0';
if (data['presence_config'] != null) {
presenceConfig = PresenceConfig.fromJson(data['presence_config']);
}
await _saveToLocal();
} catch (e) {
debugPrint('[TelemetryConfig] Remote sync failed (non-blocking): $e');
await _loadFromLocal();
}
}
Future<void> _saveToLocal() async {
final prefs = await SharedPreferences.getInstance();
await prefs.setBool('telemetry_global_enabled', globalEnabled);
await prefs.setBool('telemetry_error_enabled', errorReportEnabled);
await prefs.setBool('telemetry_performance_enabled', performanceEnabled);
await prefs.setBool('telemetry_user_action_enabled', userActionEnabled);
await prefs.setBool('telemetry_page_view_enabled', pageViewEnabled);
await prefs.setBool('telemetry_session_enabled', sessionEnabled);
await prefs.setDouble('telemetry_sampling_rate', samplingRate);
await prefs.setStringList('telemetry_disabled_events', disabledEvents);
await prefs.setString('telemetry_config_version', configVersion);
}
Future<void> _loadFromLocal() async {
final prefs = await SharedPreferences.getInstance();
globalEnabled = prefs.getBool('telemetry_global_enabled') ?? true;
errorReportEnabled = prefs.getBool('telemetry_error_enabled') ?? true;
performanceEnabled = prefs.getBool('telemetry_performance_enabled') ?? true;
userActionEnabled = prefs.getBool('telemetry_user_action_enabled') ?? true;
pageViewEnabled = prefs.getBool('telemetry_page_view_enabled') ?? true;
sessionEnabled = prefs.getBool('telemetry_session_enabled') ?? true;
samplingRate = prefs.getDouble('telemetry_sampling_rate') ?? 0.1;
disabledEvents = prefs.getStringList('telemetry_disabled_events') ?? [];
configVersion = prefs.getString('telemetry_config_version') ?? '1.0.0';
}
bool shouldLog(EventType type, String eventName) {
if (!globalEnabled) return false;
if (!userOptIn) return false;
if (disabledEvents.contains(eventName)) return false;
switch (type) {
case EventType.error:
case EventType.crash:
return errorReportEnabled;
case EventType.performance:
return performanceEnabled;
case EventType.userAction:
return userActionEnabled;
case EventType.pageView:
return pageViewEnabled;
case EventType.apiCall:
return performanceEnabled;
case EventType.session:
return sessionEnabled;
case EventType.presence:
return presenceConfig?.enabled ?? true;
}
}
Future<void> setUserOptIn(bool optIn) async {
userOptIn = optIn;
final prefs = await SharedPreferences.getInstance();
await prefs.setBool('telemetry_user_opt_in', optIn);
}
Future<void> loadUserOptIn() async {
final prefs = await SharedPreferences.getInstance();
userOptIn = prefs.getBool('telemetry_user_opt_in') ?? true;
}
}

View File

@ -0,0 +1,109 @@
import 'package:equatable/equatable.dart';
enum EventLevel { debug, info, warning, error, fatal }
enum EventType {
pageView,
userAction,
apiCall,
performance,
error,
crash,
session,
presence,
}
class TelemetryEvent extends Equatable {
final String eventId;
final EventType type;
final EventLevel level;
final String name;
final Map<String, dynamic>? properties;
final DateTime timestamp;
final String? userId;
final String? sessionId;
final String installId;
final String deviceContextId;
const TelemetryEvent({
required this.eventId,
required this.type,
required this.level,
required this.name,
this.properties,
required this.timestamp,
this.userId,
this.sessionId,
required this.installId,
required this.deviceContextId,
});
factory TelemetryEvent.fromJson(Map<String, dynamic> json) {
return TelemetryEvent(
eventId: json['eventId'] as String,
type: EventType.values.firstWhere(
(e) => e.name == json['type'],
orElse: () => EventType.userAction,
),
level: EventLevel.values.firstWhere(
(e) => e.name == json['level'],
orElse: () => EventLevel.info,
),
name: json['name'] as String,
properties: json['properties'] as Map<String, dynamic>?,
timestamp: DateTime.parse(json['timestamp'] as String),
userId: json['userId'] as String?,
sessionId: json['sessionId'] as String?,
installId: json['installId'] as String,
deviceContextId: json['deviceContextId'] as String,
);
}
Map<String, dynamic> toJson() {
return {
'eventId': eventId,
'type': type.name,
'level': level.name,
'name': name,
'properties': properties,
'timestamp': timestamp.toIso8601String(),
'userId': userId,
'sessionId': sessionId,
'installId': installId,
'deviceContextId': deviceContextId,
};
}
/// API Amplitude
Map<String, dynamic> toServerJson() {
final props = Map<String, dynamic>.from(properties ?? {});
final deviceBrand = props.remove('device_brand');
final deviceModel = props.remove('device_model');
final deviceOs = props.remove('device_os');
final appVersion = props.remove('app_version');
final locale = props.remove('locale');
return {
'eventName': name,
'userId': userId,
'installId': installId,
'clientTs': timestamp.millisecondsSinceEpoch ~/ 1000,
'deviceBrand': deviceBrand,
'deviceModel': deviceModel,
'deviceOs': deviceOs,
'appVersion': appVersion,
'locale': locale,
'properties': {
...props,
'eventId': eventId,
'type': type.name,
'level': level.name,
'sessionId': sessionId,
'deviceContextId': deviceContextId,
},
};
}
@override
List<Object?> get props => [eventId, type, level, name, properties, timestamp, userId, sessionId, installId, deviceContextId];
}

View File

@ -0,0 +1,140 @@
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:dio/dio.dart';
import '../session/session_manager.dart';
import '../session/session_events.dart';
import 'presence_config.dart';
class HeartbeatService {
static HeartbeatService? _instance;
HeartbeatService._();
factory HeartbeatService() {
_instance ??= HeartbeatService._();
return _instance!;
}
PresenceConfig _config = PresenceConfig.defaultConfig;
Timer? _heartbeatTimer;
bool _isRunning = false;
bool get isRunning => _isRunning;
DateTime? _lastHeartbeatAt;
DateTime? get lastHeartbeatAt => _lastHeartbeatAt;
int _heartbeatCount = 0;
int get heartbeatCount => _heartbeatCount;
String Function()? getInstallId;
String? Function()? getUserId;
String Function()? getAppVersion;
Map<String, String> Function()? getAuthHeaders;
late Dio _dio;
bool _isInitialized = false;
void initialize({
required String apiBaseUrl,
PresenceConfig? config,
required String Function() getInstallId,
required String? Function() getUserId,
required String Function() getAppVersion,
Map<String, String> Function()? getAuthHeaders,
}) {
if (_isInitialized) return;
_config = config ?? PresenceConfig.defaultConfig;
this.getInstallId = getInstallId;
this.getUserId = getUserId;
this.getAppVersion = getAppVersion;
this.getAuthHeaders = getAuthHeaders;
_dio = Dio(BaseOptions(
baseUrl: apiBaseUrl,
connectTimeout: const Duration(seconds: 5),
receiveTimeout: const Duration(seconds: 5),
));
final sessionManager = SessionManager();
sessionManager.onSessionStart = _onSessionStart;
sessionManager.onSessionEnd = _onSessionEnd;
if (sessionManager.state == SessionState.foreground) {
_startHeartbeat();
}
_isInitialized = true;
debugPrint('[Heartbeat] Initialized, interval: ${_config.heartbeatIntervalSeconds}s');
}
void updateConfig(PresenceConfig config) {
final wasRunning = _isRunning;
if (wasRunning) _stopHeartbeat();
_config = config;
if (wasRunning && _config.enabled) _startHeartbeat();
}
void dispose() {
_stopHeartbeat();
_isInitialized = false;
_instance = null;
}
void _onSessionStart() {
if (_config.enabled) _startHeartbeat();
}
void _onSessionEnd() {
_stopHeartbeat();
}
void _startHeartbeat() {
if (_isRunning || !_config.enabled) return;
_isRunning = true;
_heartbeatCount = 0;
_sendHeartbeat();
_heartbeatTimer = Timer.periodic(
Duration(seconds: _config.heartbeatIntervalSeconds),
(_) => _sendHeartbeat(),
);
debugPrint('[Heartbeat] Started');
}
void _stopHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
_isRunning = false;
debugPrint('[Heartbeat] Stopped (count: $_heartbeatCount)');
}
Future<void> _sendHeartbeat() async {
if (_config.requiresAuth && (getUserId?.call() == null)) {
debugPrint('[Heartbeat] Skipped: user not logged in');
return;
}
try {
final response = await _dio.post(
'/api/v1/presence/heartbeat',
data: {
'installId': getInstallId?.call() ?? '',
'appVersion': getAppVersion?.call() ?? '',
'clientTs': DateTime.now().millisecondsSinceEpoch ~/ 1000,
},
options: Options(headers: getAuthHeaders?.call()),
);
if (response.statusCode == 200) {
_lastHeartbeatAt = DateTime.now();
_heartbeatCount++;
debugPrint('[Heartbeat] Sent #$_heartbeatCount');
}
} on DioException catch (e) {
debugPrint('[Heartbeat] Failed: ${e.message}');
} catch (e) {
debugPrint('[Heartbeat] Failed: $e');
}
}
@visibleForTesting
Future<void> forceHeartbeat() async => _sendHeartbeat();
}

View File

@ -0,0 +1,39 @@
class PresenceConfig {
final int heartbeatIntervalSeconds;
final bool requiresAuth;
final bool enabled;
const PresenceConfig({
this.heartbeatIntervalSeconds = 60,
this.requiresAuth = true,
this.enabled = true,
});
static const PresenceConfig defaultConfig = PresenceConfig();
factory PresenceConfig.fromJson(Map<String, dynamic> json) {
return PresenceConfig(
heartbeatIntervalSeconds: json['heartbeat_interval_seconds'] ?? 60,
requiresAuth: json['requires_auth'] ?? true,
enabled: json['presence_enabled'] ?? json['enabled'] ?? true,
);
}
Map<String, dynamic> toJson() => {
'heartbeat_interval_seconds': heartbeatIntervalSeconds,
'requires_auth': requiresAuth,
'presence_enabled': enabled,
};
PresenceConfig copyWith({
int? heartbeatIntervalSeconds,
bool? requiresAuth,
bool? enabled,
}) {
return PresenceConfig(
heartbeatIntervalSeconds: heartbeatIntervalSeconds ?? this.heartbeatIntervalSeconds,
requiresAuth: requiresAuth ?? this.requiresAuth,
enabled: enabled ?? this.enabled,
);
}
}

View File

@ -0,0 +1,9 @@
class SessionEvents {
static const String sessionStart = 'app_session_start';
static const String sessionEnd = 'app_session_end';
static const String heartbeat = 'presence_heartbeat';
SessionEvents._();
}
enum SessionState { foreground, background, unknown }

View File

@ -0,0 +1,106 @@
import 'package:flutter/widgets.dart';
import 'package:uuid/uuid.dart';
import '../telemetry_service.dart';
import '../models/telemetry_event.dart';
import 'session_events.dart';
class SessionManager with WidgetsBindingObserver {
static SessionManager? _instance;
SessionManager._();
factory SessionManager() {
_instance ??= SessionManager._();
return _instance!;
}
String? _currentSessionId;
String? get currentSessionId => _currentSessionId;
SessionState _state = SessionState.unknown;
SessionState get state => _state;
DateTime? _sessionStartTime;
VoidCallback? onSessionStart;
VoidCallback? onSessionEnd;
TelemetryService? _telemetryService;
bool _isInitialized = false;
void initialize(TelemetryService telemetryService) {
if (_isInitialized) return;
_telemetryService = telemetryService;
WidgetsBinding.instance.addObserver(this);
_handleForeground();
_isInitialized = true;
debugPrint('[Session] Manager initialized');
}
void dispose() {
WidgetsBinding.instance.removeObserver(this);
_isInitialized = false;
_instance = null;
}
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
switch (state) {
case AppLifecycleState.resumed:
_handleForeground();
break;
case AppLifecycleState.paused:
_handleBackground();
break;
default:
break;
}
}
void _handleForeground() {
if (_state == SessionState.foreground) return;
_state = SessionState.foreground;
_startNewSession();
_telemetryService?.flushOnBackground();
}
void _handleBackground() {
if (_state == SessionState.background) return;
_state = SessionState.background;
_endCurrentSession();
_telemetryService?.flushOnBackground();
}
void _startNewSession() {
_currentSessionId = const Uuid().v4();
_sessionStartTime = DateTime.now();
_telemetryService?.logEvent(
SessionEvents.sessionStart,
type: EventType.session,
level: EventLevel.info,
properties: {'session_id': _currentSessionId},
);
onSessionStart?.call();
debugPrint('[Session] Started: $_currentSessionId');
}
void _endCurrentSession() {
if (_currentSessionId == null) return;
final duration = _sessionStartTime != null
? DateTime.now().difference(_sessionStartTime!).inSeconds
: 0;
_telemetryService?.logEvent(
SessionEvents.sessionEnd,
type: EventType.session,
level: EventLevel.info,
properties: {'session_id': _currentSessionId, 'duration_seconds': duration},
);
onSessionEnd?.call();
debugPrint('[Session] Ended: $_currentSessionId (${duration}s)');
_currentSessionId = null;
_sessionStartTime = null;
}
int get sessionDurationSeconds {
if (_sessionStartTime == null) return 0;
return DateTime.now().difference(_sessionStartTime!).inSeconds;
}
}

View File

@ -0,0 +1,88 @@
import 'dart:convert';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:flutter/foundation.dart';
import '../models/telemetry_event.dart';
class TelemetryStorage {
static const String _keyEventQueue = 'telemetry_event_queue';
static const String _keyDeviceContext = 'telemetry_device_context';
static const String _keyInstallId = 'telemetry_install_id';
static const int _maxQueueSize = 500;
late SharedPreferences _prefs;
bool _isInitialized = false;
Future<void> init() async {
if (_isInitialized) return;
_prefs = await SharedPreferences.getInstance();
_isInitialized = true;
}
Future<void> saveDeviceContext(Map<String, dynamic> context) async {
await _prefs.setString(_keyDeviceContext, jsonEncode(context));
}
Map<String, dynamic>? getDeviceContext() {
final str = _prefs.getString(_keyDeviceContext);
if (str == null) return null;
return jsonDecode(str) as Map<String, dynamic>;
}
Future<void> saveInstallId(String installId) async {
await _prefs.setString(_keyInstallId, installId);
}
String? getInstallId() => _prefs.getString(_keyInstallId);
Future<void> enqueueEvent(TelemetryEvent event) async {
final queue = _getEventQueue();
if (queue.length >= _maxQueueSize) {
queue.removeAt(0);
debugPrint('[TelemetryStorage] Queue full, dropped oldest event');
}
queue.add(event.toJson());
await _saveEventQueue(queue);
}
List<TelemetryEvent> dequeueEvents(int limit) {
final queue = _getEventQueue();
final count = queue.length > limit ? limit : queue.length;
return queue.take(count).map((json) => TelemetryEvent.fromJson(json)).toList();
}
Future<void> removeEvents(int count) async {
final queue = _getEventQueue();
if (count >= queue.length) {
await clearEventQueue();
} else {
queue.removeRange(0, count);
await _saveEventQueue(queue);
}
}
int getQueueSize() => _getEventQueue().length;
Future<void> clearEventQueue() async {
await _prefs.remove(_keyEventQueue);
}
Future<void> clearUserData() async {
if (!_isInitialized) await init();
await _prefs.remove(_keyEventQueue);
}
List<Map<String, dynamic>> _getEventQueue() {
final str = _prefs.getString(_keyEventQueue);
if (str == null) return [];
try {
return (jsonDecode(str) as List).cast<Map<String, dynamic>>();
} catch (e) {
debugPrint('[TelemetryStorage] Failed to parse queue: $e');
return [];
}
}
Future<void> _saveEventQueue(List<Map<String, dynamic>> queue) async {
await _prefs.setString(_keyEventQueue, jsonEncode(queue));
}
}

View File

@ -0,0 +1,8 @@
export 'telemetry_service.dart';
export 'models/telemetry_event.dart';
export 'models/device_context.dart';
export 'models/telemetry_config.dart';
export 'presence/presence_config.dart';
export 'presence/heartbeat_service.dart';
export 'session/session_events.dart';
export 'session/session_manager.dart';

View File

@ -0,0 +1,224 @@
import 'dart:async';
import 'dart:math';
import 'package:uuid/uuid.dart';
import 'package:flutter/material.dart';
import 'models/telemetry_event.dart';
import 'models/device_context.dart';
import 'models/telemetry_config.dart';
import 'collectors/device_info_collector.dart';
import 'storage/telemetry_storage.dart';
import 'uploader/telemetry_uploader.dart';
import 'session/session_manager.dart';
import 'presence/heartbeat_service.dart';
import 'presence/presence_config.dart';
class TelemetryService {
static TelemetryService? _instance;
TelemetryService._();
factory TelemetryService() {
_instance ??= TelemetryService._();
return _instance!;
}
final _storage = TelemetryStorage();
late TelemetryUploader _uploader;
DeviceContext? _deviceContext;
late String _installId;
String get installId => _installId;
String? _userId;
String? get userId => _userId;
String? _accessToken;
bool _isInitialized = false;
bool get isInitialized => _isInitialized;
late SessionManager _sessionManager;
late HeartbeatService _heartbeatService;
Timer? _configSyncTimer;
Future<void> initialize({
required String apiBaseUrl,
required BuildContext context,
String? userId,
Duration configSyncInterval = const Duration(hours: 1),
PresenceConfig? presenceConfig,
}) async {
if (_isInitialized) return;
await _storage.init();
await _initInstallId();
await TelemetryConfig().loadUserOptIn();
TelemetryConfig().syncFromRemote(apiBaseUrl).catchError((e) {
debugPrint('[Telemetry] Remote config sync failed: $e');
});
_deviceContext = await DeviceInfoCollector().collect(context);
await _storage.saveDeviceContext(_deviceContext!.toJson());
_userId = userId;
_uploader = TelemetryUploader(
apiBaseUrl: apiBaseUrl,
storage: _storage,
getAuthHeaders: _getAuthHeaders,
);
if (TelemetryConfig().globalEnabled) {
_uploader.startPeriodicUpload();
}
_configSyncTimer = Timer.periodic(configSyncInterval, (_) async {
await TelemetryConfig().syncFromRemote(apiBaseUrl);
if (TelemetryConfig().globalEnabled) {
_uploader.startPeriodicUpload();
} else {
_uploader.stopPeriodicUpload();
}
if (TelemetryConfig().presenceConfig != null) {
_heartbeatService.updateConfig(TelemetryConfig().presenceConfig!);
}
});
_sessionManager = SessionManager();
_sessionManager.initialize(this);
_heartbeatService = HeartbeatService();
_heartbeatService.initialize(
apiBaseUrl: apiBaseUrl,
config: presenceConfig ?? TelemetryConfig().presenceConfig,
getInstallId: () => _installId,
getUserId: () => _userId,
getAppVersion: () => _deviceContext?.appVersion ?? 'unknown',
getAuthHeaders: _getAuthHeaders,
);
_isInitialized = true;
debugPrint('[Telemetry] Initialized | installId: $_installId | userId: $_userId');
}
Future<void> _initInstallId() async {
final storedId = _storage.getInstallId();
if (storedId != null) {
_installId = storedId;
} else {
_installId = const Uuid().v4();
await _storage.saveInstallId(_installId);
}
}
Map<String, String> _getAuthHeaders() {
if (_accessToken == null) return {};
return {'Authorization': 'Bearer $_accessToken'};
}
void setAccessToken(String? token) => _accessToken = token;
void clearAccessToken() => _accessToken = null;
void setUserId(String? userId) {
_userId = userId;
debugPrint('[Telemetry] userId set: $userId');
}
void clearUserId() {
_userId = null;
debugPrint('[Telemetry] userId cleared');
}
void logEvent(
String eventName, {
EventType type = EventType.userAction,
EventLevel level = EventLevel.info,
Map<String, dynamic>? properties,
}) {
if (!_isInitialized) return;
if (!TelemetryConfig().shouldLog(type, eventName)) return;
if (_needsSampling(type)) {
if (Random().nextDouble() > TelemetryConfig().samplingRate) return;
}
final deviceProps = _deviceContext != null
? {
'device_brand': _deviceContext!.brand,
'device_model': _deviceContext!.model,
'device_os': _deviceContext!.osVersion,
'app_version': _deviceContext!.appVersion,
'locale': _deviceContext!.locale,
}
: <String, dynamic>{};
final event = TelemetryEvent(
eventId: const Uuid().v4(),
type: type,
level: level,
name: eventName,
properties: {...deviceProps, ...?properties},
timestamp: DateTime.now(),
userId: _userId,
sessionId: _sessionManager.currentSessionId,
installId: _installId,
deviceContextId: _deviceContext?.androidId ?? '',
);
_storage.enqueueEvent(event);
_uploader.uploadIfNeeded();
}
bool _needsSampling(EventType type) =>
type != EventType.error && type != EventType.crash && type != EventType.session;
void logPageView(String pageName, {Map<String, dynamic>? extra}) {
logEvent('page_view', type: EventType.pageView, properties: {'page': pageName, ...?extra});
}
void logUserAction(String action, {Map<String, dynamic>? properties}) {
logEvent(action, type: EventType.userAction, properties: properties);
}
void logError(String errorMessage, {Object? error, StackTrace? stackTrace, Map<String, dynamic>? extra}) {
logEvent('error_occurred', type: EventType.error, level: EventLevel.error, properties: {
'message': errorMessage,
'error': error?.toString(),
'stack_trace': stackTrace?.toString(),
...?extra,
});
}
Future<void> flushOnBackground() async {
await _uploader.uploadBatch(batchSize: 50);
}
Future<void> pauseForLogout() async {
_uploader.stopPeriodicUpload();
await _storage.clearEventQueue();
_userId = null;
}
void resumeAfterLogin() {
if (TelemetryConfig().globalEnabled) {
_uploader.startPeriodicUpload();
}
}
String? get currentSessionId => _sessionManager.currentSessionId;
int get sessionDurationSeconds => _sessionManager.sessionDurationSeconds;
bool get isHeartbeatRunning => _heartbeatService.isRunning;
int get heartbeatCount => _heartbeatService.heartbeatCount;
DeviceContext? get deviceContext => _deviceContext;
Future<void> dispose() async {
_configSyncTimer?.cancel();
_sessionManager.dispose();
_heartbeatService.dispose();
await _uploader.forceUploadAll();
_isInitialized = false;
}
static void reset() => _instance = null;
}

View File

@ -0,0 +1,88 @@
import 'dart:async';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import '../storage/telemetry_storage.dart';
class TelemetryUploader {
final String apiBaseUrl;
final TelemetryStorage storage;
final Dio _dio;
Timer? _uploadTimer;
bool _isUploading = false;
Map<String, String> Function()? getAuthHeaders;
TelemetryUploader({
required this.apiBaseUrl,
required this.storage,
this.getAuthHeaders,
}) : _dio = Dio(BaseOptions(
baseUrl: apiBaseUrl,
connectTimeout: const Duration(seconds: 10),
receiveTimeout: const Duration(seconds: 10),
));
void startPeriodicUpload({
Duration interval = const Duration(seconds: 30),
int batchSize = 20,
}) {
_uploadTimer?.cancel();
_uploadTimer = Timer.periodic(interval, (_) {
uploadIfNeeded(batchSize: batchSize);
});
}
void stopPeriodicUpload() {
_uploadTimer?.cancel();
_uploadTimer = null;
}
Future<void> uploadIfNeeded({int batchSize = 20}) async {
if (_isUploading) return;
if (storage.getQueueSize() < 10) return;
await uploadBatch(batchSize: batchSize);
}
Future<bool> uploadBatch({int batchSize = 20}) async {
if (_isUploading) return false;
_isUploading = true;
try {
final events = storage.dequeueEvents(batchSize);
if (events.isEmpty) return true;
final response = await _dio.post(
'/api/v1/analytics/events',
data: {'events': events.map((e) => e.toServerJson()).toList()},
options: Options(headers: getAuthHeaders?.call()),
);
if (response.statusCode == 200) {
await storage.removeEvents(events.length);
debugPrint('[TelemetryUploader] Uploaded ${events.length} events');
return true;
}
return false;
} on DioException catch (e) {
debugPrint('[TelemetryUploader] Upload error: ${e.message}');
return false;
} catch (e) {
debugPrint('[TelemetryUploader] Upload error: $e');
return false;
} finally {
_isUploading = false;
}
}
Future<void> forceUploadAll() async {
stopPeriodicUpload();
int retries = 0;
while (storage.getQueueSize() > 0 && retries < 3) {
final success = await uploadBatch(batchSize: 50);
if (!success) {
retries++;
await Future.delayed(const Duration(seconds: 1));
}
}
}
}

View File

@ -6,6 +6,7 @@ import 'package:flutter_secure_storage/flutter_secure_storage.dart';
import '../../../../core/config/api_endpoints.dart';
import '../../../../core/config/app_config.dart';
import '../../../../core/errors/error_handler.dart';
import '../../../../core/telemetry/telemetry.dart';
import '../../../notifications/presentation/providers/notification_providers.dart';
import '../models/auth_response.dart';
import 'tenant_provider.dart';
@ -95,6 +96,11 @@ class AuthNotifier extends StateNotifier<AuthState> {
// Reconnect notifications
_connectNotifications(user.tenantId);
// Restore telemetry session
TelemetryService().setUserId(user.id);
TelemetryService().setAccessToken(accessToken);
TelemetryService().resumeAfterLogin();
return true;
}
@ -162,6 +168,9 @@ class AuthNotifier extends StateNotifier<AuthState> {
_ref.read(currentTenantIdProvider.notifier).state = authResponse.user.tenantId;
}
_connectNotifications(authResponse.user.tenantId);
TelemetryService().setUserId(authResponse.user.id);
TelemetryService().setAccessToken(authResponse.accessToken);
TelemetryService().resumeAfterLogin();
return true;
} on DioException catch (e) {
final message = (e.response?.data is Map) ? e.response?.data['message'] : null;
@ -211,6 +220,10 @@ class AuthNotifier extends StateNotifier<AuthState> {
// Connect notification service after login
_connectNotifications(authResponse.user.tenantId);
TelemetryService().setUserId(authResponse.user.id);
TelemetryService().setAccessToken(authResponse.accessToken);
TelemetryService().resumeAfterLogin();
return true;
} on DioException catch (e) {
final message =
@ -238,6 +251,11 @@ class AuthNotifier extends StateNotifier<AuthState> {
// Clear tenant context
_ref.read(currentTenantIdProvider.notifier).state = null;
// Pause telemetry before clearing tokens
await TelemetryService().pauseForLogout();
TelemetryService().clearUserId();
TelemetryService().clearAccessToken();
final storage = _ref.read(secureStorageProvider);
await storage.delete(key: _keyAccessToken);
await storage.delete(key: _keyRefreshToken);

View File

@ -69,6 +69,10 @@ dependencies:
package_info_plus: ^8.0.0
crypto: ^3.0.3
# Telemetry
device_info_plus: ^10.1.0
equatable: ^2.0.5
dev_dependencies:
flutter_test:
sdk: flutter

View File

@ -141,6 +141,18 @@ services:
- /api/v1/billing/webhooks
strip_path: false
- name: presence-service
url: http://presence-service:3011
routes:
- name: presence-routes
paths:
- /api/v1/presence
strip_path: false
- name: analytics-routes
paths:
- /api/v1/analytics
strip_path: false
plugins:
# ===== Global plugins (apply to ALL routes) =====
- name: cors
@ -246,6 +258,13 @@ plugins:
claims_to_verify:
- exp
- name: jwt
service: presence-service
config:
key_claim_name: kid
claims_to_verify:
- exp
- name: jwt
route: admin-routes
config:

View File

@ -0,0 +1,8 @@
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"compilerOptions": {
"deleteOutDir": true
}
}

View File

@ -0,0 +1,38 @@
{
"name": "@it0/presence-service",
"version": "0.1.0",
"private": true,
"scripts": {
"build": "nest build",
"dev": "nest start --watch",
"start": "node dist/main",
"prisma:generate": "prisma generate",
"prisma:migrate": "prisma migrate deploy"
},
"dependencies": {
"@nestjs/common": "^10.3.0",
"@nestjs/core": "^10.3.0",
"@nestjs/config": "^3.2.0",
"@nestjs/cqrs": "^10.2.7",
"@nestjs/jwt": "^10.2.0",
"@nestjs/platform-express": "^10.3.0",
"@nestjs/schedule": "^4.0.1",
"@nestjs/swagger": "^7.3.0",
"@prisma/client": "^5.9.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.1",
"date-fns": "^2.30.0",
"date-fns-tz": "^2.0.1",
"ioredis": "^5.3.0",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.0",
"uuid": "^9.0.0"
},
"devDependencies": {
"@nestjs/cli": "^10.3.0",
"@types/express": "^4.17.21",
"@types/node": "^20.11.0",
"prisma": "^5.9.0",
"typescript": "^5.4.0"
}
}

View File

@ -0,0 +1,91 @@
-- CreateTable
CREATE TABLE "presence_event_log" (
"id" BIGSERIAL NOT NULL,
"user_id" VARCHAR(36),
"install_id" VARCHAR(64) NOT NULL,
"event_name" VARCHAR(64) NOT NULL,
"event_time" TIMESTAMPTZ NOT NULL,
"device_brand" VARCHAR(64),
"device_model" VARCHAR(64),
"device_os" VARCHAR(32),
"app_version" VARCHAR(32),
"locale" VARCHAR(16),
"properties" JSONB,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "presence_event_log_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "presence_device_profile" (
"install_id" VARCHAR(64) NOT NULL,
"user_id" VARCHAR(36),
"device_brand" VARCHAR(64),
"device_model" VARCHAR(64),
"device_os" VARCHAR(32),
"app_version" VARCHAR(32),
"locale" VARCHAR(16),
"first_seen_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
"last_seen_at" TIMESTAMPTZ NOT NULL,
"event_count" INTEGER NOT NULL DEFAULT 1,
CONSTRAINT "presence_device_profile_pkey" PRIMARY KEY ("install_id")
);
-- CreateTable
CREATE TABLE "presence_daily_active_users" (
"day" DATE NOT NULL,
"dau_count" INTEGER NOT NULL,
"dau_by_province" JSONB,
"dau_by_city" JSONB,
"calculated_at" TIMESTAMPTZ NOT NULL,
"version" INTEGER NOT NULL DEFAULT 1,
CONSTRAINT "presence_daily_active_users_pkey" PRIMARY KEY ("day")
);
-- CreateTable
CREATE TABLE "presence_online_snapshots" (
"id" BIGSERIAL NOT NULL,
"ts" TIMESTAMPTZ NOT NULL,
"online_count" INTEGER NOT NULL,
"window_seconds" INTEGER NOT NULL DEFAULT 300,
CONSTRAINT "presence_online_snapshots_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "idx_presence_event_log_event_time" ON "presence_event_log"("event_time");
-- CreateIndex
CREATE INDEX "idx_presence_event_log_event_name" ON "presence_event_log"("event_name");
-- CreateIndex
CREATE INDEX "idx_presence_event_log_event_name_time" ON "presence_event_log"("event_name", "event_time");
-- CreateIndex
CREATE INDEX "idx_presence_event_log_user_id" ON "presence_event_log"("user_id");
-- CreateIndex
CREATE INDEX "idx_presence_event_log_device_brand" ON "presence_event_log"("device_brand");
-- CreateIndex
CREATE INDEX "idx_presence_event_log_app_version" ON "presence_event_log"("app_version");
-- CreateIndex
CREATE INDEX "idx_presence_device_profile_brand" ON "presence_device_profile"("device_brand");
-- CreateIndex
CREATE INDEX "idx_presence_device_profile_app_version" ON "presence_device_profile"("app_version");
-- CreateIndex
CREATE INDEX "idx_presence_device_profile_user_id" ON "presence_device_profile"("user_id");
-- CreateIndex
CREATE INDEX "idx_presence_device_profile_last_seen" ON "presence_device_profile"("last_seen_at" DESC);
-- CreateIndex
CREATE UNIQUE INDEX "presence_online_snapshots_ts_key" ON "presence_online_snapshots"("ts");
-- CreateIndex
CREATE INDEX "idx_presence_online_snapshots_ts" ON "presence_online_snapshots"("ts" DESC);

View File

@ -0,0 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (e.g., Git)
provider = "postgresql"

View File

@ -0,0 +1,81 @@
// =============================================================================
// Presence Service - Prisma Schema (IT0)
// Table prefix: presence_ to avoid collision with other IT0 services
// userId: VarChar(36) for UUID format
// =============================================================================
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// 事件日志表 (append-only)
model EventLog {
id BigInt @id @default(autoincrement())
userId String? @map("user_id") @db.VarChar(36)
installId String @map("install_id") @db.VarChar(64)
eventName String @map("event_name") @db.VarChar(64)
eventTime DateTime @map("event_time") @db.Timestamptz()
deviceBrand String? @map("device_brand") @db.VarChar(64)
deviceModel String? @map("device_model") @db.VarChar(64)
deviceOs String? @map("device_os") @db.VarChar(32)
appVersion String? @map("app_version") @db.VarChar(32)
locale String? @map("locale") @db.VarChar(16)
properties Json? @db.JsonB
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz()
@@index([eventTime], name: "idx_presence_event_log_event_time")
@@index([eventName], name: "idx_presence_event_log_event_name")
@@index([eventName, eventTime], name: "idx_presence_event_log_event_name_time")
@@index([userId], name: "idx_presence_event_log_user_id")
@@index([deviceBrand], name: "idx_presence_event_log_device_brand")
@@index([appVersion], name: "idx_presence_event_log_app_version")
@@map("presence_event_log")
}
// 设备档案表 (每台设备一行, upsert 更新)
model DeviceProfile {
installId String @id @map("install_id") @db.VarChar(64)
userId String? @map("user_id") @db.VarChar(36)
deviceBrand String? @map("device_brand") @db.VarChar(64)
deviceModel String? @map("device_model") @db.VarChar(64)
deviceOs String? @map("device_os") @db.VarChar(32)
appVersion String? @map("app_version") @db.VarChar(32)
locale String? @map("locale") @db.VarChar(16)
firstSeenAt DateTime @default(now()) @map("first_seen_at") @db.Timestamptz()
lastSeenAt DateTime @updatedAt @map("last_seen_at") @db.Timestamptz()
eventCount Int @default(1) @map("event_count")
@@index([deviceBrand], name: "idx_presence_device_profile_brand")
@@index([appVersion], name: "idx_presence_device_profile_app_version")
@@index([userId], name: "idx_presence_device_profile_user_id")
@@index([lastSeenAt(sort: Desc)], name: "idx_presence_device_profile_last_seen")
@@map("presence_device_profile")
}
// 日活统计表
model DailyActiveStats {
day DateTime @id @map("day") @db.Date
dauCount Int @map("dau_count")
dauByProvince Json? @map("dau_by_province") @db.JsonB
dauByCity Json? @map("dau_by_city") @db.JsonB
calculatedAt DateTime @map("calculated_at") @db.Timestamptz()
version Int @default(1)
@@map("presence_daily_active_users")
}
// 在线人数快照表
model OnlineSnapshot {
id BigInt @id @default(autoincrement())
ts DateTime @unique @db.Timestamptz()
onlineCount Int @map("online_count")
windowSeconds Int @default(300) @map("window_seconds")
@@index([ts(sort: Desc)], name: "idx_presence_online_snapshots_ts")
@@map("presence_online_snapshots")
}

View File

@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { ApplicationModule } from '../application/application.module';
import { AnalyticsController } from './controllers/analytics.controller';
import { PresenceController } from './controllers/presence.controller';
import { HealthController } from './controllers/health.controller';
import { JwtAuthGuard } from '../shared/guards/jwt-auth.guard';
import { AdminGuard } from '../shared/guards/admin.guard';
@Module({
imports: [ApplicationModule],
controllers: [AnalyticsController, PresenceController, HealthController],
providers: [JwtAuthGuard, AdminGuard],
})
export class ApiModule {}

View File

@ -0,0 +1,35 @@
import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger';
import { BatchEventsDto } from '../dto/request/batch-events.dto';
import { QueryDauDto } from '../dto/request/query-dau.dto';
import { RecordEventsCommand } from '../../application/commands/record-events/record-events.command';
import { GetDauStatsQuery } from '../../application/queries/get-dau-stats/get-dau-stats.query';
import { Public } from '../../shared/decorators/public.decorator';
import { AdminGuard } from '../../shared/guards/admin.guard';
@ApiTags('Analytics')
@Controller('analytics')
export class AnalyticsController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post('events')
@Public()
@ApiOperation({ summary: '批量上报事件(无需认证)' })
async batchEvents(@Body() dto: BatchEventsDto) {
return this.commandBus.execute(new RecordEventsCommand(dto.events));
}
@Get('dau')
@UseGuards(AdminGuard)
@ApiBearerAuth()
@ApiOperation({ summary: 'DAU 统计(管理员)' })
async getDauStats(@Query() dto: QueryDauDto) {
return this.queryBus.execute(
new GetDauStatsQuery(new Date(dto.startDate), new Date(dto.endDate)),
);
}
}

View File

@ -0,0 +1,11 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';
@ApiTags('Health')
@Controller()
export class HealthController {
@Get()
health() {
return { status: 'ok', service: 'presence-service', timestamp: new Date().toISOString() };
}
}

View File

@ -0,0 +1,56 @@
import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger';
import { HeartbeatDto } from '../dto/request/heartbeat.dto';
import { QueryOnlineHistoryDto } from '../dto/request/query-online-history.dto';
import { RecordHeartbeatCommand } from '../../application/commands/record-heartbeat/record-heartbeat.command';
import { GetOnlineCountQuery } from '../../application/queries/get-online-count/get-online-count.query';
import { GetOnlineHistoryQuery } from '../../application/queries/get-online-history/get-online-history.query';
import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard';
import { AdminGuard } from '../../shared/guards/admin.guard';
import { CurrentUser } from '../../shared/decorators/current-user.decorator';
@ApiTags('Presence')
@Controller('presence')
export class PresenceController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post('heartbeat')
@UseGuards(JwtAuthGuard)
@ApiBearerAuth()
@ApiOperation({ summary: '心跳上报' })
async heartbeat(
@CurrentUser('userId') userId: string,
@Body() dto: HeartbeatDto,
) {
return this.commandBus.execute(
new RecordHeartbeatCommand(userId, dto.installId, dto.appVersion, dto.clientTs),
);
}
@Get('online-count')
@UseGuards(AdminGuard)
@ApiBearerAuth()
@ApiOperation({ summary: '实时在线人数(管理员)' })
async getOnlineCount() {
const result = await this.queryBus.execute(new GetOnlineCountQuery());
return {
count: result.count,
windowSeconds: result.windowSeconds,
queriedAt: result.queriedAt.toISOString(),
};
}
@Get('online-history')
@UseGuards(AdminGuard)
@ApiBearerAuth()
@ApiOperation({ summary: '历史在线人数(管理员)' })
async getOnlineHistory(@Query() dto: QueryOnlineHistoryDto) {
return this.queryBus.execute(
new GetOnlineHistoryQuery(new Date(dto.startTime), new Date(dto.endTime), dto.interval || '5m'),
);
}
}

View File

@ -0,0 +1,60 @@
import { IsArray, IsString, IsOptional, IsNumber, ValidateNested, IsObject } from 'class-validator';
import { Type } from 'class-transformer';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
export class EventItemDto {
@ApiProperty({ example: 'app_session_start' })
@IsString()
eventName: string;
@ApiPropertyOptional()
@IsOptional()
@IsString()
userId?: string;
@ApiProperty({ example: 'uuid-v4-xxx' })
@IsString()
installId: string;
@ApiProperty({ example: 1732685100 })
@IsNumber()
clientTs: number;
@ApiPropertyOptional()
@IsOptional()
@IsString()
deviceBrand?: string;
@ApiPropertyOptional()
@IsOptional()
@IsString()
deviceModel?: string;
@ApiPropertyOptional()
@IsOptional()
@IsString()
deviceOs?: string;
@ApiPropertyOptional()
@IsOptional()
@IsString()
appVersion?: string;
@ApiPropertyOptional()
@IsOptional()
@IsString()
locale?: string;
@ApiPropertyOptional()
@IsOptional()
@IsObject()
properties?: Record<string, unknown>;
}
export class BatchEventsDto {
@ApiProperty({ type: [EventItemDto] })
@IsArray()
@ValidateNested({ each: true })
@Type(() => EventItemDto)
events: EventItemDto[];
}

View File

@ -0,0 +1,16 @@
import { IsString, IsNumber } from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';
export class HeartbeatDto {
@ApiProperty({ example: 'uuid-v4-xxx' })
@IsString()
installId: string;
@ApiProperty({ example: '1.0.0' })
@IsString()
appVersion: string;
@ApiProperty({ example: 1732685100 })
@IsNumber()
clientTs: number;
}

View File

@ -0,0 +1,12 @@
import { IsDateString } from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';
export class QueryDauDto {
@ApiProperty({ example: '2026-03-01' })
@IsDateString()
startDate: string;
@ApiProperty({ example: '2026-03-07' })
@IsDateString()
endDate: string;
}

View File

@ -0,0 +1,17 @@
import { IsString, IsDateString, IsOptional } from 'class-validator';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
export class QueryOnlineHistoryDto {
@ApiProperty({ example: '2026-03-01T00:00:00Z' })
@IsDateString()
startTime: string;
@ApiProperty({ example: '2026-03-07T23:59:59Z' })
@IsDateString()
endTime: string;
@ApiPropertyOptional({ enum: ['1m', '5m', '1h'], default: '5m' })
@IsOptional()
@IsString()
interval?: '1m' | '5m' | '1h';
}

View File

@ -0,0 +1,30 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { JwtModule } from '@nestjs/jwt';
import { ScheduleModule } from '@nestjs/schedule';
import { ApiModule } from './api/api.module';
import { ApplicationModule } from './application/application.module';
import { DomainModule } from './domain/domain.module';
import { InfrastructureModule } from './infrastructure/infrastructure.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
JwtModule.registerAsync({
global: true,
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
secret: configService.get<string>('JWT_SECRET'),
}),
}),
ScheduleModule.forRoot(),
DomainModule,
InfrastructureModule,
ApplicationModule,
ApiModule,
],
})
export class AppModule {}

View File

@ -0,0 +1,26 @@
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { DomainModule } from '../domain/domain.module';
import { InfrastructureModule } from '../infrastructure/infrastructure.module';
import { RecordEventsHandler } from './commands/record-events/record-events.handler';
import { RecordHeartbeatHandler } from './commands/record-heartbeat/record-heartbeat.handler';
import { CalculateDauHandler } from './commands/calculate-dau/calculate-dau.handler';
import { GetOnlineCountHandler } from './queries/get-online-count/get-online-count.handler';
import { GetDauStatsHandler } from './queries/get-dau-stats/get-dau-stats.handler';
import { GetOnlineHistoryHandler } from './queries/get-online-history/get-online-history.handler';
import { AnalyticsScheduler } from './schedulers/analytics.scheduler';
@Module({
imports: [CqrsModule, DomainModule, InfrastructureModule],
providers: [
RecordEventsHandler,
RecordHeartbeatHandler,
CalculateDauHandler,
GetOnlineCountHandler,
GetDauStatsHandler,
GetOnlineHistoryHandler,
AnalyticsScheduler,
],
exports: [CqrsModule],
})
export class ApplicationModule {}

View File

@ -0,0 +1,3 @@
export class CalculateDauCommand {
constructor(public readonly date: Date) {}
}

View File

@ -0,0 +1,48 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { CalculateDauCommand } from './calculate-dau.command';
import { IEventLogRepository, EVENT_LOG_REPOSITORY } from '../../../domain/repositories/event-log.repository.interface';
import { IDailyActiveStatsRepository, DAILY_ACTIVE_STATS_REPOSITORY } from '../../../domain/repositories/daily-active-stats.repository.interface';
import { DauCalculationService } from '../../../domain/services/dau-calculation.service';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { startOfDayInTimezone, endOfDayInTimezone } from '../../../shared/utils/timezone.util';
@Injectable()
@CommandHandler(CalculateDauCommand)
export class CalculateDauHandler implements ICommandHandler<CalculateDauCommand> {
private readonly logger = new Logger(CalculateDauHandler.name);
constructor(
@Inject(EVENT_LOG_REPOSITORY)
private readonly eventLogRepository: IEventLogRepository,
@Inject(DAILY_ACTIVE_STATS_REPOSITORY)
private readonly dauStatsRepository: IDailyActiveStatsRepository,
private readonly dauCalculationService: DauCalculationService,
) {}
async execute(command: CalculateDauCommand): Promise<void> {
const { date } = command;
const timezone = 'Asia/Shanghai';
const startOfDay = startOfDayInTimezone(date, timezone);
const endOfDay = endOfDayInTimezone(date, timezone);
this.logger.log(`Calculating DAU for ${date.toISOString().split('T')[0]}`);
const result = await this.eventLogRepository.queryDau(
EventName.APP_SESSION_START,
startOfDay,
endOfDay,
);
const existingStats = await this.dauStatsRepository.findByDate(date);
if (existingStats) {
existingStats.recalculate(result.total, result.byProvince, result.byCity);
await this.dauStatsRepository.upsert(existingStats);
} else {
const stats = this.dauCalculationService.createStatsFromQueryResult(date, result);
await this.dauStatsRepository.upsert(stats);
}
this.logger.log(`DAU calculated: ${result.total} users`);
}
}

View File

@ -0,0 +1,16 @@
export interface EventItemDto {
eventName: string;
userId?: string;
installId: string;
clientTs: number;
deviceBrand?: string;
deviceModel?: string;
deviceOs?: string;
appVersion?: string;
locale?: string;
properties?: Record<string, unknown>;
}
export class RecordEventsCommand {
constructor(public readonly events: EventItemDto[]) {}
}

View File

@ -0,0 +1,97 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Inject, Injectable } from '@nestjs/common';
import { RecordEventsCommand, EventItemDto } from './record-events.command';
import { EventLog } from '../../../domain/entities/event-log.entity';
import { DeviceProfile } from '../../../domain/entities/device-profile.entity';
import { InstallId } from '../../../domain/value-objects/install-id.vo';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventProperties } from '../../../domain/value-objects/event-properties.vo';
import { IEventLogRepository, EVENT_LOG_REPOSITORY } from '../../../domain/repositories/event-log.repository.interface';
import { IDeviceProfileRepository, DEVICE_PROFILE_REPOSITORY } from '../../../domain/repositories/device-profile.repository.interface';
import { RedisService } from '../../../infrastructure/redis/redis.service';
import { formatToDateKey } from '../../../shared/utils/timezone.util';
export interface RecordEventsResult {
accepted: number;
failed: number;
errors?: string[];
}
@Injectable()
@CommandHandler(RecordEventsCommand)
export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand> {
constructor(
@Inject(EVENT_LOG_REPOSITORY)
private readonly eventLogRepository: IEventLogRepository,
@Inject(DEVICE_PROFILE_REPOSITORY)
private readonly deviceProfileRepository: IDeviceProfileRepository,
private readonly redisService: RedisService,
) {}
async execute(command: RecordEventsCommand): Promise<RecordEventsResult> {
const { events } = command;
const errors: string[] = [];
const validLogs: EventLog[] = [];
for (let i = 0; i < events.length; i++) {
try {
validLogs.push(this.toEventLog(events[i]));
} catch (e) {
errors.push(`Event[${i}]: ${e.message}`);
}
}
if (validLogs.length === 0) {
return { accepted: 0, failed: events.length, errors };
}
await this.eventLogRepository.batchInsert(validLogs);
// Upsert device profiles
const deviceProfiles = validLogs
.filter((log) => log.installId.value)
.map((log) =>
DeviceProfile.create({
installId: log.installId.value,
userId: log.userId,
deviceBrand: log.deviceBrand,
deviceModel: log.deviceModel,
deviceOs: log.deviceOs,
appVersion: log.appVersion,
locale: log.locale,
eventCount: 1,
}),
);
await this.deviceProfileRepository.upsertBatch(deviceProfiles);
// Update realtime DAU (HyperLogLog)
const todayKey = formatToDateKey(new Date());
for (const log of validLogs) {
if (log.eventName.isDauEvent()) {
await this.redisService.pfadd(`presence:dau:${todayKey}`, log.dauIdentifier);
await this.redisService.expire(`presence:dau:${todayKey}`, 86400 * 3);
}
}
return {
accepted: validLogs.length,
failed: events.length - validLogs.length,
errors: errors.length > 0 ? errors : undefined,
};
}
private toEventLog(dto: EventItemDto): EventLog {
return EventLog.create({
userId: dto.userId ?? null,
installId: InstallId.fromString(dto.installId),
eventName: EventName.fromString(dto.eventName),
eventTime: new Date(dto.clientTs * 1000),
deviceBrand: dto.deviceBrand ?? null,
deviceModel: dto.deviceModel ?? null,
deviceOs: dto.deviceOs ?? null,
appVersion: dto.appVersion ?? null,
locale: dto.locale ?? null,
properties: EventProperties.fromData(dto.properties ?? {}),
});
}
}

View File

@ -0,0 +1,8 @@
export class RecordHeartbeatCommand {
constructor(
public readonly userId: string,
public readonly installId: string,
public readonly appVersion: string,
public readonly clientTs: number,
) {}
}

View File

@ -0,0 +1,21 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Injectable } from '@nestjs/common';
import { RecordHeartbeatCommand } from './record-heartbeat.command';
import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository';
export interface RecordHeartbeatResult {
ok: boolean;
serverTs: number;
}
@Injectable()
@CommandHandler(RecordHeartbeatCommand)
export class RecordHeartbeatHandler implements ICommandHandler<RecordHeartbeatCommand> {
constructor(private readonly presenceRedisRepository: PresenceRedisRepository) {}
async execute(command: RecordHeartbeatCommand): Promise<RecordHeartbeatResult> {
const now = Math.floor(Date.now() / 1000);
await this.presenceRedisRepository.updateUserPresence(command.userId, now);
return { ok: true, serverTs: now };
}
}

View File

@ -0,0 +1,27 @@
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { Inject, Injectable } from '@nestjs/common';
import { GetDauStatsQuery } from './get-dau-stats.query';
import { IDailyActiveStatsRepository, DAILY_ACTIVE_STATS_REPOSITORY } from '../../../domain/repositories/daily-active-stats.repository.interface';
export interface DauStatsResult {
data: { day: string; dauCount: number }[];
total: number;
}
@Injectable()
@QueryHandler(GetDauStatsQuery)
export class GetDauStatsHandler implements IQueryHandler<GetDauStatsQuery> {
constructor(
@Inject(DAILY_ACTIVE_STATS_REPOSITORY)
private readonly dauStatsRepository: IDailyActiveStatsRepository,
) {}
async execute(query: GetDauStatsQuery): Promise<DauStatsResult> {
const statsList = await this.dauStatsRepository.findByDateRange(query.startDate, query.endDate);
const data = statsList.map((stats) => ({
day: stats.day.toISOString().split('T')[0],
dauCount: stats.dauCount,
}));
return { data, total: data.length };
}
}

View File

@ -0,0 +1,6 @@
export class GetDauStatsQuery {
constructor(
public readonly startDate: Date,
public readonly endDate: Date,
) {}
}

View File

@ -0,0 +1,27 @@
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { Injectable } from '@nestjs/common';
import { GetOnlineCountQuery } from './get-online-count.query';
import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository';
import { OnlineDetectionService } from '../../../domain/services/online-detection.service';
export interface OnlineCountResult {
count: number;
windowSeconds: number;
queriedAt: Date;
}
@Injectable()
@QueryHandler(GetOnlineCountQuery)
export class GetOnlineCountHandler implements IQueryHandler<GetOnlineCountQuery> {
constructor(
private readonly presenceRedisRepository: PresenceRedisRepository,
private readonly onlineDetectionService: OnlineDetectionService,
) {}
async execute(): Promise<OnlineCountResult> {
const now = new Date();
const threshold = this.onlineDetectionService.getOnlineThreshold(now);
const count = await this.presenceRedisRepository.countOnlineUsers(threshold);
return { count, windowSeconds: this.onlineDetectionService.getWindowSeconds(), queriedAt: now };
}
}

View File

@ -0,0 +1 @@
export class GetOnlineCountQuery {}

View File

@ -0,0 +1,100 @@
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { Inject, Injectable } from '@nestjs/common';
import { GetOnlineHistoryQuery, OnlineHistoryInterval } from './get-online-history.query';
import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../../domain/repositories/online-snapshot.repository.interface';
import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity';
export interface OnlineHistoryDataPoint {
timestamp: string;
onlineCount: number;
windowSeconds: number;
}
export interface OnlineHistoryResult {
data: OnlineHistoryDataPoint[];
interval: OnlineHistoryInterval;
startTime: string;
endTime: string;
total: number;
summary: { maxOnline: number; minOnline: number; avgOnline: number; maxTimestamp: string | null; minTimestamp: string | null };
}
@Injectable()
@QueryHandler(GetOnlineHistoryQuery)
export class GetOnlineHistoryHandler implements IQueryHandler<GetOnlineHistoryQuery> {
constructor(
@Inject(ONLINE_SNAPSHOT_REPOSITORY)
private readonly snapshotRepository: IOnlineSnapshotRepository,
) {}
async execute(query: GetOnlineHistoryQuery): Promise<OnlineHistoryResult> {
const { startTime, endTime, interval } = query;
const snapshots = await this.snapshotRepository.findByTimeRange(startTime, endTime, interval);
const aggregated = this.aggregateByInterval(snapshots, interval);
const data: OnlineHistoryDataPoint[] = aggregated.map((s) => ({
timestamp: s.ts.toISOString(),
onlineCount: s.onlineCount,
windowSeconds: s.windowSeconds,
}));
return {
data,
interval,
startTime: startTime.toISOString(),
endTime: endTime.toISOString(),
total: data.length,
summary: this.calculateSummary(aggregated),
};
}
private aggregateByInterval(snapshots: OnlineSnapshot[], interval: OnlineHistoryInterval): OnlineSnapshot[] {
if (snapshots.length === 0) return [];
const intervalMs = interval === '1m' ? 60000 : interval === '5m' ? 300000 : 3600000;
const buckets = new Map<number, { total: number; count: number; windowSeconds: number }>();
for (const snapshot of snapshots) {
const key = Math.floor(snapshot.ts.getTime() / intervalMs) * intervalMs;
const existing = buckets.get(key);
if (existing) {
existing.total += snapshot.onlineCount;
existing.count++;
} else {
buckets.set(key, { total: snapshot.onlineCount, count: 1, windowSeconds: snapshot.windowSeconds });
}
}
return Array.from(buckets.keys())
.sort((a, b) => a - b)
.map((key) => {
const bucket = buckets.get(key)!;
return OnlineSnapshot.reconstitute({
id: BigInt(0),
ts: new Date(key),
onlineCount: Math.round(bucket.total / bucket.count),
windowSeconds: bucket.windowSeconds,
});
});
}
private calculateSummary(snapshots: OnlineSnapshot[]): OnlineHistoryResult['summary'] {
if (snapshots.length === 0) return { maxOnline: 0, minOnline: 0, avgOnline: 0, maxTimestamp: null, minTimestamp: null };
let maxOnline = -Infinity, minOnline = Infinity, totalOnline = 0;
let maxTimestamp: Date | null = null, minTimestamp: Date | null = null;
for (const s of snapshots) {
totalOnline += s.onlineCount;
if (s.onlineCount > maxOnline) { maxOnline = s.onlineCount; maxTimestamp = s.ts; }
if (s.onlineCount < minOnline) { minOnline = s.onlineCount; minTimestamp = s.ts; }
}
return {
maxOnline,
minOnline,
avgOnline: Math.round(totalOnline / snapshots.length),
maxTimestamp: maxTimestamp?.toISOString() || null,
minTimestamp: minTimestamp?.toISOString() || null,
};
}
}

View File

@ -0,0 +1,9 @@
export type OnlineHistoryInterval = '1m' | '5m' | '1h';
export class GetOnlineHistoryQuery {
constructor(
public readonly startTime: Date,
public readonly endTime: Date,
public readonly interval: OnlineHistoryInterval = '5m',
) {}
}

View File

@ -0,0 +1,70 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { CommandBus } from '@nestjs/cqrs';
import { subDays } from 'date-fns';
import { CalculateDauCommand } from '../commands/calculate-dau/calculate-dau.command';
import { PresenceRedisRepository } from '../../infrastructure/redis/presence-redis.repository';
import { OnlineDetectionService } from '../../domain/services/online-detection.service';
import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity';
import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../domain/repositories/online-snapshot.repository.interface';
@Injectable()
export class AnalyticsScheduler {
private readonly logger = new Logger(AnalyticsScheduler.name);
constructor(
private readonly commandBus: CommandBus,
private readonly presenceRedisRepository: PresenceRedisRepository,
private readonly onlineDetectionService: OnlineDetectionService,
@Inject(ONLINE_SNAPSHOT_REPOSITORY)
private readonly snapshotRepository: IOnlineSnapshotRepository,
) {}
@Cron(CronExpression.EVERY_MINUTE)
async recordOnlineSnapshot(): Promise<void> {
try {
const now = new Date();
const threshold = this.onlineDetectionService.getOnlineThreshold(now);
const count = await this.presenceRedisRepository.countOnlineUsers(threshold);
const snapshot = OnlineSnapshot.create({
ts: now,
onlineCount: count,
windowSeconds: this.onlineDetectionService.getWindowSeconds(),
});
await this.snapshotRepository.insert(snapshot);
this.logger.debug(`Online snapshot: ${count} users`);
} catch (error) {
this.logger.error('Failed to record online snapshot', error);
}
}
@Cron(CronExpression.EVERY_HOUR)
async cleanupExpiredPresence(): Promise<void> {
try {
const threshold = Math.floor(Date.now() / 1000) - 86400;
await this.presenceRedisRepository.removeExpiredUsers(threshold);
} catch (error) {
this.logger.error('Failed to cleanup expired presence', error);
}
}
@Cron('0 0 1 * * *', { timeZone: 'Asia/Shanghai' })
async calculateYesterdayDau(): Promise<void> {
try {
const yesterday = subDays(new Date(), 1);
await this.commandBus.execute(new CalculateDauCommand(yesterday));
this.logger.log('Yesterday DAU calculated');
} catch (error) {
this.logger.error('Failed to calculate yesterday DAU', error);
}
}
@Cron(CronExpression.EVERY_HOUR)
async calculateTodayDauRolling(): Promise<void> {
try {
await this.commandBus.execute(new CalculateDauCommand(new Date()));
} catch (error) {
this.logger.error('Failed to calculate today DAU', error);
}
}
}

View File

@ -0,0 +1,61 @@
import { AggregateRoot } from '@nestjs/cqrs';
export class DailyActiveStats extends AggregateRoot {
private _day: Date;
private _dauCount: number;
private _dauByProvince: Map<string, number>;
private _dauByCity: Map<string, number>;
private _calculatedAt: Date;
private _version: number;
private constructor() { super(); }
get day(): Date { return this._day; }
get dauCount(): number { return this._dauCount; }
get dauByProvince(): Map<string, number> { return new Map(this._dauByProvince); }
get dauByCity(): Map<string, number> { return new Map(this._dauByCity); }
get calculatedAt(): Date { return this._calculatedAt; }
get version(): number { return this._version; }
static create(props: {
day: Date;
dauCount: number;
dauByProvince?: Map<string, number>;
dauByCity?: Map<string, number>;
}): DailyActiveStats {
const stats = new DailyActiveStats();
stats._day = props.day;
stats._dauCount = props.dauCount;
stats._dauByProvince = props.dauByProvince ?? new Map();
stats._dauByCity = props.dauByCity ?? new Map();
stats._calculatedAt = new Date();
stats._version = 1;
return stats;
}
recalculate(newDauCount: number, byProvince?: Map<string, number>, byCity?: Map<string, number>): void {
this._dauCount = newDauCount;
if (byProvince) this._dauByProvince = byProvince;
if (byCity) this._dauByCity = byCity;
this._calculatedAt = new Date();
this._version++;
}
static reconstitute(props: {
day: Date;
dauCount: number;
dauByProvince: Map<string, number>;
dauByCity: Map<string, number>;
calculatedAt: Date;
version: number;
}): DailyActiveStats {
const stats = new DailyActiveStats();
stats._day = props.day;
stats._dauCount = props.dauCount;
stats._dauByProvince = props.dauByProvince;
stats._dauByCity = props.dauByCity;
stats._calculatedAt = props.calculatedAt;
stats._version = props.version;
return stats;
}
}

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { DauCalculationService } from './services/dau-calculation.service';
import { OnlineDetectionService } from './services/online-detection.service';
@Module({
providers: [DauCalculationService, OnlineDetectionService],
exports: [DauCalculationService, OnlineDetectionService],
})
export class DomainModule {}

View File

@ -0,0 +1,32 @@
export class DeviceProfile {
installId: string;
userId: string | null;
deviceBrand: string | null;
deviceModel: string | null;
deviceOs: string | null;
appVersion: string | null;
locale: string | null;
eventCount: number;
static create(props: {
installId: string;
userId?: string | null;
deviceBrand?: string | null;
deviceModel?: string | null;
deviceOs?: string | null;
appVersion?: string | null;
locale?: string | null;
eventCount?: number;
}): DeviceProfile {
const profile = new DeviceProfile();
profile.installId = props.installId;
profile.userId = props.userId ?? null;
profile.deviceBrand = props.deviceBrand ?? null;
profile.deviceModel = props.deviceModel ?? null;
profile.deviceOs = props.deviceOs ?? null;
profile.appVersion = props.appVersion ?? null;
profile.locale = props.locale ?? null;
profile.eventCount = props.eventCount ?? 1;
return profile;
}
}

View File

@ -0,0 +1,95 @@
import { InstallId } from '../value-objects/install-id.vo';
import { EventName } from '../value-objects/event-name.vo';
import { EventProperties } from '../value-objects/event-properties.vo';
export class EventLog {
private _id: bigint | null;
private _userId: string | null;
private _installId: InstallId;
private _eventName: EventName;
private _eventTime: Date;
private _deviceBrand: string | null;
private _deviceModel: string | null;
private _deviceOs: string | null;
private _appVersion: string | null;
private _locale: string | null;
private _properties: EventProperties;
private _createdAt: Date;
private constructor() {}
get id(): bigint | null { return this._id; }
get userId(): string | null { return this._userId; }
get installId(): InstallId { return this._installId; }
get eventName(): EventName { return this._eventName; }
get eventTime(): Date { return this._eventTime; }
get deviceBrand(): string | null { return this._deviceBrand; }
get deviceModel(): string | null { return this._deviceModel; }
get deviceOs(): string | null { return this._deviceOs; }
get appVersion(): string | null { return this._appVersion; }
get locale(): string | null { return this._locale; }
get properties(): EventProperties { return this._properties; }
get createdAt(): Date { return this._createdAt; }
get dauIdentifier(): string {
return this._userId ?? this._installId.value;
}
static create(props: {
userId?: string | null;
installId: InstallId;
eventName: EventName;
eventTime: Date;
deviceBrand?: string | null;
deviceModel?: string | null;
deviceOs?: string | null;
appVersion?: string | null;
locale?: string | null;
properties?: EventProperties;
}): EventLog {
const log = new EventLog();
log._id = null;
log._userId = props.userId ?? null;
log._installId = props.installId;
log._eventName = props.eventName;
log._eventTime = props.eventTime;
log._deviceBrand = props.deviceBrand ?? null;
log._deviceModel = props.deviceModel ?? null;
log._deviceOs = props.deviceOs ?? null;
log._appVersion = props.appVersion ?? null;
log._locale = props.locale ?? null;
log._properties = props.properties ?? EventProperties.empty();
log._createdAt = new Date();
return log;
}
static reconstitute(props: {
id: bigint;
userId: string | null;
installId: InstallId;
eventName: EventName;
eventTime: Date;
deviceBrand: string | null;
deviceModel: string | null;
deviceOs: string | null;
appVersion: string | null;
locale: string | null;
properties: EventProperties;
createdAt: Date;
}): EventLog {
const log = new EventLog();
log._id = props.id;
log._userId = props.userId;
log._installId = props.installId;
log._eventName = props.eventName;
log._eventTime = props.eventTime;
log._deviceBrand = props.deviceBrand;
log._deviceModel = props.deviceModel;
log._deviceOs = props.deviceOs;
log._appVersion = props.appVersion;
log._locale = props.locale;
log._properties = props.properties;
log._createdAt = props.createdAt;
return log;
}
}

View File

@ -0,0 +1,33 @@
import { TimeWindow } from '../value-objects/time-window.vo';
export class OnlineSnapshot {
private _id: bigint | null;
private _ts: Date;
private _onlineCount: number;
private _windowSeconds: number;
private constructor() {}
get id(): bigint | null { return this._id; }
get ts(): Date { return this._ts; }
get onlineCount(): number { return this._onlineCount; }
get windowSeconds(): number { return this._windowSeconds; }
static create(props: { ts: Date; onlineCount: number; windowSeconds?: number }): OnlineSnapshot {
const snapshot = new OnlineSnapshot();
snapshot._id = null;
snapshot._ts = props.ts;
snapshot._onlineCount = props.onlineCount;
snapshot._windowSeconds = props.windowSeconds ?? TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS;
return snapshot;
}
static reconstitute(props: { id: bigint; ts: Date; onlineCount: number; windowSeconds: number }): OnlineSnapshot {
const snapshot = new OnlineSnapshot();
snapshot._id = props.id;
snapshot._ts = props.ts;
snapshot._onlineCount = props.onlineCount;
snapshot._windowSeconds = props.windowSeconds;
return snapshot;
}
}

View File

@ -0,0 +1,9 @@
import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate';
export interface IDailyActiveStatsRepository {
upsert(stats: DailyActiveStats): Promise<void>;
findByDate(day: Date): Promise<DailyActiveStats | null>;
findByDateRange(startDate: Date, endDate: Date): Promise<DailyActiveStats[]>;
}
export const DAILY_ACTIVE_STATS_REPOSITORY = 'DAILY_ACTIVE_STATS_REPOSITORY';

View File

@ -0,0 +1,7 @@
import { DeviceProfile } from '../entities/device-profile.entity';
export interface IDeviceProfileRepository {
upsertBatch(profiles: DeviceProfile[]): Promise<void>;
}
export const DEVICE_PROFILE_REPOSITORY = 'DEVICE_PROFILE_REPOSITORY';

View File

@ -0,0 +1,17 @@
import { EventLog } from '../entities/event-log.entity';
import { EventName } from '../value-objects/event-name.vo';
export interface DauQueryResult {
total: number;
byProvince: Map<string, number>;
byCity: Map<string, number>;
}
export interface IEventLogRepository {
batchInsert(logs: EventLog[]): Promise<void>;
insert(log: EventLog): Promise<EventLog>;
queryDau(eventName: EventName, startTime: Date, endTime: Date): Promise<DauQueryResult>;
findByTimeRange(eventName: EventName, startTime: Date, endTime: Date, limit?: number): Promise<EventLog[]>;
}
export const EVENT_LOG_REPOSITORY = 'EVENT_LOG_REPOSITORY';

View File

@ -0,0 +1,9 @@
import { OnlineSnapshot } from '../entities/online-snapshot.entity';
export interface IOnlineSnapshotRepository {
insert(snapshot: OnlineSnapshot): Promise<OnlineSnapshot>;
findByTimeRange(startTime: Date, endTime: Date, interval?: string): Promise<OnlineSnapshot[]>;
findLatest(): Promise<OnlineSnapshot | null>;
}
export const ONLINE_SNAPSHOT_REPOSITORY = 'ONLINE_SNAPSHOT_REPOSITORY';

View File

@ -0,0 +1,15 @@
import { Injectable } from '@nestjs/common';
import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate';
import { DauQueryResult } from '../repositories/event-log.repository.interface';
@Injectable()
export class DauCalculationService {
createStatsFromQueryResult(day: Date, result: DauQueryResult): DailyActiveStats {
return DailyActiveStats.create({
day,
dauCount: result.total,
dauByProvince: result.byProvince,
dauByCity: result.byCity,
});
}
}

View File

@ -0,0 +1,24 @@
import { Injectable } from '@nestjs/common';
import { TimeWindow } from '../value-objects/time-window.vo';
@Injectable()
export class OnlineDetectionService {
private readonly timeWindow: TimeWindow;
constructor() {
const windowSeconds = parseInt(process.env.PRESENCE_WINDOW_SECONDS || '300', 10);
this.timeWindow = TimeWindow.ofSeconds(windowSeconds);
}
isOnline(lastHeartbeatTs: number, now: Date = new Date()): boolean {
return lastHeartbeatTs > this.timeWindow.getThresholdTimestamp(now);
}
getOnlineThreshold(now: Date = new Date()): number {
return this.timeWindow.getThresholdTimestamp(now);
}
getWindowSeconds(): number {
return this.timeWindow.windowSeconds;
}
}

View File

@ -0,0 +1,43 @@
import { DomainException } from '../../shared/exceptions/domain.exception';
export class EventName {
static readonly APP_SESSION_START = new EventName('app_session_start');
static readonly PRESENCE_HEARTBEAT = new EventName('presence_heartbeat');
static readonly APP_SESSION_END = new EventName('app_session_end');
private readonly _value: string;
private constructor(value: string) {
this._value = value;
}
get value(): string {
return this._value;
}
static fromString(value: string): EventName {
if (!value || value.trim() === '') {
throw new DomainException('EventName cannot be empty');
}
const trimmed = value.trim().toLowerCase();
if (trimmed.length > 64) {
throw new DomainException('EventName cannot exceed 64 characters');
}
if (!/^[a-z][a-z0-9_]*$/.test(trimmed)) {
throw new DomainException('EventName must start with a letter and contain only lowercase letters, numbers, and underscores');
}
return new EventName(trimmed);
}
isDauEvent(): boolean {
return this._value === EventName.APP_SESSION_START.value;
}
equals(other: EventName): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
}

View File

@ -0,0 +1,41 @@
export interface EventPropertiesData {
province?: string;
city?: string;
[key: string]: unknown;
}
export class EventProperties {
private readonly _data: EventPropertiesData;
private constructor(data: EventPropertiesData) {
this._data = { ...data };
}
get data(): EventPropertiesData {
return { ...this._data };
}
get province(): string | undefined {
return this._data.province;
}
get city(): string | undefined {
return this._data.city;
}
get appVersion(): string | undefined {
return this._data.appVersion as string | undefined;
}
static empty(): EventProperties {
return new EventProperties({});
}
static fromData(data: EventPropertiesData): EventProperties {
return new EventProperties(data);
}
toJSON(): EventPropertiesData {
return this._data;
}
}

View File

@ -0,0 +1,31 @@
import { DomainException } from '../../shared/exceptions/domain.exception';
export class InstallId {
private readonly _value: string;
private constructor(value: string) {
this._value = value;
}
get value(): string {
return this._value;
}
static fromString(value: string): InstallId {
if (!value || value.trim() === '') {
throw new DomainException('InstallId cannot be empty');
}
if (value.length < 8 || value.length > 128) {
throw new DomainException('InstallId length must be between 8 and 128 characters');
}
return new InstallId(value.trim());
}
equals(other: InstallId): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
}

View File

@ -0,0 +1,27 @@
export class TimeWindow {
static readonly DEFAULT_ONLINE_WINDOW_SECONDS = 300;
static readonly DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 60;
private readonly _windowSeconds: number;
private constructor(windowSeconds: number) {
this._windowSeconds = windowSeconds;
}
get windowSeconds(): number {
return this._windowSeconds;
}
static default(): TimeWindow {
return new TimeWindow(TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS);
}
static ofSeconds(seconds: number): TimeWindow {
if (seconds <= 0) throw new Error('TimeWindow must be positive');
return new TimeWindow(seconds);
}
getThresholdTimestamp(now: Date = new Date()): number {
return Math.floor(now.getTime() / 1000) - this._windowSeconds;
}
}

View File

@ -0,0 +1,37 @@
import { Module } from '@nestjs/common';
import { PrismaService } from './persistence/prisma/prisma.service';
import { EventLogMapper } from './persistence/mappers/event-log.mapper';
import { DailyActiveStatsMapper } from './persistence/mappers/daily-active-stats.mapper';
import { OnlineSnapshotMapper } from './persistence/mappers/online-snapshot.mapper';
import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl';
import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl';
import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl';
import { DeviceProfileRepositoryImpl } from './persistence/repositories/device-profile.repository.impl';
import { RedisModule } from './redis/redis.module';
import { EVENT_LOG_REPOSITORY } from '../domain/repositories/event-log.repository.interface';
import { DAILY_ACTIVE_STATS_REPOSITORY } from '../domain/repositories/daily-active-stats.repository.interface';
import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapshot.repository.interface';
import { DEVICE_PROFILE_REPOSITORY } from '../domain/repositories/device-profile.repository.interface';
@Module({
imports: [RedisModule],
providers: [
PrismaService,
EventLogMapper,
DailyActiveStatsMapper,
OnlineSnapshotMapper,
{ provide: EVENT_LOG_REPOSITORY, useClass: EventLogRepositoryImpl },
{ provide: DAILY_ACTIVE_STATS_REPOSITORY, useClass: DailyActiveStatsRepositoryImpl },
{ provide: ONLINE_SNAPSHOT_REPOSITORY, useClass: OnlineSnapshotRepositoryImpl },
{ provide: DEVICE_PROFILE_REPOSITORY, useClass: DeviceProfileRepositoryImpl },
],
exports: [
PrismaService,
EVENT_LOG_REPOSITORY,
DAILY_ACTIVE_STATS_REPOSITORY,
ONLINE_SNAPSHOT_REPOSITORY,
DEVICE_PROFILE_REPOSITORY,
RedisModule,
],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,34 @@
import { Injectable } from '@nestjs/common';
import { DailyActiveStats as PrismaDailyActiveStats, Prisma } from '@prisma/client';
import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate';
@Injectable()
export class DailyActiveStatsMapper {
toDomain(prisma: PrismaDailyActiveStats): DailyActiveStats {
const dauByProvince = new Map<string, number>(
Object.entries((prisma.dauByProvince as Record<string, number>) ?? {}),
);
const dauByCity = new Map<string, number>(
Object.entries((prisma.dauByCity as Record<string, number>) ?? {}),
);
return DailyActiveStats.reconstitute({
day: prisma.day,
dauCount: prisma.dauCount,
dauByProvince,
dauByCity,
calculatedAt: prisma.calculatedAt,
version: prisma.version,
});
}
toPersistence(domain: DailyActiveStats): Prisma.DailyActiveStatsCreateInput {
return {
day: domain.day,
dauCount: domain.dauCount,
dauByProvince: Object.fromEntries(domain.dauByProvince) as Prisma.InputJsonValue,
dauByCity: Object.fromEntries(domain.dauByCity) as Prisma.InputJsonValue,
calculatedAt: domain.calculatedAt,
version: domain.version,
};
}
}

View File

@ -0,0 +1,41 @@
import { Injectable } from '@nestjs/common';
import { EventLog as PrismaEventLog, Prisma } from '@prisma/client';
import { EventLog } from '../../../domain/entities/event-log.entity';
import { InstallId } from '../../../domain/value-objects/install-id.vo';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventProperties, EventPropertiesData } from '../../../domain/value-objects/event-properties.vo';
@Injectable()
export class EventLogMapper {
toDomain(prisma: PrismaEventLog): EventLog {
return EventLog.reconstitute({
id: prisma.id,
userId: prisma.userId,
installId: InstallId.fromString(prisma.installId),
eventName: EventName.fromString(prisma.eventName),
eventTime: prisma.eventTime,
deviceBrand: prisma.deviceBrand,
deviceModel: prisma.deviceModel,
deviceOs: prisma.deviceOs,
appVersion: prisma.appVersion,
locale: prisma.locale,
properties: EventProperties.fromData((prisma.properties as EventPropertiesData) ?? {}),
createdAt: prisma.createdAt,
});
}
toPersistence(domain: EventLog): Prisma.EventLogCreateManyInput {
return {
userId: domain.userId,
installId: domain.installId.value,
eventName: domain.eventName.value,
eventTime: domain.eventTime,
deviceBrand: domain.deviceBrand,
deviceModel: domain.deviceModel,
deviceOs: domain.deviceOs,
appVersion: domain.appVersion,
locale: domain.locale,
properties: domain.properties.toJSON() as Prisma.InputJsonValue,
};
}
}

View File

@ -0,0 +1,23 @@
import { Injectable } from '@nestjs/common';
import { OnlineSnapshot as PrismaOnlineSnapshot } from '@prisma/client';
import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity';
@Injectable()
export class OnlineSnapshotMapper {
toDomain(prisma: PrismaOnlineSnapshot): OnlineSnapshot {
return OnlineSnapshot.reconstitute({
id: prisma.id,
ts: prisma.ts,
onlineCount: prisma.onlineCount,
windowSeconds: prisma.windowSeconds,
});
}
toPersistence(domain: OnlineSnapshot): Omit<PrismaOnlineSnapshot, 'id'> {
return {
ts: domain.ts,
onlineCount: domain.onlineCount,
windowSeconds: domain.windowSeconds,
};
}
}

View File

@ -0,0 +1,13 @@
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
async onModuleInit(): Promise<void> {
await this.$connect();
}
async onModuleDestroy(): Promise<void> {
await this.$disconnect();
}
}

View File

@ -0,0 +1,35 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { IDailyActiveStatsRepository } from '../../../domain/repositories/daily-active-stats.repository.interface';
import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate';
import { DailyActiveStatsMapper } from '../mappers/daily-active-stats.mapper';
@Injectable()
export class DailyActiveStatsRepositoryImpl implements IDailyActiveStatsRepository {
constructor(
private readonly prisma: PrismaService,
private readonly mapper: DailyActiveStatsMapper,
) {}
async upsert(stats: DailyActiveStats): Promise<void> {
const data = this.mapper.toPersistence(stats);
await this.prisma.dailyActiveStats.upsert({
where: { day: stats.day },
create: data,
update: data,
});
}
async findByDate(day: Date): Promise<DailyActiveStats | null> {
const record = await this.prisma.dailyActiveStats.findUnique({ where: { day } });
return record ? this.mapper.toDomain(record) : null;
}
async findByDateRange(startDate: Date, endDate: Date): Promise<DailyActiveStats[]> {
const records = await this.prisma.dailyActiveStats.findMany({
where: { day: { gte: startDate, lte: endDate } },
orderBy: { day: 'asc' },
});
return records.map((r) => this.mapper.toDomain(r));
}
}

View File

@ -0,0 +1,40 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { IDeviceProfileRepository } from '../../../domain/repositories/device-profile.repository.interface';
import { DeviceProfile } from '../../../domain/entities/device-profile.entity';
@Injectable()
export class DeviceProfileRepositoryImpl implements IDeviceProfileRepository {
constructor(private readonly prisma: PrismaService) {}
async upsertBatch(profiles: DeviceProfile[]): Promise<void> {
if (profiles.length === 0) return;
// 同一批次中同一 install_id 可能重复,取最后一条
const map = new Map<string, DeviceProfile>();
for (const p of profiles) {
map.set(p.installId, p);
}
const now = new Date();
for (const p of Array.from(map.values())) {
await this.prisma.$executeRaw`
INSERT INTO presence_device_profile
(install_id, user_id, device_brand, device_model, device_os, app_version, locale,
first_seen_at, last_seen_at, event_count)
VALUES
(${p.installId}, ${p.userId}, ${p.deviceBrand}, ${p.deviceModel},
${p.deviceOs}, ${p.appVersion}, ${p.locale}, ${now}, ${now}, ${p.eventCount})
ON CONFLICT (install_id) DO UPDATE SET
user_id = COALESCE(EXCLUDED.user_id, presence_device_profile.user_id),
device_brand = COALESCE(EXCLUDED.device_brand, presence_device_profile.device_brand),
device_model = COALESCE(EXCLUDED.device_model, presence_device_profile.device_model),
device_os = COALESCE(EXCLUDED.device_os, presence_device_profile.device_os),
app_version = COALESCE(EXCLUDED.app_version, presence_device_profile.app_version),
locale = COALESCE(EXCLUDED.locale, presence_device_profile.locale),
last_seen_at = ${now},
event_count = presence_device_profile.event_count + ${p.eventCount}
`;
}
}
}

View File

@ -0,0 +1,77 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { IEventLogRepository, DauQueryResult } from '../../../domain/repositories/event-log.repository.interface';
import { EventLog } from '../../../domain/entities/event-log.entity';
import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventLogMapper } from '../mappers/event-log.mapper';
@Injectable()
export class EventLogRepositoryImpl implements IEventLogRepository {
constructor(
private readonly prisma: PrismaService,
private readonly mapper: EventLogMapper,
) {}
async batchInsert(logs: EventLog[]): Promise<void> {
const data = logs.map((log) => this.mapper.toPersistence(log));
await this.prisma.eventLog.createMany({ data });
}
async insert(log: EventLog): Promise<EventLog> {
const data = this.mapper.toPersistence(log);
const created = await this.prisma.eventLog.create({ data });
return this.mapper.toDomain(created);
}
async queryDau(eventName: EventName, startTime: Date, endTime: Date): Promise<DauQueryResult> {
const result = await this.prisma.$queryRaw<
{ total: bigint; province: string | null; city: string | null; count: bigint }[]
>`
WITH base AS (
SELECT
COALESCE(user_id::text, install_id) AS identifier,
properties->>'province' AS province,
properties->>'city' AS city
FROM presence_event_log
WHERE event_name = ${eventName.value}
AND event_time >= ${startTime}
AND event_time < ${endTime}
),
unique_users AS (
SELECT DISTINCT identifier, province, city FROM base
)
SELECT
COUNT(DISTINCT identifier) AS total,
province,
city,
COUNT(*) AS count
FROM unique_users
GROUP BY GROUPING SETS ((), (province), (city))
`;
let total = 0;
const byProvince = new Map<string, number>();
const byCity = new Map<string, number>();
for (const row of result) {
if (row.province === null && row.city === null) {
total = Number(row.total);
} else if (row.province !== null && row.city === null) {
byProvince.set(row.province, Number(row.count));
} else if (row.city !== null && row.province === null) {
byCity.set(row.city, Number(row.count));
}
}
return { total, byProvince, byCity };
}
async findByTimeRange(eventName: EventName, startTime: Date, endTime: Date, limit?: number): Promise<EventLog[]> {
const records = await this.prisma.eventLog.findMany({
where: { eventName: eventName.value, eventTime: { gte: startTime, lt: endTime } },
orderBy: { eventTime: 'desc' },
take: limit,
});
return records.map((r) => this.mapper.toDomain(r));
}
}

View File

@ -0,0 +1,32 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { IOnlineSnapshotRepository } from '../../../domain/repositories/online-snapshot.repository.interface';
import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity';
import { OnlineSnapshotMapper } from '../mappers/online-snapshot.mapper';
@Injectable()
export class OnlineSnapshotRepositoryImpl implements IOnlineSnapshotRepository {
constructor(
private readonly prisma: PrismaService,
private readonly mapper: OnlineSnapshotMapper,
) {}
async insert(snapshot: OnlineSnapshot): Promise<OnlineSnapshot> {
const data = this.mapper.toPersistence(snapshot);
const created = await this.prisma.onlineSnapshot.create({ data });
return this.mapper.toDomain(created);
}
async findByTimeRange(startTime: Date, endTime: Date, interval?: string): Promise<OnlineSnapshot[]> {
const records = await this.prisma.onlineSnapshot.findMany({
where: { ts: { gte: startTime, lte: endTime } },
orderBy: { ts: 'asc' },
});
return records.map((r) => this.mapper.toDomain(r));
}
async findLatest(): Promise<OnlineSnapshot | null> {
const record = await this.prisma.onlineSnapshot.findFirst({ orderBy: { ts: 'desc' } });
return record ? this.mapper.toDomain(record) : null;
}
}

View File

@ -0,0 +1,33 @@
import { Injectable } from '@nestjs/common';
import { RedisService } from './redis.service';
@Injectable()
export class PresenceRedisRepository {
private readonly ONLINE_USERS_KEY = 'presence:online_users';
constructor(private readonly redisService: RedisService) {}
async updateUserPresence(userId: string, timestamp: number): Promise<void> {
await this.redisService.zadd(this.ONLINE_USERS_KEY, timestamp, userId);
}
async countOnlineUsers(thresholdTimestamp: number): Promise<number> {
return this.redisService.zcount(this.ONLINE_USERS_KEY, thresholdTimestamp, '+inf');
}
async getOnlineUsers(thresholdTimestamp: number, limit?: number): Promise<string[]> {
if (limit) {
return this.redisService.zrangebyscore(this.ONLINE_USERS_KEY, thresholdTimestamp, '+inf', 'LIMIT', 0, limit);
}
return this.redisService.zrangebyscore(this.ONLINE_USERS_KEY, thresholdTimestamp, '+inf');
}
async removeExpiredUsers(thresholdTimestamp: number): Promise<number> {
return this.redisService.zremrangebyscore(this.ONLINE_USERS_KEY, '-inf', thresholdTimestamp);
}
async getUserLastHeartbeat(userId: string): Promise<number | null> {
const score = await this.redisService.zscore(this.ONLINE_USERS_KEY, userId);
return score ? Number(score) : null;
}
}

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { RedisService } from './redis.service';
import { PresenceRedisRepository } from './presence-redis.repository';
@Module({
providers: [RedisService, PresenceRedisRepository],
exports: [RedisService, PresenceRedisRepository],
})
export class RedisModule {}

View File

@ -0,0 +1,59 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
@Injectable()
export class RedisService implements OnModuleDestroy {
private readonly client: Redis;
constructor(private readonly configService: ConfigService) {
this.client = new Redis({
host: this.configService.get<string>('REDIS_HOST', 'localhost'),
port: this.configService.get<number>('REDIS_PORT', 6379),
password: this.configService.get<string>('REDIS_PASSWORD'),
db: this.configService.get<number>('REDIS_DB', 10),
});
}
async onModuleDestroy(): Promise<void> {
await this.client.quit();
}
async zadd(key: string, score: number, member: string): Promise<number> {
return this.client.zadd(key, score, member);
}
async zcount(key: string, min: number | string, max: number | string): Promise<number> {
return this.client.zcount(key, min, max);
}
async zrangebyscore(
key: string,
min: number | string,
max: number | string,
limit?: 'LIMIT',
offset?: number,
count?: number,
): Promise<string[]> {
if (limit && offset !== undefined && count !== undefined) {
return this.client.zrangebyscore(key, min, max, limit, offset, count);
}
return this.client.zrangebyscore(key, min, max);
}
async zremrangebyscore(key: string, min: number | string, max: number | string): Promise<number> {
return this.client.zremrangebyscore(key, min, max);
}
async zscore(key: string, member: string): Promise<string | null> {
return this.client.zscore(key, member);
}
async pfadd(key: string, ...elements: string[]): Promise<number> {
return this.client.pfadd(key, ...elements);
}
async expire(key: string, seconds: number): Promise<number> {
return this.client.expire(key, seconds);
}
}

View File

@ -0,0 +1,40 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe, Logger } from '@nestjs/common';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { AppModule } from './app.module';
import { GlobalExceptionFilter } from './shared/filters/global-exception.filter';
import { LoggingInterceptor } from './shared/interceptors/logging.interceptor';
async function bootstrap() {
const logger = new Logger('Bootstrap');
const app = await NestFactory.create(AppModule);
app.useGlobalFilters(new GlobalExceptionFilter());
app.useGlobalInterceptors(new LoggingInterceptor());
app.useGlobalPipes(
new ValidationPipe({
whitelist: true,
transform: true,
forbidNonWhitelisted: true,
}),
);
const apiPrefix = process.env.API_PREFIX || 'api/v1';
app.setGlobalPrefix(apiPrefix);
const config = new DocumentBuilder()
.setTitle('Presence & Analytics Service API')
.setDescription('用户活跃度与在线状态服务')
.setVersion('1.0')
.addBearerAuth()
.build();
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup(`${apiPrefix}/docs`, app, document);
const port = parseInt(process.env.APP_PORT || '3011', 10);
await app.listen(port);
logger.log(`Presence Service running on: http://localhost:${port}/${apiPrefix}`);
}
bootstrap();

View File

@ -0,0 +1,9 @@
import { createParamDecorator, ExecutionContext } from '@nestjs/common';
export const CurrentUser = createParamDecorator(
(data: string | undefined, ctx: ExecutionContext) => {
const request = ctx.switchToHttp().getRequest();
const user = request.user;
return data ? user?.[data] : user;
},
);

View File

@ -0,0 +1,4 @@
import { SetMetadata } from '@nestjs/common';
export const IS_PUBLIC_KEY = 'isPublic';
export const Public = () => SetMetadata(IS_PUBLIC_KEY, true);

View File

@ -0,0 +1,6 @@
export class ApplicationException extends Error {
constructor(message: string) {
super(message);
this.name = 'ApplicationException';
}
}

View File

@ -0,0 +1,6 @@
export class DomainException extends Error {
constructor(message: string) {
super(message);
this.name = 'DomainException';
}
}

View File

@ -0,0 +1,61 @@
import {
ExceptionFilter,
Catch,
ArgumentsHost,
HttpException,
HttpStatus,
Logger,
} from '@nestjs/common';
import { Request, Response } from 'express';
import { DomainException } from '../exceptions/domain.exception';
import { ApplicationException } from '../exceptions/application.exception';
@Catch()
export class GlobalExceptionFilter implements ExceptionFilter {
private readonly logger = new Logger(GlobalExceptionFilter.name);
catch(exception: unknown, host: ArgumentsHost): void {
const ctx = host.switchToHttp();
const response = ctx.getResponse<Response>();
const request = ctx.getRequest<Request>();
let statusCode = HttpStatus.INTERNAL_SERVER_ERROR;
let message = 'Internal server error';
let error = 'Internal Server Error';
if (exception instanceof HttpException) {
statusCode = exception.getStatus();
const exceptionResponse = exception.getResponse();
message = typeof exceptionResponse === 'string'
? exceptionResponse
: (exceptionResponse as any).message || message;
error = HttpStatus[statusCode] || error;
} else if (exception instanceof DomainException) {
statusCode = HttpStatus.UNPROCESSABLE_ENTITY;
message = exception.message;
error = 'Domain Error';
} else if (exception instanceof ApplicationException) {
statusCode = HttpStatus.BAD_REQUEST;
message = exception.message;
error = 'Application Error';
} else if (exception instanceof Error) {
message = process.env.NODE_ENV === 'production' ? 'Internal server error' : exception.message;
}
if (statusCode >= 500) {
this.logger.error(`[${request.method}] ${request.url} - ${statusCode} - ${message}`,
exception instanceof Error ? exception.stack : undefined);
} else if (statusCode >= 400) {
this.logger.warn(`[${request.method}] ${request.url} - ${statusCode} - ${message}`);
}
response.status(statusCode).json({
statusCode,
timestamp: new Date().toISOString(),
path: request.url,
method: request.method,
message,
error,
});
}
}

View File

@ -0,0 +1 @@
export * from './global-exception.filter';

View File

@ -0,0 +1,38 @@
import { Injectable, CanActivate, ExecutionContext, UnauthorizedException, ForbiddenException } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt';
/**
* Guard JWT roles 'admin'
* online-countonline-historydau
*/
@Injectable()
export class AdminGuard implements CanActivate {
constructor(private readonly jwtService: JwtService) {}
async canActivate(context: ExecutionContext): Promise<boolean> {
const request = context.switchToHttp().getRequest();
const token = this.extractTokenFromHeader(request);
if (!token) throw new UnauthorizedException('缺少认证令牌');
try {
const payload = await this.jwtService.verifyAsync(token);
const roles: string[] = payload.roles ?? [];
if (!roles.includes('admin')) throw new ForbiddenException('需要管理员权限');
request.user = {
userId: payload.sub,
email: payload.email,
roles,
};
} catch (e) {
if (e instanceof ForbiddenException) throw e;
throw new UnauthorizedException('令牌无效或已过期');
}
return true;
}
private extractTokenFromHeader(request: any): string | undefined {
const [type, token] = request.headers.authorization?.split(' ') ?? [];
return type === 'Bearer' ? token : undefined;
}
}

View File

@ -0,0 +1,44 @@
import { Injectable, CanActivate, ExecutionContext, UnauthorizedException } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { JwtService } from '@nestjs/jwt';
import { IS_PUBLIC_KEY } from '../decorators/public.decorator';
@Injectable()
export class JwtAuthGuard implements CanActivate {
constructor(
private readonly jwtService: JwtService,
private readonly reflector: Reflector,
) {}
async canActivate(context: ExecutionContext): Promise<boolean> {
const isPublic = this.reflector.getAllAndOverride<boolean>(IS_PUBLIC_KEY, [
context.getHandler(),
context.getClass(),
]);
if (isPublic) return true;
const request = context.switchToHttp().getRequest();
const token = this.extractTokenFromHeader(request);
if (!token) throw new UnauthorizedException('缺少认证令牌');
try {
const payload = await this.jwtService.verifyAsync(token);
if (payload.type && payload.type !== 'access') throw new UnauthorizedException('无效的令牌类型');
request.user = {
userId: payload.sub,
email: payload.email,
tenantId: payload.tenantId,
roles: payload.roles ?? [],
};
} catch {
throw new UnauthorizedException('令牌无效或已过期');
}
return true;
}
private extractTokenFromHeader(request: any): string | undefined {
const [type, token] = request.headers.authorization?.split(' ') ?? [];
return type === 'Bearer' ? token : undefined;
}
}

View File

@ -0,0 +1 @@
export * from './logging.interceptor';

View File

@ -0,0 +1,29 @@
import { Injectable, NestInterceptor, ExecutionContext, CallHandler, Logger } from '@nestjs/common';
import { Observable } from 'rxjs';
import { tap, catchError } from 'rxjs/operators';
import { Request, Response } from 'express';
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
private readonly logger = new Logger('HTTP');
intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
const ctx = context.switchToHttp();
const request = ctx.getRequest<Request>();
const response = ctx.getResponse<Response>();
const startTime = Date.now();
const { method, url } = request;
this.logger.log(`${method} ${url}`);
return next.handle().pipe(
tap(() => {
this.logger.log(`${method} ${url} - ${response.statusCode} - ${Date.now() - startTime}ms`);
}),
catchError((error) => {
this.logger.error(`${method} ${url} - ${error.status || 500} - ${Date.now() - startTime}ms - ${error.message}`);
throw error;
}),
);
}
}

View File

@ -0,0 +1,21 @@
import { format, startOfDay, endOfDay } from 'date-fns';
import { utcToZonedTime, zonedTimeToUtc } from 'date-fns-tz';
const DEFAULT_TIMEZONE = 'Asia/Shanghai';
export function startOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date {
const zonedDate = utcToZonedTime(date, timezone);
const start = startOfDay(zonedDate);
return zonedTimeToUtc(start, timezone);
}
export function endOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date {
const zonedDate = utcToZonedTime(date, timezone);
const end = endOfDay(zonedDate);
return zonedTimeToUtc(end, timezone);
}
export function formatToDateKey(date: Date, timezone: string = DEFAULT_TIMEZONE): string {
const zonedDate = utcToZonedTime(date, timezone);
return format(zonedDate, 'yyyy-MM-dd');
}

View File

@ -0,0 +1,23 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": true,
"removeComments": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"allowSyntheticDefaultImports": true,
"target": "ES2021",
"sourceMap": true,
"outDir": "./dist",
"baseUrl": "./",
"incremental": true,
"skipLibCheck": true,
"strictNullChecks": false,
"noImplicitAny": false,
"strictBindCallApply": false,
"forceConsistentCasingInFileNames": false,
"noFallthroughCasesInSwitch": false
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "test"]
}

View File

@ -0,0 +1,60 @@
-- IT0 Presence & Analytics Tables (public schema)
-- Telemetry event log, device profiles, DAU stats, online snapshots
CREATE TABLE IF NOT EXISTS presence_event_log (
id BIGSERIAL PRIMARY KEY,
user_id VARCHAR(36),
install_id VARCHAR(64) NOT NULL,
event_name VARCHAR(64) NOT NULL,
event_time TIMESTAMPTZ NOT NULL,
device_brand VARCHAR(64),
device_model VARCHAR(64),
device_os VARCHAR(32),
app_version VARCHAR(32),
locale VARCHAR(16),
properties JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_presence_event_log_event_time ON presence_event_log (event_time);
CREATE INDEX IF NOT EXISTS idx_presence_event_log_event_name ON presence_event_log (event_name);
CREATE INDEX IF NOT EXISTS idx_presence_event_log_event_name_time ON presence_event_log (event_name, event_time);
CREATE INDEX IF NOT EXISTS idx_presence_event_log_user_id ON presence_event_log (user_id);
CREATE INDEX IF NOT EXISTS idx_presence_event_log_device_brand ON presence_event_log (device_brand);
CREATE INDEX IF NOT EXISTS idx_presence_event_log_app_version ON presence_event_log (app_version);
CREATE TABLE IF NOT EXISTS presence_device_profile (
install_id VARCHAR(64) PRIMARY KEY,
user_id VARCHAR(36),
device_brand VARCHAR(64),
device_model VARCHAR(64),
device_os VARCHAR(32),
app_version VARCHAR(32),
locale VARCHAR(16),
first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
event_count INTEGER NOT NULL DEFAULT 1
);
CREATE INDEX IF NOT EXISTS idx_presence_device_profile_brand ON presence_device_profile (device_brand);
CREATE INDEX IF NOT EXISTS idx_presence_device_profile_app_version ON presence_device_profile (app_version);
CREATE INDEX IF NOT EXISTS idx_presence_device_profile_user_id ON presence_device_profile (user_id);
CREATE INDEX IF NOT EXISTS idx_presence_device_profile_last_seen ON presence_device_profile (last_seen_at DESC);
CREATE TABLE IF NOT EXISTS presence_daily_active_users (
day DATE PRIMARY KEY,
dau_count INTEGER NOT NULL,
dau_by_province JSONB,
dau_by_city JSONB,
calculated_at TIMESTAMPTZ NOT NULL,
version INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS presence_online_snapshots (
id BIGSERIAL PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL UNIQUE,
online_count INTEGER NOT NULL,
window_seconds INTEGER NOT NULL DEFAULT 300
);
CREATE INDEX IF NOT EXISTS idx_presence_online_snapshots_ts ON presence_online_snapshots (ts DESC);

View File

@ -70,6 +70,10 @@ async function runSharedSchema(client: Client) {
log('Running 005-create-billing-tables.sql ...');
await runSqlFile(client, path.join(MIGRATIONS_DIR, '005-create-billing-tables.sql'));
log('Billing tables created.');
log('Running 010-create-presence-tables.sql ...');
await runSqlFile(client, path.join(MIGRATIONS_DIR, '010-create-presence-tables.sql'));
log('Presence tables created.');
}
async function runTenantSchema(client: Client, tenantId: string) {