From 8d2fd3335a3d6eb60922f17aadd3729a1a1a323a Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 7 Mar 2026 17:44:01 -0800 Subject: [PATCH] feat(telemetry): add presence-service + Flutter telemetry module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Backend — packages/services/presence-service (新微服务) 完整的 DDD + Clean Architecture 实现,移植自 RWADurian presence-service, 针对 IT0 架构做了以下适配: ### 核心功能 - 心跳接口: POST /api/v1/presence/heartbeat(JWT 验证,60s 间隔) → Redis Sorted Set `presence:online_users` 记录在线时间戳 → 默认 5 分钟窗口判断在线(PRESENCE_WINDOW_SECONDS=300) - 事件上报: POST /api/v1/analytics/events(批量,最多 50 条) → 写入 presence_event_log 表 + 更新 presence_device_profile → Redis HyperLogLog `presence:dau:{date}` 实时 DAU 估算 - 查询接口(需 AdminGuard): - GET /api/v1/analytics/online-count — 实时在线人数 - GET /api/v1/analytics/online-history — 历史在线快照 - GET /api/v1/analytics/dau — DAU 统计 ### IT0 适配要点 - JWT payload: `sub` = UUID userId(非 RWADurian 的 userSerialNum) → JwtAuthGuard: request.user = { userId: payload.sub, roles, tenantId } - AdminGuard: 改为检查 `roles.includes('admin')`(非 type==='admin') - 移除 Kafka EventPublisherService(IT0 无 Kafka) - 移除 Prometheus MetricsService(IT0 无 Prometheus) - 表前缀改为 `presence_`(避免与其他服务冲突) - userId 字段 VarChar(36)(UUID 格式,非原来的 VarChar(20)) - Redis DB=10 隔离(独立 key 空间) ### 数据库表(public schema) - presence_event_log — 事件流水(append-only) - presence_device_profile — 设备快照(upsert,每台设备一行) - presence_daily_active_users — DAU 日统计 - presence_online_snapshots — 在线人数每分钟快照 ### 定时任务(@nestjs/schedule) - 每分钟: 采集在线人数快照 → presence_online_snapshots - 每天 01:05 (UTC+8): 计算前一天 DAU → presence_daily_active_users --- ## Flutter — it0_app/lib/core/telemetry (新模块) ### 文件结构 - telemetry_service.dart — 单例入口,统筹所有组件 - models/telemetry_event.dart — 事件模型,toServerJson() 将设备字段提升为顶层列 - models/device_context.dart — 设备上下文(Android/iOS 信息) - models/telemetry_config.dart — 远程配置(采样率/开关,支持远端同步) - collectors/device_info_collector.dart — 采集 device_info_plus 设备信息 - storage/telemetry_storage.dart — SharedPreferences 队列(最多 500 条) - uploader/telemetry_uploader.dart — 批量上传到 /api/v1/analytics/events - session/session_manager.dart — WidgetsBindingObserver 监听前后台切换 - session/session_events.dart — 会话事件常量 - presence/heartbeat_service.dart — 定时心跳 POST /api/v1/presence/heartbeat - presence/presence_config.dart — 心跳配置(间隔/requiresAuth) - telemetry.dart — barrel 导出 ### 集成点 - app_router.dart _tryRestore(): TelemetryService().initialize() 在 auth 之前 - auth_provider.dart login/loginWithOtp: setUserId + setAccessToken + resumeAfterLogin - auth_provider.dart tryRestoreSession: 恢复 userId + accessToken - auth_provider.dart logout: pauseForLogout + clearUserId + clearAccessToken ### 新增依赖 - device_info_plus: ^10.1.0 - equatable: ^2.0.5 --- ## 基础设施 ### Dockerfile.service - 在 builder 和 production 阶段均添加 presence-service/package.json 的 COPY ### docker-compose.yml - 新增 presence-service 容器(端口 3011/13011) - DATABASE_URL: postgresql://... (Prisma 所需连接串格式) - REDIS_HOST/PORT/DB: 10(presence 独立 Redis DB) - APP_PORT=3011, JWT_SECRET, PRESENCE_WINDOW_SECONDS=300 - api-gateway depends_on 新增 presence-service ### kong.yml (dbless 声明式) - 新增 presence-service 服务(http://presence-service:3011) - presence-routes: /api/v1/presence - analytics-routes: /api/v1/analytics - 对整个 presence-service 启用 JWT 插件(Kong 层鉴权) ### DB 迁移 - packages/shared/database/src/migrations/010-create-presence-tables.sql — 4 张 presence_ 前缀表 + 完整索引(IF NOT EXISTS 幂等) - run-migrations.ts: runSharedSchema() 中新增执行 010-create-presence-tables.sql --- ## 部署步骤(服务器) 1. git pull 2. 执行 presence 表迁移(首次): docker exec it0-postgres psql -U it0 -d it0 \ -f /path/to/010-create-presence-tables.sql 或通过 migration runner: cd /home/ceshi/it0 && node packages/shared/database/dist/run-migrations.js 3. 重建并启动 presence-service: docker compose build presence-service api-gateway docker compose up -d presence-service api-gateway Co-Authored-By: Claude Sonnet 4.6 --- Dockerfile.service | 2 + deploy/docker/docker-compose.yml | 35 +++ it0_app/lib/core/router/app_router.dart | 8 + .../collectors/device_info_collector.dart | 102 ++++++++ .../core/telemetry/models/device_context.dart | 135 +++++++++++ .../telemetry/models/telemetry_config.dart | 114 +++++++++ .../telemetry/models/telemetry_event.dart | 109 +++++++++ .../telemetry/presence/heartbeat_service.dart | 140 +++++++++++ .../telemetry/presence/presence_config.dart | 39 +++ .../telemetry/session/session_events.dart | 9 + .../telemetry/session/session_manager.dart | 106 +++++++++ .../telemetry/storage/telemetry_storage.dart | 88 +++++++ it0_app/lib/core/telemetry/telemetry.dart | 8 + .../lib/core/telemetry/telemetry_service.dart | 224 ++++++++++++++++++ .../uploader/telemetry_uploader.dart | 88 +++++++ .../auth/data/providers/auth_provider.dart | 18 ++ it0_app/pubspec.yaml | 4 + packages/gateway/config/kong.yml | 19 ++ .../services/presence-service/nest-cli.json | 8 + .../services/presence-service/package.json | 38 +++ .../20250307000000_init/migration.sql | 91 +++++++ .../prisma/migrations/migration_lock.toml | 3 + .../presence-service/prisma/schema.prisma | 81 +++++++ .../presence-service/src/api/api.module.ts | 14 ++ .../api/controllers/analytics.controller.ts | 35 +++ .../src/api/controllers/health.controller.ts | 11 + .../api/controllers/presence.controller.ts | 56 +++++ .../src/api/dto/request/batch-events.dto.ts | 60 +++++ .../src/api/dto/request/heartbeat.dto.ts | 16 ++ .../src/api/dto/request/query-dau.dto.ts | 12 + .../dto/request/query-online-history.dto.ts | 17 ++ .../presence-service/src/app.module.ts | 30 +++ .../src/application/application.module.ts | 26 ++ .../calculate-dau/calculate-dau.command.ts | 3 + .../calculate-dau/calculate-dau.handler.ts | 48 ++++ .../record-events/record-events.command.ts | 16 ++ .../record-events/record-events.handler.ts | 97 ++++++++ .../record-heartbeat.command.ts | 8 + .../record-heartbeat.handler.ts | 21 ++ .../get-dau-stats/get-dau-stats.handler.ts | 27 +++ .../get-dau-stats/get-dau-stats.query.ts | 6 + .../get-online-count.handler.ts | 27 +++ .../get-online-count.query.ts | 1 + .../get-online-history.handler.ts | 100 ++++++++ .../get-online-history.query.ts | 9 + .../schedulers/analytics.scheduler.ts | 70 ++++++ .../daily-active-stats.aggregate.ts | 61 +++++ .../src/domain/domain.module.ts | 9 + .../domain/entities/device-profile.entity.ts | 32 +++ .../src/domain/entities/event-log.entity.ts | 95 ++++++++ .../domain/entities/online-snapshot.entity.ts | 33 +++ ...daily-active-stats.repository.interface.ts | 9 + .../device-profile.repository.interface.ts | 7 + .../event-log.repository.interface.ts | 17 ++ .../online-snapshot.repository.interface.ts | 9 + .../services/dau-calculation.service.ts | 15 ++ .../services/online-detection.service.ts | 24 ++ .../src/domain/value-objects/event-name.vo.ts | 43 ++++ .../value-objects/event-properties.vo.ts | 41 ++++ .../src/domain/value-objects/install-id.vo.ts | 31 +++ .../domain/value-objects/time-window.vo.ts | 27 +++ .../infrastructure/infrastructure.module.ts | 37 +++ .../mappers/daily-active-stats.mapper.ts | 34 +++ .../persistence/mappers/event-log.mapper.ts | 41 ++++ .../mappers/online-snapshot.mapper.ts | 23 ++ .../persistence/prisma/prisma.service.ts | 13 + .../daily-active-stats.repository.impl.ts | 35 +++ .../device-profile.repository.impl.ts | 40 ++++ .../repositories/event-log.repository.impl.ts | 77 ++++++ .../online-snapshot.repository.impl.ts | 32 +++ .../redis/presence-redis.repository.ts | 33 +++ .../src/infrastructure/redis/redis.module.ts | 9 + .../src/infrastructure/redis/redis.service.ts | 59 +++++ .../services/presence-service/src/main.ts | 40 ++++ .../decorators/current-user.decorator.ts | 9 + .../src/shared/decorators/public.decorator.ts | 4 + .../exceptions/application.exception.ts | 6 + .../src/shared/exceptions/domain.exception.ts | 6 + .../shared/filters/global-exception.filter.ts | 61 +++++ .../src/shared/filters/index.ts | 1 + .../src/shared/guards/admin.guard.ts | 38 +++ .../src/shared/guards/jwt-auth.guard.ts | 44 ++++ .../src/shared/interceptors/index.ts | 1 + .../interceptors/logging.interceptor.ts | 29 +++ .../src/shared/utils/timezone.util.ts | 21 ++ .../services/presence-service/tsconfig.json | 23 ++ .../migrations/010-create-presence-tables.sql | 60 +++++ .../shared/database/src/run-migrations.ts | 4 + 88 files changed, 3412 insertions(+) create mode 100644 it0_app/lib/core/telemetry/collectors/device_info_collector.dart create mode 100644 it0_app/lib/core/telemetry/models/device_context.dart create mode 100644 it0_app/lib/core/telemetry/models/telemetry_config.dart create mode 100644 it0_app/lib/core/telemetry/models/telemetry_event.dart create mode 100644 it0_app/lib/core/telemetry/presence/heartbeat_service.dart create mode 100644 it0_app/lib/core/telemetry/presence/presence_config.dart create mode 100644 it0_app/lib/core/telemetry/session/session_events.dart create mode 100644 it0_app/lib/core/telemetry/session/session_manager.dart create mode 100644 it0_app/lib/core/telemetry/storage/telemetry_storage.dart create mode 100644 it0_app/lib/core/telemetry/telemetry.dart create mode 100644 it0_app/lib/core/telemetry/telemetry_service.dart create mode 100644 it0_app/lib/core/telemetry/uploader/telemetry_uploader.dart create mode 100644 packages/services/presence-service/nest-cli.json create mode 100644 packages/services/presence-service/package.json create mode 100644 packages/services/presence-service/prisma/migrations/20250307000000_init/migration.sql create mode 100644 packages/services/presence-service/prisma/migrations/migration_lock.toml create mode 100644 packages/services/presence-service/prisma/schema.prisma create mode 100644 packages/services/presence-service/src/api/api.module.ts create mode 100644 packages/services/presence-service/src/api/controllers/analytics.controller.ts create mode 100644 packages/services/presence-service/src/api/controllers/health.controller.ts create mode 100644 packages/services/presence-service/src/api/controllers/presence.controller.ts create mode 100644 packages/services/presence-service/src/api/dto/request/batch-events.dto.ts create mode 100644 packages/services/presence-service/src/api/dto/request/heartbeat.dto.ts create mode 100644 packages/services/presence-service/src/api/dto/request/query-dau.dto.ts create mode 100644 packages/services/presence-service/src/api/dto/request/query-online-history.dto.ts create mode 100644 packages/services/presence-service/src/app.module.ts create mode 100644 packages/services/presence-service/src/application/application.module.ts create mode 100644 packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts create mode 100644 packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts create mode 100644 packages/services/presence-service/src/application/commands/record-events/record-events.command.ts create mode 100644 packages/services/presence-service/src/application/commands/record-events/record-events.handler.ts create mode 100644 packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts create mode 100644 packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts create mode 100644 packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts create mode 100644 packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts create mode 100644 packages/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts create mode 100644 packages/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts create mode 100644 packages/services/presence-service/src/application/queries/get-online-history/get-online-history.handler.ts create mode 100644 packages/services/presence-service/src/application/queries/get-online-history/get-online-history.query.ts create mode 100644 packages/services/presence-service/src/application/schedulers/analytics.scheduler.ts create mode 100644 packages/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts create mode 100644 packages/services/presence-service/src/domain/domain.module.ts create mode 100644 packages/services/presence-service/src/domain/entities/device-profile.entity.ts create mode 100644 packages/services/presence-service/src/domain/entities/event-log.entity.ts create mode 100644 packages/services/presence-service/src/domain/entities/online-snapshot.entity.ts create mode 100644 packages/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts create mode 100644 packages/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts create mode 100644 packages/services/presence-service/src/domain/repositories/event-log.repository.interface.ts create mode 100644 packages/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts create mode 100644 packages/services/presence-service/src/domain/services/dau-calculation.service.ts create mode 100644 packages/services/presence-service/src/domain/services/online-detection.service.ts create mode 100644 packages/services/presence-service/src/domain/value-objects/event-name.vo.ts create mode 100644 packages/services/presence-service/src/domain/value-objects/event-properties.vo.ts create mode 100644 packages/services/presence-service/src/domain/value-objects/install-id.vo.ts create mode 100644 packages/services/presence-service/src/domain/value-objects/time-window.vo.ts create mode 100644 packages/services/presence-service/src/infrastructure/infrastructure.module.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts create mode 100644 packages/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts create mode 100644 packages/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts create mode 100644 packages/services/presence-service/src/infrastructure/redis/redis.module.ts create mode 100644 packages/services/presence-service/src/infrastructure/redis/redis.service.ts create mode 100644 packages/services/presence-service/src/main.ts create mode 100644 packages/services/presence-service/src/shared/decorators/current-user.decorator.ts create mode 100644 packages/services/presence-service/src/shared/decorators/public.decorator.ts create mode 100644 packages/services/presence-service/src/shared/exceptions/application.exception.ts create mode 100644 packages/services/presence-service/src/shared/exceptions/domain.exception.ts create mode 100644 packages/services/presence-service/src/shared/filters/global-exception.filter.ts create mode 100644 packages/services/presence-service/src/shared/filters/index.ts create mode 100644 packages/services/presence-service/src/shared/guards/admin.guard.ts create mode 100644 packages/services/presence-service/src/shared/guards/jwt-auth.guard.ts create mode 100644 packages/services/presence-service/src/shared/interceptors/index.ts create mode 100644 packages/services/presence-service/src/shared/interceptors/logging.interceptor.ts create mode 100644 packages/services/presence-service/src/shared/utils/timezone.util.ts create mode 100644 packages/services/presence-service/tsconfig.json create mode 100644 packages/shared/database/src/migrations/010-create-presence-tables.sql diff --git a/Dockerfile.service b/Dockerfile.service index 988fc0f..a8dce55 100644 --- a/Dockerfile.service +++ b/Dockerfile.service @@ -26,6 +26,7 @@ COPY packages/services/comm-service/package.json packages/services/comm-service/ COPY packages/services/audit-service/package.json packages/services/audit-service/ COPY packages/services/billing-service/package.json packages/services/billing-service/ COPY packages/services/version-service/package.json packages/services/version-service/ +COPY packages/services/presence-service/package.json packages/services/presence-service/ # Install all dependencies (cached unless package.json changes) RUN pnpm install --frozen-lockfile @@ -64,6 +65,7 @@ COPY --from=builder /app/packages/services/comm-service/package.json packages/se COPY --from=builder /app/packages/services/audit-service/package.json packages/services/audit-service/ COPY --from=builder /app/packages/services/billing-service/package.json packages/services/billing-service/ COPY --from=builder /app/packages/services/version-service/package.json packages/services/version-service/ +COPY --from=builder /app/packages/services/presence-service/package.json packages/services/presence-service/ # Install production dependencies only RUN pnpm install --frozen-lockfile --prod diff --git a/deploy/docker/docker-compose.yml b/deploy/docker/docker-compose.yml index a4a1f71..d73af69 100644 --- a/deploy/docker/docker-compose.yml +++ b/deploy/docker/docker-compose.yml @@ -67,6 +67,8 @@ services: condition: service_healthy version-service: condition: service_healthy + presence-service: + condition: service_healthy healthcheck: test: ["CMD", "kong", "health"] interval: 10s @@ -396,6 +398,39 @@ services: networks: - it0-network + presence-service: + build: + context: ../.. + dockerfile: Dockerfile.service + args: + SERVICE_NAME: presence-service + SERVICE_PORT: 3011 + container_name: it0-presence-service + restart: unless-stopped + ports: + - "13011:3011" + environment: + - DATABASE_URL=postgresql://${POSTGRES_USER:-it0}:${POSTGRES_PASSWORD:-it0_dev}@postgres:5432/${POSTGRES_DB:-it0} + - REDIS_HOST=redis + - REDIS_PORT=6379 + - REDIS_DB=10 + - APP_PORT=3011 + - JWT_SECRET=${JWT_SECRET:-dev-jwt-secret} + - PRESENCE_WINDOW_SECONDS=300 + healthcheck: + test: ["CMD-SHELL", "node -e \"require('http').get('http://localhost:3011/',r=>{process.exit(r.statusCode<500?0:1)}).on('error',()=>process.exit(1))\""] + interval: 30s + timeout: 5s + retries: 3 + start_period: 15s + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - it0-network + # ===== LiveKit Infrastructure ===== # NOTE: livekit-server, voice-agent, voice-service use host networking # to eliminate docker-proxy overhead for real-time audio (WebRTC UDP). diff --git a/it0_app/lib/core/router/app_router.dart b/it0_app/lib/core/router/app_router.dart index b6c843c..584d30d 100644 --- a/it0_app/lib/core/router/app_router.dart +++ b/it0_app/lib/core/router/app_router.dart @@ -6,6 +6,8 @@ import '../updater/update_service.dart'; import '../network/connectivity_provider.dart'; import '../widgets/offline_banner.dart'; import '../widgets/floating_robot_fab.dart'; +import '../config/app_config.dart'; +import '../telemetry/telemetry.dart'; import '../../features/auth/data/providers/auth_provider.dart'; import '../../features/auth/presentation/pages/login_page.dart'; import '../../features/home/presentation/pages/home_page.dart'; @@ -198,6 +200,12 @@ class _SplashPageState extends ConsumerState<_SplashPage> { } Future _tryRestore() async { + final config = ref.read(appConfigProvider); + await TelemetryService().initialize( + apiBaseUrl: config.apiBaseUrl, + context: context, + ); + final auth = ref.read(authStateProvider.notifier); final restored = await auth.tryRestoreSession(); if (!mounted) return; diff --git a/it0_app/lib/core/telemetry/collectors/device_info_collector.dart b/it0_app/lib/core/telemetry/collectors/device_info_collector.dart new file mode 100644 index 0000000..ef24560 --- /dev/null +++ b/it0_app/lib/core/telemetry/collectors/device_info_collector.dart @@ -0,0 +1,102 @@ +import 'dart:io'; +import 'package:device_info_plus/device_info_plus.dart'; +import 'package:package_info_plus/package_info_plus.dart'; +import 'package:flutter/material.dart'; +import 'package:flutter/foundation.dart'; +import '../models/device_context.dart'; + +class DeviceInfoCollector { + static DeviceInfoCollector? _instance; + DeviceInfoCollector._(); + factory DeviceInfoCollector() { + _instance ??= DeviceInfoCollector._(); + return _instance!; + } + + DeviceContext? _cachedContext; + + Future collect(BuildContext context) async { + if (_cachedContext != null) return _cachedContext!; + + final deviceInfo = DeviceInfoPlugin(); + final packageInfo = await PackageInfo.fromPlatform(); + final mediaQuery = MediaQuery.of(context); + + DeviceContext result; + + if (Platform.isAndroid) { + final androidInfo = await deviceInfo.androidInfo; + result = DeviceContext( + platform: 'android', + brand: androidInfo.brand, + model: androidInfo.model, + manufacturer: androidInfo.manufacturer, + isPhysicalDevice: androidInfo.isPhysicalDevice, + osVersion: androidInfo.version.release, + sdkInt: androidInfo.version.sdkInt, + androidId: androidInfo.id, + screen: _collectScreenInfo(mediaQuery), + appName: packageInfo.appName, + packageName: packageInfo.packageName, + appVersion: packageInfo.version, + buildNumber: packageInfo.buildNumber, + buildMode: _getBuildMode(), + locale: Platform.localeName, + timezone: DateTime.now().timeZoneName, + isDarkMode: mediaQuery.platformBrightness == Brightness.dark, + networkType: 'unknown', + collectedAt: DateTime.now(), + ); + } else if (Platform.isIOS) { + final iosInfo = await deviceInfo.iosInfo; + result = DeviceContext( + platform: 'ios', + brand: 'Apple', + model: iosInfo.model, + manufacturer: 'Apple', + isPhysicalDevice: iosInfo.isPhysicalDevice, + osVersion: iosInfo.systemVersion, + sdkInt: 0, + androidId: iosInfo.identifierForVendor ?? '', + screen: _collectScreenInfo(mediaQuery), + appName: packageInfo.appName, + packageName: packageInfo.packageName, + appVersion: packageInfo.version, + buildNumber: packageInfo.buildNumber, + buildMode: _getBuildMode(), + locale: Platform.localeName, + timezone: DateTime.now().timeZoneName, + isDarkMode: mediaQuery.platformBrightness == Brightness.dark, + networkType: 'unknown', + collectedAt: DateTime.now(), + ); + } else { + throw UnsupportedError('Unsupported platform'); + } + + _cachedContext = result; + return result; + } + + ScreenInfo _collectScreenInfo(MediaQueryData mediaQuery) { + final size = mediaQuery.size; + final density = mediaQuery.devicePixelRatio; + return ScreenInfo( + widthPx: size.width * density, + heightPx: size.height * density, + density: density, + widthDp: size.width, + heightDp: size.height, + hasNotch: mediaQuery.padding.top > 24, + ); + } + + String _getBuildMode() { + if (kReleaseMode) return 'release'; + if (kProfileMode) return 'profile'; + return 'debug'; + } + + void clearCache() => _cachedContext = null; + DeviceContext? get cachedContext => _cachedContext; +} diff --git a/it0_app/lib/core/telemetry/models/device_context.dart b/it0_app/lib/core/telemetry/models/device_context.dart new file mode 100644 index 0000000..3afa37a --- /dev/null +++ b/it0_app/lib/core/telemetry/models/device_context.dart @@ -0,0 +1,135 @@ +import 'package:equatable/equatable.dart'; + +class ScreenInfo extends Equatable { + final double widthPx; + final double heightPx; + final double density; + final double widthDp; + final double heightDp; + final bool hasNotch; + + const ScreenInfo({ + required this.widthPx, + required this.heightPx, + required this.density, + required this.widthDp, + required this.heightDp, + required this.hasNotch, + }); + + factory ScreenInfo.fromJson(Map json) { + return ScreenInfo( + widthPx: (json['widthPx'] as num).toDouble(), + heightPx: (json['heightPx'] as num).toDouble(), + density: (json['density'] as num).toDouble(), + widthDp: (json['widthDp'] as num).toDouble(), + heightDp: (json['heightDp'] as num).toDouble(), + hasNotch: json['hasNotch'] as bool? ?? false, + ); + } + + Map toJson() => { + 'widthPx': widthPx, + 'heightPx': heightPx, + 'density': density, + 'widthDp': widthDp, + 'heightDp': heightDp, + 'hasNotch': hasNotch, + }; + + @override + List get props => [widthPx, heightPx, density, widthDp, heightDp, hasNotch]; +} + +class DeviceContext extends Equatable { + final String platform; + final String brand; + final String model; + final String manufacturer; + final bool isPhysicalDevice; + final String osVersion; + final int sdkInt; + final String androidId; + final ScreenInfo screen; + final String appName; + final String packageName; + final String appVersion; + final String buildNumber; + final String buildMode; + final String locale; + final String timezone; + final bool isDarkMode; + final String networkType; + final DateTime collectedAt; + + const DeviceContext({ + required this.platform, + required this.brand, + required this.model, + required this.manufacturer, + required this.isPhysicalDevice, + required this.osVersion, + required this.sdkInt, + required this.androidId, + required this.screen, + required this.appName, + required this.packageName, + required this.appVersion, + required this.buildNumber, + required this.buildMode, + required this.locale, + required this.timezone, + required this.isDarkMode, + required this.networkType, + required this.collectedAt, + }); + + factory DeviceContext.fromJson(Map json) { + return DeviceContext( + platform: json['platform'] as String, + brand: json['brand'] as String, + model: json['model'] as String, + manufacturer: json['manufacturer'] as String, + isPhysicalDevice: json['isPhysicalDevice'] as bool, + osVersion: json['osVersion'] as String, + sdkInt: json['sdkInt'] as int, + androidId: json['androidId'] as String, + screen: ScreenInfo.fromJson(json['screen'] as Map), + appName: json['appName'] as String, + packageName: json['packageName'] as String, + appVersion: json['appVersion'] as String, + buildNumber: json['buildNumber'] as String, + buildMode: json['buildMode'] as String, + locale: json['locale'] as String, + timezone: json['timezone'] as String, + isDarkMode: json['isDarkMode'] as bool, + networkType: json['networkType'] as String, + collectedAt: DateTime.parse(json['collectedAt'] as String), + ); + } + + Map toJson() => { + 'platform': platform, + 'brand': brand, + 'model': model, + 'manufacturer': manufacturer, + 'isPhysicalDevice': isPhysicalDevice, + 'osVersion': osVersion, + 'sdkInt': sdkInt, + 'androidId': androidId, + 'screen': screen.toJson(), + 'appName': appName, + 'packageName': packageName, + 'appVersion': appVersion, + 'buildNumber': buildNumber, + 'buildMode': buildMode, + 'locale': locale, + 'timezone': timezone, + 'isDarkMode': isDarkMode, + 'networkType': networkType, + 'collectedAt': collectedAt.toIso8601String(), + }; + + @override + List get props => [platform, brand, model, manufacturer, isPhysicalDevice, osVersion, sdkInt, androidId, screen, appName, packageName, appVersion, buildNumber, buildMode, locale, timezone, isDarkMode, networkType, collectedAt]; +} diff --git a/it0_app/lib/core/telemetry/models/telemetry_config.dart b/it0_app/lib/core/telemetry/models/telemetry_config.dart new file mode 100644 index 0000000..eb4c720 --- /dev/null +++ b/it0_app/lib/core/telemetry/models/telemetry_config.dart @@ -0,0 +1,114 @@ +import 'package:dio/dio.dart'; +import 'package:shared_preferences/shared_preferences.dart'; +import 'package:flutter/foundation.dart'; +import 'telemetry_event.dart'; +import '../presence/presence_config.dart'; + +class TelemetryConfig { + bool globalEnabled = true; + bool errorReportEnabled = true; + bool performanceEnabled = true; + bool userActionEnabled = true; + bool pageViewEnabled = true; + bool sessionEnabled = true; + double samplingRate = 0.1; + List disabledEvents = []; + String configVersion = '1.0.0'; + bool userOptIn = true; + PresenceConfig? presenceConfig; + + static final TelemetryConfig _instance = TelemetryConfig._(); + TelemetryConfig._(); + factory TelemetryConfig() => _instance; + + Future syncFromRemote(String apiBaseUrl) async { + try { + final dio = Dio(BaseOptions( + connectTimeout: const Duration(seconds: 5), + receiveTimeout: const Duration(seconds: 5), + )); + final response = await dio.get('$apiBaseUrl/telemetry/config'); + final data = response.data as Map; + + globalEnabled = data['global_enabled'] ?? true; + errorReportEnabled = data['error_report_enabled'] ?? true; + performanceEnabled = data['performance_enabled'] ?? true; + userActionEnabled = data['user_action_enabled'] ?? true; + pageViewEnabled = data['page_view_enabled'] ?? true; + sessionEnabled = data['session_enabled'] ?? true; + samplingRate = (data['sampling_rate'] ?? 0.1).toDouble(); + disabledEvents = List.from(data['disabled_events'] ?? []); + configVersion = data['version'] ?? '1.0.0'; + + if (data['presence_config'] != null) { + presenceConfig = PresenceConfig.fromJson(data['presence_config']); + } + + await _saveToLocal(); + } catch (e) { + debugPrint('[TelemetryConfig] Remote sync failed (non-blocking): $e'); + await _loadFromLocal(); + } + } + + Future _saveToLocal() async { + final prefs = await SharedPreferences.getInstance(); + await prefs.setBool('telemetry_global_enabled', globalEnabled); + await prefs.setBool('telemetry_error_enabled', errorReportEnabled); + await prefs.setBool('telemetry_performance_enabled', performanceEnabled); + await prefs.setBool('telemetry_user_action_enabled', userActionEnabled); + await prefs.setBool('telemetry_page_view_enabled', pageViewEnabled); + await prefs.setBool('telemetry_session_enabled', sessionEnabled); + await prefs.setDouble('telemetry_sampling_rate', samplingRate); + await prefs.setStringList('telemetry_disabled_events', disabledEvents); + await prefs.setString('telemetry_config_version', configVersion); + } + + Future _loadFromLocal() async { + final prefs = await SharedPreferences.getInstance(); + globalEnabled = prefs.getBool('telemetry_global_enabled') ?? true; + errorReportEnabled = prefs.getBool('telemetry_error_enabled') ?? true; + performanceEnabled = prefs.getBool('telemetry_performance_enabled') ?? true; + userActionEnabled = prefs.getBool('telemetry_user_action_enabled') ?? true; + pageViewEnabled = prefs.getBool('telemetry_page_view_enabled') ?? true; + sessionEnabled = prefs.getBool('telemetry_session_enabled') ?? true; + samplingRate = prefs.getDouble('telemetry_sampling_rate') ?? 0.1; + disabledEvents = prefs.getStringList('telemetry_disabled_events') ?? []; + configVersion = prefs.getString('telemetry_config_version') ?? '1.0.0'; + } + + bool shouldLog(EventType type, String eventName) { + if (!globalEnabled) return false; + if (!userOptIn) return false; + if (disabledEvents.contains(eventName)) return false; + + switch (type) { + case EventType.error: + case EventType.crash: + return errorReportEnabled; + case EventType.performance: + return performanceEnabled; + case EventType.userAction: + return userActionEnabled; + case EventType.pageView: + return pageViewEnabled; + case EventType.apiCall: + return performanceEnabled; + case EventType.session: + return sessionEnabled; + case EventType.presence: + return presenceConfig?.enabled ?? true; + } + } + + Future setUserOptIn(bool optIn) async { + userOptIn = optIn; + final prefs = await SharedPreferences.getInstance(); + await prefs.setBool('telemetry_user_opt_in', optIn); + } + + Future loadUserOptIn() async { + final prefs = await SharedPreferences.getInstance(); + userOptIn = prefs.getBool('telemetry_user_opt_in') ?? true; + } +} diff --git a/it0_app/lib/core/telemetry/models/telemetry_event.dart b/it0_app/lib/core/telemetry/models/telemetry_event.dart new file mode 100644 index 0000000..dc0b946 --- /dev/null +++ b/it0_app/lib/core/telemetry/models/telemetry_event.dart @@ -0,0 +1,109 @@ +import 'package:equatable/equatable.dart'; + +enum EventLevel { debug, info, warning, error, fatal } + +enum EventType { + pageView, + userAction, + apiCall, + performance, + error, + crash, + session, + presence, +} + +class TelemetryEvent extends Equatable { + final String eventId; + final EventType type; + final EventLevel level; + final String name; + final Map? properties; + final DateTime timestamp; + final String? userId; + final String? sessionId; + final String installId; + final String deviceContextId; + + const TelemetryEvent({ + required this.eventId, + required this.type, + required this.level, + required this.name, + this.properties, + required this.timestamp, + this.userId, + this.sessionId, + required this.installId, + required this.deviceContextId, + }); + + factory TelemetryEvent.fromJson(Map json) { + return TelemetryEvent( + eventId: json['eventId'] as String, + type: EventType.values.firstWhere( + (e) => e.name == json['type'], + orElse: () => EventType.userAction, + ), + level: EventLevel.values.firstWhere( + (e) => e.name == json['level'], + orElse: () => EventLevel.info, + ), + name: json['name'] as String, + properties: json['properties'] as Map?, + timestamp: DateTime.parse(json['timestamp'] as String), + userId: json['userId'] as String?, + sessionId: json['sessionId'] as String?, + installId: json['installId'] as String, + deviceContextId: json['deviceContextId'] as String, + ); + } + + Map toJson() { + return { + 'eventId': eventId, + 'type': type.name, + 'level': level.name, + 'name': name, + 'properties': properties, + 'timestamp': timestamp.toIso8601String(), + 'userId': userId, + 'sessionId': sessionId, + 'installId': installId, + 'deviceContextId': deviceContextId, + }; + } + + /// 转换为服务端 API 格式(Amplitude 风格:设备字段提升为顶层独立列) + Map toServerJson() { + final props = Map.from(properties ?? {}); + final deviceBrand = props.remove('device_brand'); + final deviceModel = props.remove('device_model'); + final deviceOs = props.remove('device_os'); + final appVersion = props.remove('app_version'); + final locale = props.remove('locale'); + + return { + 'eventName': name, + 'userId': userId, + 'installId': installId, + 'clientTs': timestamp.millisecondsSinceEpoch ~/ 1000, + 'deviceBrand': deviceBrand, + 'deviceModel': deviceModel, + 'deviceOs': deviceOs, + 'appVersion': appVersion, + 'locale': locale, + 'properties': { + ...props, + 'eventId': eventId, + 'type': type.name, + 'level': level.name, + 'sessionId': sessionId, + 'deviceContextId': deviceContextId, + }, + }; + } + + @override + List get props => [eventId, type, level, name, properties, timestamp, userId, sessionId, installId, deviceContextId]; +} diff --git a/it0_app/lib/core/telemetry/presence/heartbeat_service.dart b/it0_app/lib/core/telemetry/presence/heartbeat_service.dart new file mode 100644 index 0000000..6fe3f04 --- /dev/null +++ b/it0_app/lib/core/telemetry/presence/heartbeat_service.dart @@ -0,0 +1,140 @@ +import 'dart:async'; +import 'package:flutter/foundation.dart'; +import 'package:dio/dio.dart'; +import '../session/session_manager.dart'; +import '../session/session_events.dart'; +import 'presence_config.dart'; + +class HeartbeatService { + static HeartbeatService? _instance; + HeartbeatService._(); + factory HeartbeatService() { + _instance ??= HeartbeatService._(); + return _instance!; + } + + PresenceConfig _config = PresenceConfig.defaultConfig; + Timer? _heartbeatTimer; + bool _isRunning = false; + bool get isRunning => _isRunning; + + DateTime? _lastHeartbeatAt; + DateTime? get lastHeartbeatAt => _lastHeartbeatAt; + + int _heartbeatCount = 0; + int get heartbeatCount => _heartbeatCount; + + String Function()? getInstallId; + String? Function()? getUserId; + String Function()? getAppVersion; + Map Function()? getAuthHeaders; + + late Dio _dio; + bool _isInitialized = false; + + void initialize({ + required String apiBaseUrl, + PresenceConfig? config, + required String Function() getInstallId, + required String? Function() getUserId, + required String Function() getAppVersion, + Map Function()? getAuthHeaders, + }) { + if (_isInitialized) return; + + _config = config ?? PresenceConfig.defaultConfig; + this.getInstallId = getInstallId; + this.getUserId = getUserId; + this.getAppVersion = getAppVersion; + this.getAuthHeaders = getAuthHeaders; + + _dio = Dio(BaseOptions( + baseUrl: apiBaseUrl, + connectTimeout: const Duration(seconds: 5), + receiveTimeout: const Duration(seconds: 5), + )); + + final sessionManager = SessionManager(); + sessionManager.onSessionStart = _onSessionStart; + sessionManager.onSessionEnd = _onSessionEnd; + + if (sessionManager.state == SessionState.foreground) { + _startHeartbeat(); + } + + _isInitialized = true; + debugPrint('[Heartbeat] Initialized, interval: ${_config.heartbeatIntervalSeconds}s'); + } + + void updateConfig(PresenceConfig config) { + final wasRunning = _isRunning; + if (wasRunning) _stopHeartbeat(); + _config = config; + if (wasRunning && _config.enabled) _startHeartbeat(); + } + + void dispose() { + _stopHeartbeat(); + _isInitialized = false; + _instance = null; + } + + void _onSessionStart() { + if (_config.enabled) _startHeartbeat(); + } + + void _onSessionEnd() { + _stopHeartbeat(); + } + + void _startHeartbeat() { + if (_isRunning || !_config.enabled) return; + _isRunning = true; + _heartbeatCount = 0; + _sendHeartbeat(); + _heartbeatTimer = Timer.periodic( + Duration(seconds: _config.heartbeatIntervalSeconds), + (_) => _sendHeartbeat(), + ); + debugPrint('[Heartbeat] Started'); + } + + void _stopHeartbeat() { + _heartbeatTimer?.cancel(); + _heartbeatTimer = null; + _isRunning = false; + debugPrint('[Heartbeat] Stopped (count: $_heartbeatCount)'); + } + + Future _sendHeartbeat() async { + if (_config.requiresAuth && (getUserId?.call() == null)) { + debugPrint('[Heartbeat] Skipped: user not logged in'); + return; + } + + try { + final response = await _dio.post( + '/api/v1/presence/heartbeat', + data: { + 'installId': getInstallId?.call() ?? '', + 'appVersion': getAppVersion?.call() ?? '', + 'clientTs': DateTime.now().millisecondsSinceEpoch ~/ 1000, + }, + options: Options(headers: getAuthHeaders?.call()), + ); + + if (response.statusCode == 200) { + _lastHeartbeatAt = DateTime.now(); + _heartbeatCount++; + debugPrint('[Heartbeat] Sent #$_heartbeatCount'); + } + } on DioException catch (e) { + debugPrint('[Heartbeat] Failed: ${e.message}'); + } catch (e) { + debugPrint('[Heartbeat] Failed: $e'); + } + } + + @visibleForTesting + Future forceHeartbeat() async => _sendHeartbeat(); +} diff --git a/it0_app/lib/core/telemetry/presence/presence_config.dart b/it0_app/lib/core/telemetry/presence/presence_config.dart new file mode 100644 index 0000000..568b522 --- /dev/null +++ b/it0_app/lib/core/telemetry/presence/presence_config.dart @@ -0,0 +1,39 @@ +class PresenceConfig { + final int heartbeatIntervalSeconds; + final bool requiresAuth; + final bool enabled; + + const PresenceConfig({ + this.heartbeatIntervalSeconds = 60, + this.requiresAuth = true, + this.enabled = true, + }); + + static const PresenceConfig defaultConfig = PresenceConfig(); + + factory PresenceConfig.fromJson(Map json) { + return PresenceConfig( + heartbeatIntervalSeconds: json['heartbeat_interval_seconds'] ?? 60, + requiresAuth: json['requires_auth'] ?? true, + enabled: json['presence_enabled'] ?? json['enabled'] ?? true, + ); + } + + Map toJson() => { + 'heartbeat_interval_seconds': heartbeatIntervalSeconds, + 'requires_auth': requiresAuth, + 'presence_enabled': enabled, + }; + + PresenceConfig copyWith({ + int? heartbeatIntervalSeconds, + bool? requiresAuth, + bool? enabled, + }) { + return PresenceConfig( + heartbeatIntervalSeconds: heartbeatIntervalSeconds ?? this.heartbeatIntervalSeconds, + requiresAuth: requiresAuth ?? this.requiresAuth, + enabled: enabled ?? this.enabled, + ); + } +} diff --git a/it0_app/lib/core/telemetry/session/session_events.dart b/it0_app/lib/core/telemetry/session/session_events.dart new file mode 100644 index 0000000..349c35a --- /dev/null +++ b/it0_app/lib/core/telemetry/session/session_events.dart @@ -0,0 +1,9 @@ +class SessionEvents { + static const String sessionStart = 'app_session_start'; + static const String sessionEnd = 'app_session_end'; + static const String heartbeat = 'presence_heartbeat'; + + SessionEvents._(); +} + +enum SessionState { foreground, background, unknown } diff --git a/it0_app/lib/core/telemetry/session/session_manager.dart b/it0_app/lib/core/telemetry/session/session_manager.dart new file mode 100644 index 0000000..6ba6461 --- /dev/null +++ b/it0_app/lib/core/telemetry/session/session_manager.dart @@ -0,0 +1,106 @@ +import 'package:flutter/widgets.dart'; +import 'package:uuid/uuid.dart'; +import '../telemetry_service.dart'; +import '../models/telemetry_event.dart'; +import 'session_events.dart'; + +class SessionManager with WidgetsBindingObserver { + static SessionManager? _instance; + SessionManager._(); + factory SessionManager() { + _instance ??= SessionManager._(); + return _instance!; + } + + String? _currentSessionId; + String? get currentSessionId => _currentSessionId; + + SessionState _state = SessionState.unknown; + SessionState get state => _state; + + DateTime? _sessionStartTime; + + VoidCallback? onSessionStart; + VoidCallback? onSessionEnd; + + TelemetryService? _telemetryService; + bool _isInitialized = false; + + void initialize(TelemetryService telemetryService) { + if (_isInitialized) return; + _telemetryService = telemetryService; + WidgetsBinding.instance.addObserver(this); + _handleForeground(); + _isInitialized = true; + debugPrint('[Session] Manager initialized'); + } + + void dispose() { + WidgetsBinding.instance.removeObserver(this); + _isInitialized = false; + _instance = null; + } + + @override + void didChangeAppLifecycleState(AppLifecycleState state) { + switch (state) { + case AppLifecycleState.resumed: + _handleForeground(); + break; + case AppLifecycleState.paused: + _handleBackground(); + break; + default: + break; + } + } + + void _handleForeground() { + if (_state == SessionState.foreground) return; + _state = SessionState.foreground; + _startNewSession(); + _telemetryService?.flushOnBackground(); + } + + void _handleBackground() { + if (_state == SessionState.background) return; + _state = SessionState.background; + _endCurrentSession(); + _telemetryService?.flushOnBackground(); + } + + void _startNewSession() { + _currentSessionId = const Uuid().v4(); + _sessionStartTime = DateTime.now(); + _telemetryService?.logEvent( + SessionEvents.sessionStart, + type: EventType.session, + level: EventLevel.info, + properties: {'session_id': _currentSessionId}, + ); + onSessionStart?.call(); + debugPrint('[Session] Started: $_currentSessionId'); + } + + void _endCurrentSession() { + if (_currentSessionId == null) return; + final duration = _sessionStartTime != null + ? DateTime.now().difference(_sessionStartTime!).inSeconds + : 0; + _telemetryService?.logEvent( + SessionEvents.sessionEnd, + type: EventType.session, + level: EventLevel.info, + properties: {'session_id': _currentSessionId, 'duration_seconds': duration}, + ); + onSessionEnd?.call(); + debugPrint('[Session] Ended: $_currentSessionId (${duration}s)'); + _currentSessionId = null; + _sessionStartTime = null; + } + + int get sessionDurationSeconds { + if (_sessionStartTime == null) return 0; + return DateTime.now().difference(_sessionStartTime!).inSeconds; + } +} diff --git a/it0_app/lib/core/telemetry/storage/telemetry_storage.dart b/it0_app/lib/core/telemetry/storage/telemetry_storage.dart new file mode 100644 index 0000000..95d8589 --- /dev/null +++ b/it0_app/lib/core/telemetry/storage/telemetry_storage.dart @@ -0,0 +1,88 @@ +import 'dart:convert'; +import 'package:shared_preferences/shared_preferences.dart'; +import 'package:flutter/foundation.dart'; +import '../models/telemetry_event.dart'; + +class TelemetryStorage { + static const String _keyEventQueue = 'telemetry_event_queue'; + static const String _keyDeviceContext = 'telemetry_device_context'; + static const String _keyInstallId = 'telemetry_install_id'; + static const int _maxQueueSize = 500; + + late SharedPreferences _prefs; + bool _isInitialized = false; + + Future init() async { + if (_isInitialized) return; + _prefs = await SharedPreferences.getInstance(); + _isInitialized = true; + } + + Future saveDeviceContext(Map context) async { + await _prefs.setString(_keyDeviceContext, jsonEncode(context)); + } + + Map? getDeviceContext() { + final str = _prefs.getString(_keyDeviceContext); + if (str == null) return null; + return jsonDecode(str) as Map; + } + + Future saveInstallId(String installId) async { + await _prefs.setString(_keyInstallId, installId); + } + + String? getInstallId() => _prefs.getString(_keyInstallId); + + Future enqueueEvent(TelemetryEvent event) async { + final queue = _getEventQueue(); + if (queue.length >= _maxQueueSize) { + queue.removeAt(0); + debugPrint('[TelemetryStorage] Queue full, dropped oldest event'); + } + queue.add(event.toJson()); + await _saveEventQueue(queue); + } + + List dequeueEvents(int limit) { + final queue = _getEventQueue(); + final count = queue.length > limit ? limit : queue.length; + return queue.take(count).map((json) => TelemetryEvent.fromJson(json)).toList(); + } + + Future removeEvents(int count) async { + final queue = _getEventQueue(); + if (count >= queue.length) { + await clearEventQueue(); + } else { + queue.removeRange(0, count); + await _saveEventQueue(queue); + } + } + + int getQueueSize() => _getEventQueue().length; + + Future clearEventQueue() async { + await _prefs.remove(_keyEventQueue); + } + + Future clearUserData() async { + if (!_isInitialized) await init(); + await _prefs.remove(_keyEventQueue); + } + + List> _getEventQueue() { + final str = _prefs.getString(_keyEventQueue); + if (str == null) return []; + try { + return (jsonDecode(str) as List).cast>(); + } catch (e) { + debugPrint('[TelemetryStorage] Failed to parse queue: $e'); + return []; + } + } + + Future _saveEventQueue(List> queue) async { + await _prefs.setString(_keyEventQueue, jsonEncode(queue)); + } +} diff --git a/it0_app/lib/core/telemetry/telemetry.dart b/it0_app/lib/core/telemetry/telemetry.dart new file mode 100644 index 0000000..6037bac --- /dev/null +++ b/it0_app/lib/core/telemetry/telemetry.dart @@ -0,0 +1,8 @@ +export 'telemetry_service.dart'; +export 'models/telemetry_event.dart'; +export 'models/device_context.dart'; +export 'models/telemetry_config.dart'; +export 'presence/presence_config.dart'; +export 'presence/heartbeat_service.dart'; +export 'session/session_events.dart'; +export 'session/session_manager.dart'; diff --git a/it0_app/lib/core/telemetry/telemetry_service.dart b/it0_app/lib/core/telemetry/telemetry_service.dart new file mode 100644 index 0000000..ddc62fa --- /dev/null +++ b/it0_app/lib/core/telemetry/telemetry_service.dart @@ -0,0 +1,224 @@ +import 'dart:async'; +import 'dart:math'; +import 'package:uuid/uuid.dart'; +import 'package:flutter/material.dart'; +import 'models/telemetry_event.dart'; +import 'models/device_context.dart'; +import 'models/telemetry_config.dart'; +import 'collectors/device_info_collector.dart'; +import 'storage/telemetry_storage.dart'; +import 'uploader/telemetry_uploader.dart'; +import 'session/session_manager.dart'; +import 'presence/heartbeat_service.dart'; +import 'presence/presence_config.dart'; + +class TelemetryService { + static TelemetryService? _instance; + TelemetryService._(); + factory TelemetryService() { + _instance ??= TelemetryService._(); + return _instance!; + } + + final _storage = TelemetryStorage(); + late TelemetryUploader _uploader; + + DeviceContext? _deviceContext; + + late String _installId; + String get installId => _installId; + + String? _userId; + String? get userId => _userId; + + String? _accessToken; + + bool _isInitialized = false; + bool get isInitialized => _isInitialized; + + late SessionManager _sessionManager; + late HeartbeatService _heartbeatService; + + Timer? _configSyncTimer; + + Future initialize({ + required String apiBaseUrl, + required BuildContext context, + String? userId, + Duration configSyncInterval = const Duration(hours: 1), + PresenceConfig? presenceConfig, + }) async { + if (_isInitialized) return; + + await _storage.init(); + await _initInstallId(); + await TelemetryConfig().loadUserOptIn(); + + TelemetryConfig().syncFromRemote(apiBaseUrl).catchError((e) { + debugPrint('[Telemetry] Remote config sync failed: $e'); + }); + + _deviceContext = await DeviceInfoCollector().collect(context); + await _storage.saveDeviceContext(_deviceContext!.toJson()); + + _userId = userId; + + _uploader = TelemetryUploader( + apiBaseUrl: apiBaseUrl, + storage: _storage, + getAuthHeaders: _getAuthHeaders, + ); + + if (TelemetryConfig().globalEnabled) { + _uploader.startPeriodicUpload(); + } + + _configSyncTimer = Timer.periodic(configSyncInterval, (_) async { + await TelemetryConfig().syncFromRemote(apiBaseUrl); + if (TelemetryConfig().globalEnabled) { + _uploader.startPeriodicUpload(); + } else { + _uploader.stopPeriodicUpload(); + } + if (TelemetryConfig().presenceConfig != null) { + _heartbeatService.updateConfig(TelemetryConfig().presenceConfig!); + } + }); + + _sessionManager = SessionManager(); + _sessionManager.initialize(this); + + _heartbeatService = HeartbeatService(); + _heartbeatService.initialize( + apiBaseUrl: apiBaseUrl, + config: presenceConfig ?? TelemetryConfig().presenceConfig, + getInstallId: () => _installId, + getUserId: () => _userId, + getAppVersion: () => _deviceContext?.appVersion ?? 'unknown', + getAuthHeaders: _getAuthHeaders, + ); + + _isInitialized = true; + debugPrint('[Telemetry] Initialized | installId: $_installId | userId: $_userId'); + } + + Future _initInstallId() async { + final storedId = _storage.getInstallId(); + if (storedId != null) { + _installId = storedId; + } else { + _installId = const Uuid().v4(); + await _storage.saveInstallId(_installId); + } + } + + Map _getAuthHeaders() { + if (_accessToken == null) return {}; + return {'Authorization': 'Bearer $_accessToken'}; + } + + void setAccessToken(String? token) => _accessToken = token; + void clearAccessToken() => _accessToken = null; + + void setUserId(String? userId) { + _userId = userId; + debugPrint('[Telemetry] userId set: $userId'); + } + + void clearUserId() { + _userId = null; + debugPrint('[Telemetry] userId cleared'); + } + + void logEvent( + String eventName, { + EventType type = EventType.userAction, + EventLevel level = EventLevel.info, + Map? properties, + }) { + if (!_isInitialized) return; + if (!TelemetryConfig().shouldLog(type, eventName)) return; + + if (_needsSampling(type)) { + if (Random().nextDouble() > TelemetryConfig().samplingRate) return; + } + + final deviceProps = _deviceContext != null + ? { + 'device_brand': _deviceContext!.brand, + 'device_model': _deviceContext!.model, + 'device_os': _deviceContext!.osVersion, + 'app_version': _deviceContext!.appVersion, + 'locale': _deviceContext!.locale, + } + : {}; + + final event = TelemetryEvent( + eventId: const Uuid().v4(), + type: type, + level: level, + name: eventName, + properties: {...deviceProps, ...?properties}, + timestamp: DateTime.now(), + userId: _userId, + sessionId: _sessionManager.currentSessionId, + installId: _installId, + deviceContextId: _deviceContext?.androidId ?? '', + ); + + _storage.enqueueEvent(event); + _uploader.uploadIfNeeded(); + } + + bool _needsSampling(EventType type) => + type != EventType.error && type != EventType.crash && type != EventType.session; + + void logPageView(String pageName, {Map? extra}) { + logEvent('page_view', type: EventType.pageView, properties: {'page': pageName, ...?extra}); + } + + void logUserAction(String action, {Map? properties}) { + logEvent(action, type: EventType.userAction, properties: properties); + } + + void logError(String errorMessage, {Object? error, StackTrace? stackTrace, Map? extra}) { + logEvent('error_occurred', type: EventType.error, level: EventLevel.error, properties: { + 'message': errorMessage, + 'error': error?.toString(), + 'stack_trace': stackTrace?.toString(), + ...?extra, + }); + } + + Future flushOnBackground() async { + await _uploader.uploadBatch(batchSize: 50); + } + + Future pauseForLogout() async { + _uploader.stopPeriodicUpload(); + await _storage.clearEventQueue(); + _userId = null; + } + + void resumeAfterLogin() { + if (TelemetryConfig().globalEnabled) { + _uploader.startPeriodicUpload(); + } + } + + String? get currentSessionId => _sessionManager.currentSessionId; + int get sessionDurationSeconds => _sessionManager.sessionDurationSeconds; + bool get isHeartbeatRunning => _heartbeatService.isRunning; + int get heartbeatCount => _heartbeatService.heartbeatCount; + DeviceContext? get deviceContext => _deviceContext; + + Future dispose() async { + _configSyncTimer?.cancel(); + _sessionManager.dispose(); + _heartbeatService.dispose(); + await _uploader.forceUploadAll(); + _isInitialized = false; + } + + static void reset() => _instance = null; +} diff --git a/it0_app/lib/core/telemetry/uploader/telemetry_uploader.dart b/it0_app/lib/core/telemetry/uploader/telemetry_uploader.dart new file mode 100644 index 0000000..5158d5b --- /dev/null +++ b/it0_app/lib/core/telemetry/uploader/telemetry_uploader.dart @@ -0,0 +1,88 @@ +import 'dart:async'; +import 'package:dio/dio.dart'; +import 'package:flutter/foundation.dart'; +import '../storage/telemetry_storage.dart'; + +class TelemetryUploader { + final String apiBaseUrl; + final TelemetryStorage storage; + final Dio _dio; + + Timer? _uploadTimer; + bool _isUploading = false; + + Map Function()? getAuthHeaders; + + TelemetryUploader({ + required this.apiBaseUrl, + required this.storage, + this.getAuthHeaders, + }) : _dio = Dio(BaseOptions( + baseUrl: apiBaseUrl, + connectTimeout: const Duration(seconds: 10), + receiveTimeout: const Duration(seconds: 10), + )); + + void startPeriodicUpload({ + Duration interval = const Duration(seconds: 30), + int batchSize = 20, + }) { + _uploadTimer?.cancel(); + _uploadTimer = Timer.periodic(interval, (_) { + uploadIfNeeded(batchSize: batchSize); + }); + } + + void stopPeriodicUpload() { + _uploadTimer?.cancel(); + _uploadTimer = null; + } + + Future uploadIfNeeded({int batchSize = 20}) async { + if (_isUploading) return; + if (storage.getQueueSize() < 10) return; + await uploadBatch(batchSize: batchSize); + } + + Future uploadBatch({int batchSize = 20}) async { + if (_isUploading) return false; + _isUploading = true; + try { + final events = storage.dequeueEvents(batchSize); + if (events.isEmpty) return true; + + final response = await _dio.post( + '/api/v1/analytics/events', + data: {'events': events.map((e) => e.toServerJson()).toList()}, + options: Options(headers: getAuthHeaders?.call()), + ); + + if (response.statusCode == 200) { + await storage.removeEvents(events.length); + debugPrint('[TelemetryUploader] Uploaded ${events.length} events'); + return true; + } + return false; + } on DioException catch (e) { + debugPrint('[TelemetryUploader] Upload error: ${e.message}'); + return false; + } catch (e) { + debugPrint('[TelemetryUploader] Upload error: $e'); + return false; + } finally { + _isUploading = false; + } + } + + Future forceUploadAll() async { + stopPeriodicUpload(); + int retries = 0; + while (storage.getQueueSize() > 0 && retries < 3) { + final success = await uploadBatch(batchSize: 50); + if (!success) { + retries++; + await Future.delayed(const Duration(seconds: 1)); + } + } + } +} diff --git a/it0_app/lib/features/auth/data/providers/auth_provider.dart b/it0_app/lib/features/auth/data/providers/auth_provider.dart index a6ed645..3a7a0d5 100644 --- a/it0_app/lib/features/auth/data/providers/auth_provider.dart +++ b/it0_app/lib/features/auth/data/providers/auth_provider.dart @@ -6,6 +6,7 @@ import 'package:flutter_secure_storage/flutter_secure_storage.dart'; import '../../../../core/config/api_endpoints.dart'; import '../../../../core/config/app_config.dart'; import '../../../../core/errors/error_handler.dart'; +import '../../../../core/telemetry/telemetry.dart'; import '../../../notifications/presentation/providers/notification_providers.dart'; import '../models/auth_response.dart'; import 'tenant_provider.dart'; @@ -95,6 +96,11 @@ class AuthNotifier extends StateNotifier { // Reconnect notifications _connectNotifications(user.tenantId); + // Restore telemetry session + TelemetryService().setUserId(user.id); + TelemetryService().setAccessToken(accessToken); + TelemetryService().resumeAfterLogin(); + return true; } @@ -162,6 +168,9 @@ class AuthNotifier extends StateNotifier { _ref.read(currentTenantIdProvider.notifier).state = authResponse.user.tenantId; } _connectNotifications(authResponse.user.tenantId); + TelemetryService().setUserId(authResponse.user.id); + TelemetryService().setAccessToken(authResponse.accessToken); + TelemetryService().resumeAfterLogin(); return true; } on DioException catch (e) { final message = (e.response?.data is Map) ? e.response?.data['message'] : null; @@ -211,6 +220,10 @@ class AuthNotifier extends StateNotifier { // Connect notification service after login _connectNotifications(authResponse.user.tenantId); + TelemetryService().setUserId(authResponse.user.id); + TelemetryService().setAccessToken(authResponse.accessToken); + TelemetryService().resumeAfterLogin(); + return true; } on DioException catch (e) { final message = @@ -238,6 +251,11 @@ class AuthNotifier extends StateNotifier { // Clear tenant context _ref.read(currentTenantIdProvider.notifier).state = null; + // Pause telemetry before clearing tokens + await TelemetryService().pauseForLogout(); + TelemetryService().clearUserId(); + TelemetryService().clearAccessToken(); + final storage = _ref.read(secureStorageProvider); await storage.delete(key: _keyAccessToken); await storage.delete(key: _keyRefreshToken); diff --git a/it0_app/pubspec.yaml b/it0_app/pubspec.yaml index f55f896..5b9e29f 100644 --- a/it0_app/pubspec.yaml +++ b/it0_app/pubspec.yaml @@ -69,6 +69,10 @@ dependencies: package_info_plus: ^8.0.0 crypto: ^3.0.3 + # Telemetry + device_info_plus: ^10.1.0 + equatable: ^2.0.5 + dev_dependencies: flutter_test: sdk: flutter diff --git a/packages/gateway/config/kong.yml b/packages/gateway/config/kong.yml index a369bed..b9cc9d9 100644 --- a/packages/gateway/config/kong.yml +++ b/packages/gateway/config/kong.yml @@ -141,6 +141,18 @@ services: - /api/v1/billing/webhooks strip_path: false + - name: presence-service + url: http://presence-service:3011 + routes: + - name: presence-routes + paths: + - /api/v1/presence + strip_path: false + - name: analytics-routes + paths: + - /api/v1/analytics + strip_path: false + plugins: # ===== Global plugins (apply to ALL routes) ===== - name: cors @@ -246,6 +258,13 @@ plugins: claims_to_verify: - exp + - name: jwt + service: presence-service + config: + key_claim_name: kid + claims_to_verify: + - exp + - name: jwt route: admin-routes config: diff --git a/packages/services/presence-service/nest-cli.json b/packages/services/presence-service/nest-cli.json new file mode 100644 index 0000000..f9aa683 --- /dev/null +++ b/packages/services/presence-service/nest-cli.json @@ -0,0 +1,8 @@ +{ + "$schema": "https://json.schemastore.org/nest-cli", + "collection": "@nestjs/schematics", + "sourceRoot": "src", + "compilerOptions": { + "deleteOutDir": true + } +} diff --git a/packages/services/presence-service/package.json b/packages/services/presence-service/package.json new file mode 100644 index 0000000..6b027c4 --- /dev/null +++ b/packages/services/presence-service/package.json @@ -0,0 +1,38 @@ +{ + "name": "@it0/presence-service", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "nest build", + "dev": "nest start --watch", + "start": "node dist/main", + "prisma:generate": "prisma generate", + "prisma:migrate": "prisma migrate deploy" + }, + "dependencies": { + "@nestjs/common": "^10.3.0", + "@nestjs/core": "^10.3.0", + "@nestjs/config": "^3.2.0", + "@nestjs/cqrs": "^10.2.7", + "@nestjs/jwt": "^10.2.0", + "@nestjs/platform-express": "^10.3.0", + "@nestjs/schedule": "^4.0.1", + "@nestjs/swagger": "^7.3.0", + "@prisma/client": "^5.9.0", + "class-transformer": "^0.5.1", + "class-validator": "^0.14.1", + "date-fns": "^2.30.0", + "date-fns-tz": "^2.0.1", + "ioredis": "^5.3.0", + "reflect-metadata": "^0.2.0", + "rxjs": "^7.8.0", + "uuid": "^9.0.0" + }, + "devDependencies": { + "@nestjs/cli": "^10.3.0", + "@types/express": "^4.17.21", + "@types/node": "^20.11.0", + "prisma": "^5.9.0", + "typescript": "^5.4.0" + } +} diff --git a/packages/services/presence-service/prisma/migrations/20250307000000_init/migration.sql b/packages/services/presence-service/prisma/migrations/20250307000000_init/migration.sql new file mode 100644 index 0000000..87df650 --- /dev/null +++ b/packages/services/presence-service/prisma/migrations/20250307000000_init/migration.sql @@ -0,0 +1,91 @@ +-- CreateTable +CREATE TABLE "presence_event_log" ( + "id" BIGSERIAL NOT NULL, + "user_id" VARCHAR(36), + "install_id" VARCHAR(64) NOT NULL, + "event_name" VARCHAR(64) NOT NULL, + "event_time" TIMESTAMPTZ NOT NULL, + "device_brand" VARCHAR(64), + "device_model" VARCHAR(64), + "device_os" VARCHAR(32), + "app_version" VARCHAR(32), + "locale" VARCHAR(16), + "properties" JSONB, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "presence_event_log_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "presence_device_profile" ( + "install_id" VARCHAR(64) NOT NULL, + "user_id" VARCHAR(36), + "device_brand" VARCHAR(64), + "device_model" VARCHAR(64), + "device_os" VARCHAR(32), + "app_version" VARCHAR(32), + "locale" VARCHAR(16), + "first_seen_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + "last_seen_at" TIMESTAMPTZ NOT NULL, + "event_count" INTEGER NOT NULL DEFAULT 1, + + CONSTRAINT "presence_device_profile_pkey" PRIMARY KEY ("install_id") +); + +-- CreateTable +CREATE TABLE "presence_daily_active_users" ( + "day" DATE NOT NULL, + "dau_count" INTEGER NOT NULL, + "dau_by_province" JSONB, + "dau_by_city" JSONB, + "calculated_at" TIMESTAMPTZ NOT NULL, + "version" INTEGER NOT NULL DEFAULT 1, + + CONSTRAINT "presence_daily_active_users_pkey" PRIMARY KEY ("day") +); + +-- CreateTable +CREATE TABLE "presence_online_snapshots" ( + "id" BIGSERIAL NOT NULL, + "ts" TIMESTAMPTZ NOT NULL, + "online_count" INTEGER NOT NULL, + "window_seconds" INTEGER NOT NULL DEFAULT 300, + + CONSTRAINT "presence_online_snapshots_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "idx_presence_event_log_event_time" ON "presence_event_log"("event_time"); + +-- CreateIndex +CREATE INDEX "idx_presence_event_log_event_name" ON "presence_event_log"("event_name"); + +-- CreateIndex +CREATE INDEX "idx_presence_event_log_event_name_time" ON "presence_event_log"("event_name", "event_time"); + +-- CreateIndex +CREATE INDEX "idx_presence_event_log_user_id" ON "presence_event_log"("user_id"); + +-- CreateIndex +CREATE INDEX "idx_presence_event_log_device_brand" ON "presence_event_log"("device_brand"); + +-- CreateIndex +CREATE INDEX "idx_presence_event_log_app_version" ON "presence_event_log"("app_version"); + +-- CreateIndex +CREATE INDEX "idx_presence_device_profile_brand" ON "presence_device_profile"("device_brand"); + +-- CreateIndex +CREATE INDEX "idx_presence_device_profile_app_version" ON "presence_device_profile"("app_version"); + +-- CreateIndex +CREATE INDEX "idx_presence_device_profile_user_id" ON "presence_device_profile"("user_id"); + +-- CreateIndex +CREATE INDEX "idx_presence_device_profile_last_seen" ON "presence_device_profile"("last_seen_at" DESC); + +-- CreateIndex +CREATE UNIQUE INDEX "presence_online_snapshots_ts_key" ON "presence_online_snapshots"("ts"); + +-- CreateIndex +CREATE INDEX "idx_presence_online_snapshots_ts" ON "presence_online_snapshots"("ts" DESC); diff --git a/packages/services/presence-service/prisma/migrations/migration_lock.toml b/packages/services/presence-service/prisma/migrations/migration_lock.toml new file mode 100644 index 0000000..044d57c --- /dev/null +++ b/packages/services/presence-service/prisma/migrations/migration_lock.toml @@ -0,0 +1,3 @@ +# Please do not edit this file manually +# It should be added in your version-control system (e.g., Git) +provider = "postgresql" diff --git a/packages/services/presence-service/prisma/schema.prisma b/packages/services/presence-service/prisma/schema.prisma new file mode 100644 index 0000000..f2df3dc --- /dev/null +++ b/packages/services/presence-service/prisma/schema.prisma @@ -0,0 +1,81 @@ +// ============================================================================= +// Presence Service - Prisma Schema (IT0) +// Table prefix: presence_ to avoid collision with other IT0 services +// userId: VarChar(36) for UUID format +// ============================================================================= + +generator client { + provider = "prisma-client-js" +} + +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +// 事件日志表 (append-only) +model EventLog { + id BigInt @id @default(autoincrement()) + userId String? @map("user_id") @db.VarChar(36) + installId String @map("install_id") @db.VarChar(64) + eventName String @map("event_name") @db.VarChar(64) + eventTime DateTime @map("event_time") @db.Timestamptz() + deviceBrand String? @map("device_brand") @db.VarChar(64) + deviceModel String? @map("device_model") @db.VarChar(64) + deviceOs String? @map("device_os") @db.VarChar(32) + appVersion String? @map("app_version") @db.VarChar(32) + locale String? @map("locale") @db.VarChar(16) + properties Json? @db.JsonB + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz() + + @@index([eventTime], name: "idx_presence_event_log_event_time") + @@index([eventName], name: "idx_presence_event_log_event_name") + @@index([eventName, eventTime], name: "idx_presence_event_log_event_name_time") + @@index([userId], name: "idx_presence_event_log_user_id") + @@index([deviceBrand], name: "idx_presence_event_log_device_brand") + @@index([appVersion], name: "idx_presence_event_log_app_version") + @@map("presence_event_log") +} + +// 设备档案表 (每台设备一行, upsert 更新) +model DeviceProfile { + installId String @id @map("install_id") @db.VarChar(64) + userId String? @map("user_id") @db.VarChar(36) + deviceBrand String? @map("device_brand") @db.VarChar(64) + deviceModel String? @map("device_model") @db.VarChar(64) + deviceOs String? @map("device_os") @db.VarChar(32) + appVersion String? @map("app_version") @db.VarChar(32) + locale String? @map("locale") @db.VarChar(16) + firstSeenAt DateTime @default(now()) @map("first_seen_at") @db.Timestamptz() + lastSeenAt DateTime @updatedAt @map("last_seen_at") @db.Timestamptz() + eventCount Int @default(1) @map("event_count") + + @@index([deviceBrand], name: "idx_presence_device_profile_brand") + @@index([appVersion], name: "idx_presence_device_profile_app_version") + @@index([userId], name: "idx_presence_device_profile_user_id") + @@index([lastSeenAt(sort: Desc)], name: "idx_presence_device_profile_last_seen") + @@map("presence_device_profile") +} + +// 日活统计表 +model DailyActiveStats { + day DateTime @id @map("day") @db.Date + dauCount Int @map("dau_count") + dauByProvince Json? @map("dau_by_province") @db.JsonB + dauByCity Json? @map("dau_by_city") @db.JsonB + calculatedAt DateTime @map("calculated_at") @db.Timestamptz() + version Int @default(1) + + @@map("presence_daily_active_users") +} + +// 在线人数快照表 +model OnlineSnapshot { + id BigInt @id @default(autoincrement()) + ts DateTime @unique @db.Timestamptz() + onlineCount Int @map("online_count") + windowSeconds Int @default(300) @map("window_seconds") + + @@index([ts(sort: Desc)], name: "idx_presence_online_snapshots_ts") + @@map("presence_online_snapshots") +} diff --git a/packages/services/presence-service/src/api/api.module.ts b/packages/services/presence-service/src/api/api.module.ts new file mode 100644 index 0000000..6934f96 --- /dev/null +++ b/packages/services/presence-service/src/api/api.module.ts @@ -0,0 +1,14 @@ +import { Module } from '@nestjs/common'; +import { ApplicationModule } from '../application/application.module'; +import { AnalyticsController } from './controllers/analytics.controller'; +import { PresenceController } from './controllers/presence.controller'; +import { HealthController } from './controllers/health.controller'; +import { JwtAuthGuard } from '../shared/guards/jwt-auth.guard'; +import { AdminGuard } from '../shared/guards/admin.guard'; + +@Module({ + imports: [ApplicationModule], + controllers: [AnalyticsController, PresenceController, HealthController], + providers: [JwtAuthGuard, AdminGuard], +}) +export class ApiModule {} diff --git a/packages/services/presence-service/src/api/controllers/analytics.controller.ts b/packages/services/presence-service/src/api/controllers/analytics.controller.ts new file mode 100644 index 0000000..a3a7fe9 --- /dev/null +++ b/packages/services/presence-service/src/api/controllers/analytics.controller.ts @@ -0,0 +1,35 @@ +import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common'; +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { BatchEventsDto } from '../dto/request/batch-events.dto'; +import { QueryDauDto } from '../dto/request/query-dau.dto'; +import { RecordEventsCommand } from '../../application/commands/record-events/record-events.command'; +import { GetDauStatsQuery } from '../../application/queries/get-dau-stats/get-dau-stats.query'; +import { Public } from '../../shared/decorators/public.decorator'; +import { AdminGuard } from '../../shared/guards/admin.guard'; + +@ApiTags('Analytics') +@Controller('analytics') +export class AnalyticsController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + ) {} + + @Post('events') + @Public() + @ApiOperation({ summary: '批量上报事件(无需认证)' }) + async batchEvents(@Body() dto: BatchEventsDto) { + return this.commandBus.execute(new RecordEventsCommand(dto.events)); + } + + @Get('dau') + @UseGuards(AdminGuard) + @ApiBearerAuth() + @ApiOperation({ summary: 'DAU 统计(管理员)' }) + async getDauStats(@Query() dto: QueryDauDto) { + return this.queryBus.execute( + new GetDauStatsQuery(new Date(dto.startDate), new Date(dto.endDate)), + ); + } +} diff --git a/packages/services/presence-service/src/api/controllers/health.controller.ts b/packages/services/presence-service/src/api/controllers/health.controller.ts new file mode 100644 index 0000000..a3979ed --- /dev/null +++ b/packages/services/presence-service/src/api/controllers/health.controller.ts @@ -0,0 +1,11 @@ +import { Controller, Get } from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; + +@ApiTags('Health') +@Controller() +export class HealthController { + @Get() + health() { + return { status: 'ok', service: 'presence-service', timestamp: new Date().toISOString() }; + } +} diff --git a/packages/services/presence-service/src/api/controllers/presence.controller.ts b/packages/services/presence-service/src/api/controllers/presence.controller.ts new file mode 100644 index 0000000..5178459 --- /dev/null +++ b/packages/services/presence-service/src/api/controllers/presence.controller.ts @@ -0,0 +1,56 @@ +import { Controller, Post, Get, Body, Query, UseGuards } from '@nestjs/common'; +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { HeartbeatDto } from '../dto/request/heartbeat.dto'; +import { QueryOnlineHistoryDto } from '../dto/request/query-online-history.dto'; +import { RecordHeartbeatCommand } from '../../application/commands/record-heartbeat/record-heartbeat.command'; +import { GetOnlineCountQuery } from '../../application/queries/get-online-count/get-online-count.query'; +import { GetOnlineHistoryQuery } from '../../application/queries/get-online-history/get-online-history.query'; +import { JwtAuthGuard } from '../../shared/guards/jwt-auth.guard'; +import { AdminGuard } from '../../shared/guards/admin.guard'; +import { CurrentUser } from '../../shared/decorators/current-user.decorator'; + +@ApiTags('Presence') +@Controller('presence') +export class PresenceController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + ) {} + + @Post('heartbeat') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '心跳上报' }) + async heartbeat( + @CurrentUser('userId') userId: string, + @Body() dto: HeartbeatDto, + ) { + return this.commandBus.execute( + new RecordHeartbeatCommand(userId, dto.installId, dto.appVersion, dto.clientTs), + ); + } + + @Get('online-count') + @UseGuards(AdminGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '实时在线人数(管理员)' }) + async getOnlineCount() { + const result = await this.queryBus.execute(new GetOnlineCountQuery()); + return { + count: result.count, + windowSeconds: result.windowSeconds, + queriedAt: result.queriedAt.toISOString(), + }; + } + + @Get('online-history') + @UseGuards(AdminGuard) + @ApiBearerAuth() + @ApiOperation({ summary: '历史在线人数(管理员)' }) + async getOnlineHistory(@Query() dto: QueryOnlineHistoryDto) { + return this.queryBus.execute( + new GetOnlineHistoryQuery(new Date(dto.startTime), new Date(dto.endTime), dto.interval || '5m'), + ); + } +} diff --git a/packages/services/presence-service/src/api/dto/request/batch-events.dto.ts b/packages/services/presence-service/src/api/dto/request/batch-events.dto.ts new file mode 100644 index 0000000..1ecddb4 --- /dev/null +++ b/packages/services/presence-service/src/api/dto/request/batch-events.dto.ts @@ -0,0 +1,60 @@ +import { IsArray, IsString, IsOptional, IsNumber, ValidateNested, IsObject } from 'class-validator'; +import { Type } from 'class-transformer'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class EventItemDto { + @ApiProperty({ example: 'app_session_start' }) + @IsString() + eventName: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + userId?: string; + + @ApiProperty({ example: 'uuid-v4-xxx' }) + @IsString() + installId: string; + + @ApiProperty({ example: 1732685100 }) + @IsNumber() + clientTs: number; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + deviceBrand?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + deviceModel?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + deviceOs?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + appVersion?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + locale?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsObject() + properties?: Record; +} + +export class BatchEventsDto { + @ApiProperty({ type: [EventItemDto] }) + @IsArray() + @ValidateNested({ each: true }) + @Type(() => EventItemDto) + events: EventItemDto[]; +} diff --git a/packages/services/presence-service/src/api/dto/request/heartbeat.dto.ts b/packages/services/presence-service/src/api/dto/request/heartbeat.dto.ts new file mode 100644 index 0000000..1f52aec --- /dev/null +++ b/packages/services/presence-service/src/api/dto/request/heartbeat.dto.ts @@ -0,0 +1,16 @@ +import { IsString, IsNumber } from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; + +export class HeartbeatDto { + @ApiProperty({ example: 'uuid-v4-xxx' }) + @IsString() + installId: string; + + @ApiProperty({ example: '1.0.0' }) + @IsString() + appVersion: string; + + @ApiProperty({ example: 1732685100 }) + @IsNumber() + clientTs: number; +} diff --git a/packages/services/presence-service/src/api/dto/request/query-dau.dto.ts b/packages/services/presence-service/src/api/dto/request/query-dau.dto.ts new file mode 100644 index 0000000..b6b847f --- /dev/null +++ b/packages/services/presence-service/src/api/dto/request/query-dau.dto.ts @@ -0,0 +1,12 @@ +import { IsDateString } from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; + +export class QueryDauDto { + @ApiProperty({ example: '2026-03-01' }) + @IsDateString() + startDate: string; + + @ApiProperty({ example: '2026-03-07' }) + @IsDateString() + endDate: string; +} diff --git a/packages/services/presence-service/src/api/dto/request/query-online-history.dto.ts b/packages/services/presence-service/src/api/dto/request/query-online-history.dto.ts new file mode 100644 index 0000000..ad6a32b --- /dev/null +++ b/packages/services/presence-service/src/api/dto/request/query-online-history.dto.ts @@ -0,0 +1,17 @@ +import { IsString, IsDateString, IsOptional } from 'class-validator'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class QueryOnlineHistoryDto { + @ApiProperty({ example: '2026-03-01T00:00:00Z' }) + @IsDateString() + startTime: string; + + @ApiProperty({ example: '2026-03-07T23:59:59Z' }) + @IsDateString() + endTime: string; + + @ApiPropertyOptional({ enum: ['1m', '5m', '1h'], default: '5m' }) + @IsOptional() + @IsString() + interval?: '1m' | '5m' | '1h'; +} diff --git a/packages/services/presence-service/src/app.module.ts b/packages/services/presence-service/src/app.module.ts new file mode 100644 index 0000000..53480f0 --- /dev/null +++ b/packages/services/presence-service/src/app.module.ts @@ -0,0 +1,30 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { JwtModule } from '@nestjs/jwt'; +import { ScheduleModule } from '@nestjs/schedule'; +import { ApiModule } from './api/api.module'; +import { ApplicationModule } from './application/application.module'; +import { DomainModule } from './domain/domain.module'; +import { InfrastructureModule } from './infrastructure/infrastructure.module'; + +@Module({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: ['.env.local', '.env'], + }), + JwtModule.registerAsync({ + global: true, + inject: [ConfigService], + useFactory: (configService: ConfigService) => ({ + secret: configService.get('JWT_SECRET'), + }), + }), + ScheduleModule.forRoot(), + DomainModule, + InfrastructureModule, + ApplicationModule, + ApiModule, + ], +}) +export class AppModule {} diff --git a/packages/services/presence-service/src/application/application.module.ts b/packages/services/presence-service/src/application/application.module.ts new file mode 100644 index 0000000..552ea8f --- /dev/null +++ b/packages/services/presence-service/src/application/application.module.ts @@ -0,0 +1,26 @@ +import { Module } from '@nestjs/common'; +import { CqrsModule } from '@nestjs/cqrs'; +import { DomainModule } from '../domain/domain.module'; +import { InfrastructureModule } from '../infrastructure/infrastructure.module'; +import { RecordEventsHandler } from './commands/record-events/record-events.handler'; +import { RecordHeartbeatHandler } from './commands/record-heartbeat/record-heartbeat.handler'; +import { CalculateDauHandler } from './commands/calculate-dau/calculate-dau.handler'; +import { GetOnlineCountHandler } from './queries/get-online-count/get-online-count.handler'; +import { GetDauStatsHandler } from './queries/get-dau-stats/get-dau-stats.handler'; +import { GetOnlineHistoryHandler } from './queries/get-online-history/get-online-history.handler'; +import { AnalyticsScheduler } from './schedulers/analytics.scheduler'; + +@Module({ + imports: [CqrsModule, DomainModule, InfrastructureModule], + providers: [ + RecordEventsHandler, + RecordHeartbeatHandler, + CalculateDauHandler, + GetOnlineCountHandler, + GetDauStatsHandler, + GetOnlineHistoryHandler, + AnalyticsScheduler, + ], + exports: [CqrsModule], +}) +export class ApplicationModule {} diff --git a/packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts b/packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts new file mode 100644 index 0000000..62c5c0a --- /dev/null +++ b/packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.command.ts @@ -0,0 +1,3 @@ +export class CalculateDauCommand { + constructor(public readonly date: Date) {} +} diff --git a/packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts b/packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts new file mode 100644 index 0000000..1b0c9f8 --- /dev/null +++ b/packages/services/presence-service/src/application/commands/calculate-dau/calculate-dau.handler.ts @@ -0,0 +1,48 @@ +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { CalculateDauCommand } from './calculate-dau.command'; +import { IEventLogRepository, EVENT_LOG_REPOSITORY } from '../../../domain/repositories/event-log.repository.interface'; +import { IDailyActiveStatsRepository, DAILY_ACTIVE_STATS_REPOSITORY } from '../../../domain/repositories/daily-active-stats.repository.interface'; +import { DauCalculationService } from '../../../domain/services/dau-calculation.service'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { startOfDayInTimezone, endOfDayInTimezone } from '../../../shared/utils/timezone.util'; + +@Injectable() +@CommandHandler(CalculateDauCommand) +export class CalculateDauHandler implements ICommandHandler { + private readonly logger = new Logger(CalculateDauHandler.name); + + constructor( + @Inject(EVENT_LOG_REPOSITORY) + private readonly eventLogRepository: IEventLogRepository, + @Inject(DAILY_ACTIVE_STATS_REPOSITORY) + private readonly dauStatsRepository: IDailyActiveStatsRepository, + private readonly dauCalculationService: DauCalculationService, + ) {} + + async execute(command: CalculateDauCommand): Promise { + const { date } = command; + const timezone = 'Asia/Shanghai'; + const startOfDay = startOfDayInTimezone(date, timezone); + const endOfDay = endOfDayInTimezone(date, timezone); + + this.logger.log(`Calculating DAU for ${date.toISOString().split('T')[0]}`); + + const result = await this.eventLogRepository.queryDau( + EventName.APP_SESSION_START, + startOfDay, + endOfDay, + ); + + const existingStats = await this.dauStatsRepository.findByDate(date); + if (existingStats) { + existingStats.recalculate(result.total, result.byProvince, result.byCity); + await this.dauStatsRepository.upsert(existingStats); + } else { + const stats = this.dauCalculationService.createStatsFromQueryResult(date, result); + await this.dauStatsRepository.upsert(stats); + } + + this.logger.log(`DAU calculated: ${result.total} users`); + } +} diff --git a/packages/services/presence-service/src/application/commands/record-events/record-events.command.ts b/packages/services/presence-service/src/application/commands/record-events/record-events.command.ts new file mode 100644 index 0000000..6687cd7 --- /dev/null +++ b/packages/services/presence-service/src/application/commands/record-events/record-events.command.ts @@ -0,0 +1,16 @@ +export interface EventItemDto { + eventName: string; + userId?: string; + installId: string; + clientTs: number; + deviceBrand?: string; + deviceModel?: string; + deviceOs?: string; + appVersion?: string; + locale?: string; + properties?: Record; +} + +export class RecordEventsCommand { + constructor(public readonly events: EventItemDto[]) {} +} diff --git a/packages/services/presence-service/src/application/commands/record-events/record-events.handler.ts b/packages/services/presence-service/src/application/commands/record-events/record-events.handler.ts new file mode 100644 index 0000000..5bb353d --- /dev/null +++ b/packages/services/presence-service/src/application/commands/record-events/record-events.handler.ts @@ -0,0 +1,97 @@ +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { RecordEventsCommand, EventItemDto } from './record-events.command'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { DeviceProfile } from '../../../domain/entities/device-profile.entity'; +import { InstallId } from '../../../domain/value-objects/install-id.vo'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventProperties } from '../../../domain/value-objects/event-properties.vo'; +import { IEventLogRepository, EVENT_LOG_REPOSITORY } from '../../../domain/repositories/event-log.repository.interface'; +import { IDeviceProfileRepository, DEVICE_PROFILE_REPOSITORY } from '../../../domain/repositories/device-profile.repository.interface'; +import { RedisService } from '../../../infrastructure/redis/redis.service'; +import { formatToDateKey } from '../../../shared/utils/timezone.util'; + +export interface RecordEventsResult { + accepted: number; + failed: number; + errors?: string[]; +} + +@Injectable() +@CommandHandler(RecordEventsCommand) +export class RecordEventsHandler implements ICommandHandler { + constructor( + @Inject(EVENT_LOG_REPOSITORY) + private readonly eventLogRepository: IEventLogRepository, + @Inject(DEVICE_PROFILE_REPOSITORY) + private readonly deviceProfileRepository: IDeviceProfileRepository, + private readonly redisService: RedisService, + ) {} + + async execute(command: RecordEventsCommand): Promise { + const { events } = command; + const errors: string[] = []; + const validLogs: EventLog[] = []; + + for (let i = 0; i < events.length; i++) { + try { + validLogs.push(this.toEventLog(events[i])); + } catch (e) { + errors.push(`Event[${i}]: ${e.message}`); + } + } + + if (validLogs.length === 0) { + return { accepted: 0, failed: events.length, errors }; + } + + await this.eventLogRepository.batchInsert(validLogs); + + // Upsert device profiles + const deviceProfiles = validLogs + .filter((log) => log.installId.value) + .map((log) => + DeviceProfile.create({ + installId: log.installId.value, + userId: log.userId, + deviceBrand: log.deviceBrand, + deviceModel: log.deviceModel, + deviceOs: log.deviceOs, + appVersion: log.appVersion, + locale: log.locale, + eventCount: 1, + }), + ); + await this.deviceProfileRepository.upsertBatch(deviceProfiles); + + // Update realtime DAU (HyperLogLog) + const todayKey = formatToDateKey(new Date()); + for (const log of validLogs) { + if (log.eventName.isDauEvent()) { + await this.redisService.pfadd(`presence:dau:${todayKey}`, log.dauIdentifier); + await this.redisService.expire(`presence:dau:${todayKey}`, 86400 * 3); + } + } + + return { + accepted: validLogs.length, + failed: events.length - validLogs.length, + errors: errors.length > 0 ? errors : undefined, + }; + } + + private toEventLog(dto: EventItemDto): EventLog { + return EventLog.create({ + userId: dto.userId ?? null, + installId: InstallId.fromString(dto.installId), + eventName: EventName.fromString(dto.eventName), + eventTime: new Date(dto.clientTs * 1000), + deviceBrand: dto.deviceBrand ?? null, + deviceModel: dto.deviceModel ?? null, + deviceOs: dto.deviceOs ?? null, + appVersion: dto.appVersion ?? null, + locale: dto.locale ?? null, + properties: EventProperties.fromData(dto.properties ?? {}), + }); + } +} diff --git a/packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts b/packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts new file mode 100644 index 0000000..6a1d169 --- /dev/null +++ b/packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.command.ts @@ -0,0 +1,8 @@ +export class RecordHeartbeatCommand { + constructor( + public readonly userId: string, + public readonly installId: string, + public readonly appVersion: string, + public readonly clientTs: number, + ) {} +} diff --git a/packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts b/packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts new file mode 100644 index 0000000..b553044 --- /dev/null +++ b/packages/services/presence-service/src/application/commands/record-heartbeat/record-heartbeat.handler.ts @@ -0,0 +1,21 @@ +import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { Injectable } from '@nestjs/common'; +import { RecordHeartbeatCommand } from './record-heartbeat.command'; +import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository'; + +export interface RecordHeartbeatResult { + ok: boolean; + serverTs: number; +} + +@Injectable() +@CommandHandler(RecordHeartbeatCommand) +export class RecordHeartbeatHandler implements ICommandHandler { + constructor(private readonly presenceRedisRepository: PresenceRedisRepository) {} + + async execute(command: RecordHeartbeatCommand): Promise { + const now = Math.floor(Date.now() / 1000); + await this.presenceRedisRepository.updateUserPresence(command.userId, now); + return { ok: true, serverTs: now }; + } +} diff --git a/packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts b/packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts new file mode 100644 index 0000000..ebdf560 --- /dev/null +++ b/packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.handler.ts @@ -0,0 +1,27 @@ +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { GetDauStatsQuery } from './get-dau-stats.query'; +import { IDailyActiveStatsRepository, DAILY_ACTIVE_STATS_REPOSITORY } from '../../../domain/repositories/daily-active-stats.repository.interface'; + +export interface DauStatsResult { + data: { day: string; dauCount: number }[]; + total: number; +} + +@Injectable() +@QueryHandler(GetDauStatsQuery) +export class GetDauStatsHandler implements IQueryHandler { + constructor( + @Inject(DAILY_ACTIVE_STATS_REPOSITORY) + private readonly dauStatsRepository: IDailyActiveStatsRepository, + ) {} + + async execute(query: GetDauStatsQuery): Promise { + const statsList = await this.dauStatsRepository.findByDateRange(query.startDate, query.endDate); + const data = statsList.map((stats) => ({ + day: stats.day.toISOString().split('T')[0], + dauCount: stats.dauCount, + })); + return { data, total: data.length }; + } +} diff --git a/packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts b/packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts new file mode 100644 index 0000000..1193019 --- /dev/null +++ b/packages/services/presence-service/src/application/queries/get-dau-stats/get-dau-stats.query.ts @@ -0,0 +1,6 @@ +export class GetDauStatsQuery { + constructor( + public readonly startDate: Date, + public readonly endDate: Date, + ) {} +} diff --git a/packages/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts b/packages/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts new file mode 100644 index 0000000..68773bd --- /dev/null +++ b/packages/services/presence-service/src/application/queries/get-online-count/get-online-count.handler.ts @@ -0,0 +1,27 @@ +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Injectable } from '@nestjs/common'; +import { GetOnlineCountQuery } from './get-online-count.query'; +import { PresenceRedisRepository } from '../../../infrastructure/redis/presence-redis.repository'; +import { OnlineDetectionService } from '../../../domain/services/online-detection.service'; + +export interface OnlineCountResult { + count: number; + windowSeconds: number; + queriedAt: Date; +} + +@Injectable() +@QueryHandler(GetOnlineCountQuery) +export class GetOnlineCountHandler implements IQueryHandler { + constructor( + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly onlineDetectionService: OnlineDetectionService, + ) {} + + async execute(): Promise { + const now = new Date(); + const threshold = this.onlineDetectionService.getOnlineThreshold(now); + const count = await this.presenceRedisRepository.countOnlineUsers(threshold); + return { count, windowSeconds: this.onlineDetectionService.getWindowSeconds(), queriedAt: now }; + } +} diff --git a/packages/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts b/packages/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts new file mode 100644 index 0000000..18d7ee7 --- /dev/null +++ b/packages/services/presence-service/src/application/queries/get-online-count/get-online-count.query.ts @@ -0,0 +1 @@ +export class GetOnlineCountQuery {} diff --git a/packages/services/presence-service/src/application/queries/get-online-history/get-online-history.handler.ts b/packages/services/presence-service/src/application/queries/get-online-history/get-online-history.handler.ts new file mode 100644 index 0000000..9c914b6 --- /dev/null +++ b/packages/services/presence-service/src/application/queries/get-online-history/get-online-history.handler.ts @@ -0,0 +1,100 @@ +import { IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { Inject, Injectable } from '@nestjs/common'; +import { GetOnlineHistoryQuery, OnlineHistoryInterval } from './get-online-history.query'; +import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../../domain/repositories/online-snapshot.repository.interface'; +import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity'; + +export interface OnlineHistoryDataPoint { + timestamp: string; + onlineCount: number; + windowSeconds: number; +} + +export interface OnlineHistoryResult { + data: OnlineHistoryDataPoint[]; + interval: OnlineHistoryInterval; + startTime: string; + endTime: string; + total: number; + summary: { maxOnline: number; minOnline: number; avgOnline: number; maxTimestamp: string | null; minTimestamp: string | null }; +} + +@Injectable() +@QueryHandler(GetOnlineHistoryQuery) +export class GetOnlineHistoryHandler implements IQueryHandler { + constructor( + @Inject(ONLINE_SNAPSHOT_REPOSITORY) + private readonly snapshotRepository: IOnlineSnapshotRepository, + ) {} + + async execute(query: GetOnlineHistoryQuery): Promise { + const { startTime, endTime, interval } = query; + const snapshots = await this.snapshotRepository.findByTimeRange(startTime, endTime, interval); + const aggregated = this.aggregateByInterval(snapshots, interval); + + const data: OnlineHistoryDataPoint[] = aggregated.map((s) => ({ + timestamp: s.ts.toISOString(), + onlineCount: s.onlineCount, + windowSeconds: s.windowSeconds, + })); + + return { + data, + interval, + startTime: startTime.toISOString(), + endTime: endTime.toISOString(), + total: data.length, + summary: this.calculateSummary(aggregated), + }; + } + + private aggregateByInterval(snapshots: OnlineSnapshot[], interval: OnlineHistoryInterval): OnlineSnapshot[] { + if (snapshots.length === 0) return []; + const intervalMs = interval === '1m' ? 60000 : interval === '5m' ? 300000 : 3600000; + const buckets = new Map(); + + for (const snapshot of snapshots) { + const key = Math.floor(snapshot.ts.getTime() / intervalMs) * intervalMs; + const existing = buckets.get(key); + if (existing) { + existing.total += snapshot.onlineCount; + existing.count++; + } else { + buckets.set(key, { total: snapshot.onlineCount, count: 1, windowSeconds: snapshot.windowSeconds }); + } + } + + return Array.from(buckets.keys()) + .sort((a, b) => a - b) + .map((key) => { + const bucket = buckets.get(key)!; + return OnlineSnapshot.reconstitute({ + id: BigInt(0), + ts: new Date(key), + onlineCount: Math.round(bucket.total / bucket.count), + windowSeconds: bucket.windowSeconds, + }); + }); + } + + private calculateSummary(snapshots: OnlineSnapshot[]): OnlineHistoryResult['summary'] { + if (snapshots.length === 0) return { maxOnline: 0, minOnline: 0, avgOnline: 0, maxTimestamp: null, minTimestamp: null }; + + let maxOnline = -Infinity, minOnline = Infinity, totalOnline = 0; + let maxTimestamp: Date | null = null, minTimestamp: Date | null = null; + + for (const s of snapshots) { + totalOnline += s.onlineCount; + if (s.onlineCount > maxOnline) { maxOnline = s.onlineCount; maxTimestamp = s.ts; } + if (s.onlineCount < minOnline) { minOnline = s.onlineCount; minTimestamp = s.ts; } + } + + return { + maxOnline, + minOnline, + avgOnline: Math.round(totalOnline / snapshots.length), + maxTimestamp: maxTimestamp?.toISOString() || null, + minTimestamp: minTimestamp?.toISOString() || null, + }; + } +} diff --git a/packages/services/presence-service/src/application/queries/get-online-history/get-online-history.query.ts b/packages/services/presence-service/src/application/queries/get-online-history/get-online-history.query.ts new file mode 100644 index 0000000..9835c92 --- /dev/null +++ b/packages/services/presence-service/src/application/queries/get-online-history/get-online-history.query.ts @@ -0,0 +1,9 @@ +export type OnlineHistoryInterval = '1m' | '5m' | '1h'; + +export class GetOnlineHistoryQuery { + constructor( + public readonly startTime: Date, + public readonly endTime: Date, + public readonly interval: OnlineHistoryInterval = '5m', + ) {} +} diff --git a/packages/services/presence-service/src/application/schedulers/analytics.scheduler.ts b/packages/services/presence-service/src/application/schedulers/analytics.scheduler.ts new file mode 100644 index 0000000..786205e --- /dev/null +++ b/packages/services/presence-service/src/application/schedulers/analytics.scheduler.ts @@ -0,0 +1,70 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { CommandBus } from '@nestjs/cqrs'; +import { subDays } from 'date-fns'; +import { CalculateDauCommand } from '../commands/calculate-dau/calculate-dau.command'; +import { PresenceRedisRepository } from '../../infrastructure/redis/presence-redis.repository'; +import { OnlineDetectionService } from '../../domain/services/online-detection.service'; +import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity'; +import { IOnlineSnapshotRepository, ONLINE_SNAPSHOT_REPOSITORY } from '../../domain/repositories/online-snapshot.repository.interface'; + +@Injectable() +export class AnalyticsScheduler { + private readonly logger = new Logger(AnalyticsScheduler.name); + + constructor( + private readonly commandBus: CommandBus, + private readonly presenceRedisRepository: PresenceRedisRepository, + private readonly onlineDetectionService: OnlineDetectionService, + @Inject(ONLINE_SNAPSHOT_REPOSITORY) + private readonly snapshotRepository: IOnlineSnapshotRepository, + ) {} + + @Cron(CronExpression.EVERY_MINUTE) + async recordOnlineSnapshot(): Promise { + try { + const now = new Date(); + const threshold = this.onlineDetectionService.getOnlineThreshold(now); + const count = await this.presenceRedisRepository.countOnlineUsers(threshold); + const snapshot = OnlineSnapshot.create({ + ts: now, + onlineCount: count, + windowSeconds: this.onlineDetectionService.getWindowSeconds(), + }); + await this.snapshotRepository.insert(snapshot); + this.logger.debug(`Online snapshot: ${count} users`); + } catch (error) { + this.logger.error('Failed to record online snapshot', error); + } + } + + @Cron(CronExpression.EVERY_HOUR) + async cleanupExpiredPresence(): Promise { + try { + const threshold = Math.floor(Date.now() / 1000) - 86400; + await this.presenceRedisRepository.removeExpiredUsers(threshold); + } catch (error) { + this.logger.error('Failed to cleanup expired presence', error); + } + } + + @Cron('0 0 1 * * *', { timeZone: 'Asia/Shanghai' }) + async calculateYesterdayDau(): Promise { + try { + const yesterday = subDays(new Date(), 1); + await this.commandBus.execute(new CalculateDauCommand(yesterday)); + this.logger.log('Yesterday DAU calculated'); + } catch (error) { + this.logger.error('Failed to calculate yesterday DAU', error); + } + } + + @Cron(CronExpression.EVERY_HOUR) + async calculateTodayDauRolling(): Promise { + try { + await this.commandBus.execute(new CalculateDauCommand(new Date())); + } catch (error) { + this.logger.error('Failed to calculate today DAU', error); + } + } +} diff --git a/packages/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts b/packages/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts new file mode 100644 index 0000000..aa16acf --- /dev/null +++ b/packages/services/presence-service/src/domain/aggregates/daily-active-stats/daily-active-stats.aggregate.ts @@ -0,0 +1,61 @@ +import { AggregateRoot } from '@nestjs/cqrs'; + +export class DailyActiveStats extends AggregateRoot { + private _day: Date; + private _dauCount: number; + private _dauByProvince: Map; + private _dauByCity: Map; + private _calculatedAt: Date; + private _version: number; + + private constructor() { super(); } + + get day(): Date { return this._day; } + get dauCount(): number { return this._dauCount; } + get dauByProvince(): Map { return new Map(this._dauByProvince); } + get dauByCity(): Map { return new Map(this._dauByCity); } + get calculatedAt(): Date { return this._calculatedAt; } + get version(): number { return this._version; } + + static create(props: { + day: Date; + dauCount: number; + dauByProvince?: Map; + dauByCity?: Map; + }): DailyActiveStats { + const stats = new DailyActiveStats(); + stats._day = props.day; + stats._dauCount = props.dauCount; + stats._dauByProvince = props.dauByProvince ?? new Map(); + stats._dauByCity = props.dauByCity ?? new Map(); + stats._calculatedAt = new Date(); + stats._version = 1; + return stats; + } + + recalculate(newDauCount: number, byProvince?: Map, byCity?: Map): void { + this._dauCount = newDauCount; + if (byProvince) this._dauByProvince = byProvince; + if (byCity) this._dauByCity = byCity; + this._calculatedAt = new Date(); + this._version++; + } + + static reconstitute(props: { + day: Date; + dauCount: number; + dauByProvince: Map; + dauByCity: Map; + calculatedAt: Date; + version: number; + }): DailyActiveStats { + const stats = new DailyActiveStats(); + stats._day = props.day; + stats._dauCount = props.dauCount; + stats._dauByProvince = props.dauByProvince; + stats._dauByCity = props.dauByCity; + stats._calculatedAt = props.calculatedAt; + stats._version = props.version; + return stats; + } +} diff --git a/packages/services/presence-service/src/domain/domain.module.ts b/packages/services/presence-service/src/domain/domain.module.ts new file mode 100644 index 0000000..0212fc9 --- /dev/null +++ b/packages/services/presence-service/src/domain/domain.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; +import { DauCalculationService } from './services/dau-calculation.service'; +import { OnlineDetectionService } from './services/online-detection.service'; + +@Module({ + providers: [DauCalculationService, OnlineDetectionService], + exports: [DauCalculationService, OnlineDetectionService], +}) +export class DomainModule {} diff --git a/packages/services/presence-service/src/domain/entities/device-profile.entity.ts b/packages/services/presence-service/src/domain/entities/device-profile.entity.ts new file mode 100644 index 0000000..c8a6ca3 --- /dev/null +++ b/packages/services/presence-service/src/domain/entities/device-profile.entity.ts @@ -0,0 +1,32 @@ +export class DeviceProfile { + installId: string; + userId: string | null; + deviceBrand: string | null; + deviceModel: string | null; + deviceOs: string | null; + appVersion: string | null; + locale: string | null; + eventCount: number; + + static create(props: { + installId: string; + userId?: string | null; + deviceBrand?: string | null; + deviceModel?: string | null; + deviceOs?: string | null; + appVersion?: string | null; + locale?: string | null; + eventCount?: number; + }): DeviceProfile { + const profile = new DeviceProfile(); + profile.installId = props.installId; + profile.userId = props.userId ?? null; + profile.deviceBrand = props.deviceBrand ?? null; + profile.deviceModel = props.deviceModel ?? null; + profile.deviceOs = props.deviceOs ?? null; + profile.appVersion = props.appVersion ?? null; + profile.locale = props.locale ?? null; + profile.eventCount = props.eventCount ?? 1; + return profile; + } +} diff --git a/packages/services/presence-service/src/domain/entities/event-log.entity.ts b/packages/services/presence-service/src/domain/entities/event-log.entity.ts new file mode 100644 index 0000000..3a4a476 --- /dev/null +++ b/packages/services/presence-service/src/domain/entities/event-log.entity.ts @@ -0,0 +1,95 @@ +import { InstallId } from '../value-objects/install-id.vo'; +import { EventName } from '../value-objects/event-name.vo'; +import { EventProperties } from '../value-objects/event-properties.vo'; + +export class EventLog { + private _id: bigint | null; + private _userId: string | null; + private _installId: InstallId; + private _eventName: EventName; + private _eventTime: Date; + private _deviceBrand: string | null; + private _deviceModel: string | null; + private _deviceOs: string | null; + private _appVersion: string | null; + private _locale: string | null; + private _properties: EventProperties; + private _createdAt: Date; + + private constructor() {} + + get id(): bigint | null { return this._id; } + get userId(): string | null { return this._userId; } + get installId(): InstallId { return this._installId; } + get eventName(): EventName { return this._eventName; } + get eventTime(): Date { return this._eventTime; } + get deviceBrand(): string | null { return this._deviceBrand; } + get deviceModel(): string | null { return this._deviceModel; } + get deviceOs(): string | null { return this._deviceOs; } + get appVersion(): string | null { return this._appVersion; } + get locale(): string | null { return this._locale; } + get properties(): EventProperties { return this._properties; } + get createdAt(): Date { return this._createdAt; } + + get dauIdentifier(): string { + return this._userId ?? this._installId.value; + } + + static create(props: { + userId?: string | null; + installId: InstallId; + eventName: EventName; + eventTime: Date; + deviceBrand?: string | null; + deviceModel?: string | null; + deviceOs?: string | null; + appVersion?: string | null; + locale?: string | null; + properties?: EventProperties; + }): EventLog { + const log = new EventLog(); + log._id = null; + log._userId = props.userId ?? null; + log._installId = props.installId; + log._eventName = props.eventName; + log._eventTime = props.eventTime; + log._deviceBrand = props.deviceBrand ?? null; + log._deviceModel = props.deviceModel ?? null; + log._deviceOs = props.deviceOs ?? null; + log._appVersion = props.appVersion ?? null; + log._locale = props.locale ?? null; + log._properties = props.properties ?? EventProperties.empty(); + log._createdAt = new Date(); + return log; + } + + static reconstitute(props: { + id: bigint; + userId: string | null; + installId: InstallId; + eventName: EventName; + eventTime: Date; + deviceBrand: string | null; + deviceModel: string | null; + deviceOs: string | null; + appVersion: string | null; + locale: string | null; + properties: EventProperties; + createdAt: Date; + }): EventLog { + const log = new EventLog(); + log._id = props.id; + log._userId = props.userId; + log._installId = props.installId; + log._eventName = props.eventName; + log._eventTime = props.eventTime; + log._deviceBrand = props.deviceBrand; + log._deviceModel = props.deviceModel; + log._deviceOs = props.deviceOs; + log._appVersion = props.appVersion; + log._locale = props.locale; + log._properties = props.properties; + log._createdAt = props.createdAt; + return log; + } +} diff --git a/packages/services/presence-service/src/domain/entities/online-snapshot.entity.ts b/packages/services/presence-service/src/domain/entities/online-snapshot.entity.ts new file mode 100644 index 0000000..0f0dbc1 --- /dev/null +++ b/packages/services/presence-service/src/domain/entities/online-snapshot.entity.ts @@ -0,0 +1,33 @@ +import { TimeWindow } from '../value-objects/time-window.vo'; + +export class OnlineSnapshot { + private _id: bigint | null; + private _ts: Date; + private _onlineCount: number; + private _windowSeconds: number; + + private constructor() {} + + get id(): bigint | null { return this._id; } + get ts(): Date { return this._ts; } + get onlineCount(): number { return this._onlineCount; } + get windowSeconds(): number { return this._windowSeconds; } + + static create(props: { ts: Date; onlineCount: number; windowSeconds?: number }): OnlineSnapshot { + const snapshot = new OnlineSnapshot(); + snapshot._id = null; + snapshot._ts = props.ts; + snapshot._onlineCount = props.onlineCount; + snapshot._windowSeconds = props.windowSeconds ?? TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS; + return snapshot; + } + + static reconstitute(props: { id: bigint; ts: Date; onlineCount: number; windowSeconds: number }): OnlineSnapshot { + const snapshot = new OnlineSnapshot(); + snapshot._id = props.id; + snapshot._ts = props.ts; + snapshot._onlineCount = props.onlineCount; + snapshot._windowSeconds = props.windowSeconds; + return snapshot; + } +} diff --git a/packages/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts b/packages/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts new file mode 100644 index 0000000..52732e8 --- /dev/null +++ b/packages/services/presence-service/src/domain/repositories/daily-active-stats.repository.interface.ts @@ -0,0 +1,9 @@ +import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate'; + +export interface IDailyActiveStatsRepository { + upsert(stats: DailyActiveStats): Promise; + findByDate(day: Date): Promise; + findByDateRange(startDate: Date, endDate: Date): Promise; +} + +export const DAILY_ACTIVE_STATS_REPOSITORY = 'DAILY_ACTIVE_STATS_REPOSITORY'; diff --git a/packages/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts b/packages/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts new file mode 100644 index 0000000..bf97db8 --- /dev/null +++ b/packages/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts @@ -0,0 +1,7 @@ +import { DeviceProfile } from '../entities/device-profile.entity'; + +export interface IDeviceProfileRepository { + upsertBatch(profiles: DeviceProfile[]): Promise; +} + +export const DEVICE_PROFILE_REPOSITORY = 'DEVICE_PROFILE_REPOSITORY'; diff --git a/packages/services/presence-service/src/domain/repositories/event-log.repository.interface.ts b/packages/services/presence-service/src/domain/repositories/event-log.repository.interface.ts new file mode 100644 index 0000000..d220fce --- /dev/null +++ b/packages/services/presence-service/src/domain/repositories/event-log.repository.interface.ts @@ -0,0 +1,17 @@ +import { EventLog } from '../entities/event-log.entity'; +import { EventName } from '../value-objects/event-name.vo'; + +export interface DauQueryResult { + total: number; + byProvince: Map; + byCity: Map; +} + +export interface IEventLogRepository { + batchInsert(logs: EventLog[]): Promise; + insert(log: EventLog): Promise; + queryDau(eventName: EventName, startTime: Date, endTime: Date): Promise; + findByTimeRange(eventName: EventName, startTime: Date, endTime: Date, limit?: number): Promise; +} + +export const EVENT_LOG_REPOSITORY = 'EVENT_LOG_REPOSITORY'; diff --git a/packages/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts b/packages/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts new file mode 100644 index 0000000..03c1ec7 --- /dev/null +++ b/packages/services/presence-service/src/domain/repositories/online-snapshot.repository.interface.ts @@ -0,0 +1,9 @@ +import { OnlineSnapshot } from '../entities/online-snapshot.entity'; + +export interface IOnlineSnapshotRepository { + insert(snapshot: OnlineSnapshot): Promise; + findByTimeRange(startTime: Date, endTime: Date, interval?: string): Promise; + findLatest(): Promise; +} + +export const ONLINE_SNAPSHOT_REPOSITORY = 'ONLINE_SNAPSHOT_REPOSITORY'; diff --git a/packages/services/presence-service/src/domain/services/dau-calculation.service.ts b/packages/services/presence-service/src/domain/services/dau-calculation.service.ts new file mode 100644 index 0000000..3b87f91 --- /dev/null +++ b/packages/services/presence-service/src/domain/services/dau-calculation.service.ts @@ -0,0 +1,15 @@ +import { Injectable } from '@nestjs/common'; +import { DailyActiveStats } from '../aggregates/daily-active-stats/daily-active-stats.aggregate'; +import { DauQueryResult } from '../repositories/event-log.repository.interface'; + +@Injectable() +export class DauCalculationService { + createStatsFromQueryResult(day: Date, result: DauQueryResult): DailyActiveStats { + return DailyActiveStats.create({ + day, + dauCount: result.total, + dauByProvince: result.byProvince, + dauByCity: result.byCity, + }); + } +} diff --git a/packages/services/presence-service/src/domain/services/online-detection.service.ts b/packages/services/presence-service/src/domain/services/online-detection.service.ts new file mode 100644 index 0000000..1235d6b --- /dev/null +++ b/packages/services/presence-service/src/domain/services/online-detection.service.ts @@ -0,0 +1,24 @@ +import { Injectable } from '@nestjs/common'; +import { TimeWindow } from '../value-objects/time-window.vo'; + +@Injectable() +export class OnlineDetectionService { + private readonly timeWindow: TimeWindow; + + constructor() { + const windowSeconds = parseInt(process.env.PRESENCE_WINDOW_SECONDS || '300', 10); + this.timeWindow = TimeWindow.ofSeconds(windowSeconds); + } + + isOnline(lastHeartbeatTs: number, now: Date = new Date()): boolean { + return lastHeartbeatTs > this.timeWindow.getThresholdTimestamp(now); + } + + getOnlineThreshold(now: Date = new Date()): number { + return this.timeWindow.getThresholdTimestamp(now); + } + + getWindowSeconds(): number { + return this.timeWindow.windowSeconds; + } +} diff --git a/packages/services/presence-service/src/domain/value-objects/event-name.vo.ts b/packages/services/presence-service/src/domain/value-objects/event-name.vo.ts new file mode 100644 index 0000000..71b1fed --- /dev/null +++ b/packages/services/presence-service/src/domain/value-objects/event-name.vo.ts @@ -0,0 +1,43 @@ +import { DomainException } from '../../shared/exceptions/domain.exception'; + +export class EventName { + static readonly APP_SESSION_START = new EventName('app_session_start'); + static readonly PRESENCE_HEARTBEAT = new EventName('presence_heartbeat'); + static readonly APP_SESSION_END = new EventName('app_session_end'); + + private readonly _value: string; + + private constructor(value: string) { + this._value = value; + } + + get value(): string { + return this._value; + } + + static fromString(value: string): EventName { + if (!value || value.trim() === '') { + throw new DomainException('EventName cannot be empty'); + } + const trimmed = value.trim().toLowerCase(); + if (trimmed.length > 64) { + throw new DomainException('EventName cannot exceed 64 characters'); + } + if (!/^[a-z][a-z0-9_]*$/.test(trimmed)) { + throw new DomainException('EventName must start with a letter and contain only lowercase letters, numbers, and underscores'); + } + return new EventName(trimmed); + } + + isDauEvent(): boolean { + return this._value === EventName.APP_SESSION_START.value; + } + + equals(other: EventName): boolean { + return this._value === other._value; + } + + toString(): string { + return this._value; + } +} diff --git a/packages/services/presence-service/src/domain/value-objects/event-properties.vo.ts b/packages/services/presence-service/src/domain/value-objects/event-properties.vo.ts new file mode 100644 index 0000000..5faad4d --- /dev/null +++ b/packages/services/presence-service/src/domain/value-objects/event-properties.vo.ts @@ -0,0 +1,41 @@ +export interface EventPropertiesData { + province?: string; + city?: string; + [key: string]: unknown; +} + +export class EventProperties { + private readonly _data: EventPropertiesData; + + private constructor(data: EventPropertiesData) { + this._data = { ...data }; + } + + get data(): EventPropertiesData { + return { ...this._data }; + } + + get province(): string | undefined { + return this._data.province; + } + + get city(): string | undefined { + return this._data.city; + } + + get appVersion(): string | undefined { + return this._data.appVersion as string | undefined; + } + + static empty(): EventProperties { + return new EventProperties({}); + } + + static fromData(data: EventPropertiesData): EventProperties { + return new EventProperties(data); + } + + toJSON(): EventPropertiesData { + return this._data; + } +} diff --git a/packages/services/presence-service/src/domain/value-objects/install-id.vo.ts b/packages/services/presence-service/src/domain/value-objects/install-id.vo.ts new file mode 100644 index 0000000..85d467d --- /dev/null +++ b/packages/services/presence-service/src/domain/value-objects/install-id.vo.ts @@ -0,0 +1,31 @@ +import { DomainException } from '../../shared/exceptions/domain.exception'; + +export class InstallId { + private readonly _value: string; + + private constructor(value: string) { + this._value = value; + } + + get value(): string { + return this._value; + } + + static fromString(value: string): InstallId { + if (!value || value.trim() === '') { + throw new DomainException('InstallId cannot be empty'); + } + if (value.length < 8 || value.length > 128) { + throw new DomainException('InstallId length must be between 8 and 128 characters'); + } + return new InstallId(value.trim()); + } + + equals(other: InstallId): boolean { + return this._value === other._value; + } + + toString(): string { + return this._value; + } +} diff --git a/packages/services/presence-service/src/domain/value-objects/time-window.vo.ts b/packages/services/presence-service/src/domain/value-objects/time-window.vo.ts new file mode 100644 index 0000000..6945e26 --- /dev/null +++ b/packages/services/presence-service/src/domain/value-objects/time-window.vo.ts @@ -0,0 +1,27 @@ +export class TimeWindow { + static readonly DEFAULT_ONLINE_WINDOW_SECONDS = 300; + static readonly DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 60; + + private readonly _windowSeconds: number; + + private constructor(windowSeconds: number) { + this._windowSeconds = windowSeconds; + } + + get windowSeconds(): number { + return this._windowSeconds; + } + + static default(): TimeWindow { + return new TimeWindow(TimeWindow.DEFAULT_ONLINE_WINDOW_SECONDS); + } + + static ofSeconds(seconds: number): TimeWindow { + if (seconds <= 0) throw new Error('TimeWindow must be positive'); + return new TimeWindow(seconds); + } + + getThresholdTimestamp(now: Date = new Date()): number { + return Math.floor(now.getTime() / 1000) - this._windowSeconds; + } +} diff --git a/packages/services/presence-service/src/infrastructure/infrastructure.module.ts b/packages/services/presence-service/src/infrastructure/infrastructure.module.ts new file mode 100644 index 0000000..7bbfce2 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/infrastructure.module.ts @@ -0,0 +1,37 @@ +import { Module } from '@nestjs/common'; +import { PrismaService } from './persistence/prisma/prisma.service'; +import { EventLogMapper } from './persistence/mappers/event-log.mapper'; +import { DailyActiveStatsMapper } from './persistence/mappers/daily-active-stats.mapper'; +import { OnlineSnapshotMapper } from './persistence/mappers/online-snapshot.mapper'; +import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl'; +import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl'; +import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl'; +import { DeviceProfileRepositoryImpl } from './persistence/repositories/device-profile.repository.impl'; +import { RedisModule } from './redis/redis.module'; +import { EVENT_LOG_REPOSITORY } from '../domain/repositories/event-log.repository.interface'; +import { DAILY_ACTIVE_STATS_REPOSITORY } from '../domain/repositories/daily-active-stats.repository.interface'; +import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapshot.repository.interface'; +import { DEVICE_PROFILE_REPOSITORY } from '../domain/repositories/device-profile.repository.interface'; + +@Module({ + imports: [RedisModule], + providers: [ + PrismaService, + EventLogMapper, + DailyActiveStatsMapper, + OnlineSnapshotMapper, + { provide: EVENT_LOG_REPOSITORY, useClass: EventLogRepositoryImpl }, + { provide: DAILY_ACTIVE_STATS_REPOSITORY, useClass: DailyActiveStatsRepositoryImpl }, + { provide: ONLINE_SNAPSHOT_REPOSITORY, useClass: OnlineSnapshotRepositoryImpl }, + { provide: DEVICE_PROFILE_REPOSITORY, useClass: DeviceProfileRepositoryImpl }, + ], + exports: [ + PrismaService, + EVENT_LOG_REPOSITORY, + DAILY_ACTIVE_STATS_REPOSITORY, + ONLINE_SNAPSHOT_REPOSITORY, + DEVICE_PROFILE_REPOSITORY, + RedisModule, + ], +}) +export class InfrastructureModule {} diff --git a/packages/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts b/packages/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts new file mode 100644 index 0000000..c884b1f --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/mappers/daily-active-stats.mapper.ts @@ -0,0 +1,34 @@ +import { Injectable } from '@nestjs/common'; +import { DailyActiveStats as PrismaDailyActiveStats, Prisma } from '@prisma/client'; +import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate'; + +@Injectable() +export class DailyActiveStatsMapper { + toDomain(prisma: PrismaDailyActiveStats): DailyActiveStats { + const dauByProvince = new Map( + Object.entries((prisma.dauByProvince as Record) ?? {}), + ); + const dauByCity = new Map( + Object.entries((prisma.dauByCity as Record) ?? {}), + ); + return DailyActiveStats.reconstitute({ + day: prisma.day, + dauCount: prisma.dauCount, + dauByProvince, + dauByCity, + calculatedAt: prisma.calculatedAt, + version: prisma.version, + }); + } + + toPersistence(domain: DailyActiveStats): Prisma.DailyActiveStatsCreateInput { + return { + day: domain.day, + dauCount: domain.dauCount, + dauByProvince: Object.fromEntries(domain.dauByProvince) as Prisma.InputJsonValue, + dauByCity: Object.fromEntries(domain.dauByCity) as Prisma.InputJsonValue, + calculatedAt: domain.calculatedAt, + version: domain.version, + }; + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts b/packages/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts new file mode 100644 index 0000000..4c2c73f --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/mappers/event-log.mapper.ts @@ -0,0 +1,41 @@ +import { Injectable } from '@nestjs/common'; +import { EventLog as PrismaEventLog, Prisma } from '@prisma/client'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { InstallId } from '../../../domain/value-objects/install-id.vo'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventProperties, EventPropertiesData } from '../../../domain/value-objects/event-properties.vo'; + +@Injectable() +export class EventLogMapper { + toDomain(prisma: PrismaEventLog): EventLog { + return EventLog.reconstitute({ + id: prisma.id, + userId: prisma.userId, + installId: InstallId.fromString(prisma.installId), + eventName: EventName.fromString(prisma.eventName), + eventTime: prisma.eventTime, + deviceBrand: prisma.deviceBrand, + deviceModel: prisma.deviceModel, + deviceOs: prisma.deviceOs, + appVersion: prisma.appVersion, + locale: prisma.locale, + properties: EventProperties.fromData((prisma.properties as EventPropertiesData) ?? {}), + createdAt: prisma.createdAt, + }); + } + + toPersistence(domain: EventLog): Prisma.EventLogCreateManyInput { + return { + userId: domain.userId, + installId: domain.installId.value, + eventName: domain.eventName.value, + eventTime: domain.eventTime, + deviceBrand: domain.deviceBrand, + deviceModel: domain.deviceModel, + deviceOs: domain.deviceOs, + appVersion: domain.appVersion, + locale: domain.locale, + properties: domain.properties.toJSON() as Prisma.InputJsonValue, + }; + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts b/packages/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts new file mode 100644 index 0000000..5eb7019 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/mappers/online-snapshot.mapper.ts @@ -0,0 +1,23 @@ +import { Injectable } from '@nestjs/common'; +import { OnlineSnapshot as PrismaOnlineSnapshot } from '@prisma/client'; +import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity'; + +@Injectable() +export class OnlineSnapshotMapper { + toDomain(prisma: PrismaOnlineSnapshot): OnlineSnapshot { + return OnlineSnapshot.reconstitute({ + id: prisma.id, + ts: prisma.ts, + onlineCount: prisma.onlineCount, + windowSeconds: prisma.windowSeconds, + }); + } + + toPersistence(domain: OnlineSnapshot): Omit { + return { + ts: domain.ts, + onlineCount: domain.onlineCount, + windowSeconds: domain.windowSeconds, + }; + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts b/packages/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts new file mode 100644 index 0000000..5064444 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/prisma/prisma.service.ts @@ -0,0 +1,13 @@ +import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; + +@Injectable() +export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy { + async onModuleInit(): Promise { + await this.$connect(); + } + + async onModuleDestroy(): Promise { + await this.$disconnect(); + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts b/packages/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts new file mode 100644 index 0000000..94d1ade --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/repositories/daily-active-stats.repository.impl.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IDailyActiveStatsRepository } from '../../../domain/repositories/daily-active-stats.repository.interface'; +import { DailyActiveStats } from '../../../domain/aggregates/daily-active-stats/daily-active-stats.aggregate'; +import { DailyActiveStatsMapper } from '../mappers/daily-active-stats.mapper'; + +@Injectable() +export class DailyActiveStatsRepositoryImpl implements IDailyActiveStatsRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: DailyActiveStatsMapper, + ) {} + + async upsert(stats: DailyActiveStats): Promise { + const data = this.mapper.toPersistence(stats); + await this.prisma.dailyActiveStats.upsert({ + where: { day: stats.day }, + create: data, + update: data, + }); + } + + async findByDate(day: Date): Promise { + const record = await this.prisma.dailyActiveStats.findUnique({ where: { day } }); + return record ? this.mapper.toDomain(record) : null; + } + + async findByDateRange(startDate: Date, endDate: Date): Promise { + const records = await this.prisma.dailyActiveStats.findMany({ + where: { day: { gte: startDate, lte: endDate } }, + orderBy: { day: 'asc' }, + }); + return records.map((r) => this.mapper.toDomain(r)); + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts b/packages/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts new file mode 100644 index 0000000..35ea47f --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts @@ -0,0 +1,40 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IDeviceProfileRepository } from '../../../domain/repositories/device-profile.repository.interface'; +import { DeviceProfile } from '../../../domain/entities/device-profile.entity'; + +@Injectable() +export class DeviceProfileRepositoryImpl implements IDeviceProfileRepository { + constructor(private readonly prisma: PrismaService) {} + + async upsertBatch(profiles: DeviceProfile[]): Promise { + if (profiles.length === 0) return; + + // 同一批次中同一 install_id 可能重复,取最后一条 + const map = new Map(); + for (const p of profiles) { + map.set(p.installId, p); + } + + const now = new Date(); + for (const p of Array.from(map.values())) { + await this.prisma.$executeRaw` + INSERT INTO presence_device_profile + (install_id, user_id, device_brand, device_model, device_os, app_version, locale, + first_seen_at, last_seen_at, event_count) + VALUES + (${p.installId}, ${p.userId}, ${p.deviceBrand}, ${p.deviceModel}, + ${p.deviceOs}, ${p.appVersion}, ${p.locale}, ${now}, ${now}, ${p.eventCount}) + ON CONFLICT (install_id) DO UPDATE SET + user_id = COALESCE(EXCLUDED.user_id, presence_device_profile.user_id), + device_brand = COALESCE(EXCLUDED.device_brand, presence_device_profile.device_brand), + device_model = COALESCE(EXCLUDED.device_model, presence_device_profile.device_model), + device_os = COALESCE(EXCLUDED.device_os, presence_device_profile.device_os), + app_version = COALESCE(EXCLUDED.app_version, presence_device_profile.app_version), + locale = COALESCE(EXCLUDED.locale, presence_device_profile.locale), + last_seen_at = ${now}, + event_count = presence_device_profile.event_count + ${p.eventCount} + `; + } + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts b/packages/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts new file mode 100644 index 0000000..7105824 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/repositories/event-log.repository.impl.ts @@ -0,0 +1,77 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IEventLogRepository, DauQueryResult } from '../../../domain/repositories/event-log.repository.interface'; +import { EventLog } from '../../../domain/entities/event-log.entity'; +import { EventName } from '../../../domain/value-objects/event-name.vo'; +import { EventLogMapper } from '../mappers/event-log.mapper'; + +@Injectable() +export class EventLogRepositoryImpl implements IEventLogRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: EventLogMapper, + ) {} + + async batchInsert(logs: EventLog[]): Promise { + const data = logs.map((log) => this.mapper.toPersistence(log)); + await this.prisma.eventLog.createMany({ data }); + } + + async insert(log: EventLog): Promise { + const data = this.mapper.toPersistence(log); + const created = await this.prisma.eventLog.create({ data }); + return this.mapper.toDomain(created); + } + + async queryDau(eventName: EventName, startTime: Date, endTime: Date): Promise { + const result = await this.prisma.$queryRaw< + { total: bigint; province: string | null; city: string | null; count: bigint }[] + >` + WITH base AS ( + SELECT + COALESCE(user_id::text, install_id) AS identifier, + properties->>'province' AS province, + properties->>'city' AS city + FROM presence_event_log + WHERE event_name = ${eventName.value} + AND event_time >= ${startTime} + AND event_time < ${endTime} + ), + unique_users AS ( + SELECT DISTINCT identifier, province, city FROM base + ) + SELECT + COUNT(DISTINCT identifier) AS total, + province, + city, + COUNT(*) AS count + FROM unique_users + GROUP BY GROUPING SETS ((), (province), (city)) + `; + + let total = 0; + const byProvince = new Map(); + const byCity = new Map(); + + for (const row of result) { + if (row.province === null && row.city === null) { + total = Number(row.total); + } else if (row.province !== null && row.city === null) { + byProvince.set(row.province, Number(row.count)); + } else if (row.city !== null && row.province === null) { + byCity.set(row.city, Number(row.count)); + } + } + + return { total, byProvince, byCity }; + } + + async findByTimeRange(eventName: EventName, startTime: Date, endTime: Date, limit?: number): Promise { + const records = await this.prisma.eventLog.findMany({ + where: { eventName: eventName.value, eventTime: { gte: startTime, lt: endTime } }, + orderBy: { eventTime: 'desc' }, + take: limit, + }); + return records.map((r) => this.mapper.toDomain(r)); + } +} diff --git a/packages/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts b/packages/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts new file mode 100644 index 0000000..f192e08 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/persistence/repositories/online-snapshot.repository.impl.ts @@ -0,0 +1,32 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IOnlineSnapshotRepository } from '../../../domain/repositories/online-snapshot.repository.interface'; +import { OnlineSnapshot } from '../../../domain/entities/online-snapshot.entity'; +import { OnlineSnapshotMapper } from '../mappers/online-snapshot.mapper'; + +@Injectable() +export class OnlineSnapshotRepositoryImpl implements IOnlineSnapshotRepository { + constructor( + private readonly prisma: PrismaService, + private readonly mapper: OnlineSnapshotMapper, + ) {} + + async insert(snapshot: OnlineSnapshot): Promise { + const data = this.mapper.toPersistence(snapshot); + const created = await this.prisma.onlineSnapshot.create({ data }); + return this.mapper.toDomain(created); + } + + async findByTimeRange(startTime: Date, endTime: Date, interval?: string): Promise { + const records = await this.prisma.onlineSnapshot.findMany({ + where: { ts: { gte: startTime, lte: endTime } }, + orderBy: { ts: 'asc' }, + }); + return records.map((r) => this.mapper.toDomain(r)); + } + + async findLatest(): Promise { + const record = await this.prisma.onlineSnapshot.findFirst({ orderBy: { ts: 'desc' } }); + return record ? this.mapper.toDomain(record) : null; + } +} diff --git a/packages/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts b/packages/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts new file mode 100644 index 0000000..3d05105 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/redis/presence-redis.repository.ts @@ -0,0 +1,33 @@ +import { Injectable } from '@nestjs/common'; +import { RedisService } from './redis.service'; + +@Injectable() +export class PresenceRedisRepository { + private readonly ONLINE_USERS_KEY = 'presence:online_users'; + + constructor(private readonly redisService: RedisService) {} + + async updateUserPresence(userId: string, timestamp: number): Promise { + await this.redisService.zadd(this.ONLINE_USERS_KEY, timestamp, userId); + } + + async countOnlineUsers(thresholdTimestamp: number): Promise { + return this.redisService.zcount(this.ONLINE_USERS_KEY, thresholdTimestamp, '+inf'); + } + + async getOnlineUsers(thresholdTimestamp: number, limit?: number): Promise { + if (limit) { + return this.redisService.zrangebyscore(this.ONLINE_USERS_KEY, thresholdTimestamp, '+inf', 'LIMIT', 0, limit); + } + return this.redisService.zrangebyscore(this.ONLINE_USERS_KEY, thresholdTimestamp, '+inf'); + } + + async removeExpiredUsers(thresholdTimestamp: number): Promise { + return this.redisService.zremrangebyscore(this.ONLINE_USERS_KEY, '-inf', thresholdTimestamp); + } + + async getUserLastHeartbeat(userId: string): Promise { + const score = await this.redisService.zscore(this.ONLINE_USERS_KEY, userId); + return score ? Number(score) : null; + } +} diff --git a/packages/services/presence-service/src/infrastructure/redis/redis.module.ts b/packages/services/presence-service/src/infrastructure/redis/redis.module.ts new file mode 100644 index 0000000..1f27678 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/redis/redis.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; +import { RedisService } from './redis.service'; +import { PresenceRedisRepository } from './presence-redis.repository'; + +@Module({ + providers: [RedisService, PresenceRedisRepository], + exports: [RedisService, PresenceRedisRepository], +}) +export class RedisModule {} diff --git a/packages/services/presence-service/src/infrastructure/redis/redis.service.ts b/packages/services/presence-service/src/infrastructure/redis/redis.service.ts new file mode 100644 index 0000000..b711575 --- /dev/null +++ b/packages/services/presence-service/src/infrastructure/redis/redis.service.ts @@ -0,0 +1,59 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; + +@Injectable() +export class RedisService implements OnModuleDestroy { + private readonly client: Redis; + + constructor(private readonly configService: ConfigService) { + this.client = new Redis({ + host: this.configService.get('REDIS_HOST', 'localhost'), + port: this.configService.get('REDIS_PORT', 6379), + password: this.configService.get('REDIS_PASSWORD'), + db: this.configService.get('REDIS_DB', 10), + }); + } + + async onModuleDestroy(): Promise { + await this.client.quit(); + } + + async zadd(key: string, score: number, member: string): Promise { + return this.client.zadd(key, score, member); + } + + async zcount(key: string, min: number | string, max: number | string): Promise { + return this.client.zcount(key, min, max); + } + + async zrangebyscore( + key: string, + min: number | string, + max: number | string, + limit?: 'LIMIT', + offset?: number, + count?: number, + ): Promise { + if (limit && offset !== undefined && count !== undefined) { + return this.client.zrangebyscore(key, min, max, limit, offset, count); + } + return this.client.zrangebyscore(key, min, max); + } + + async zremrangebyscore(key: string, min: number | string, max: number | string): Promise { + return this.client.zremrangebyscore(key, min, max); + } + + async zscore(key: string, member: string): Promise { + return this.client.zscore(key, member); + } + + async pfadd(key: string, ...elements: string[]): Promise { + return this.client.pfadd(key, ...elements); + } + + async expire(key: string, seconds: number): Promise { + return this.client.expire(key, seconds); + } +} diff --git a/packages/services/presence-service/src/main.ts b/packages/services/presence-service/src/main.ts new file mode 100644 index 0000000..394109b --- /dev/null +++ b/packages/services/presence-service/src/main.ts @@ -0,0 +1,40 @@ +import { NestFactory } from '@nestjs/core'; +import { ValidationPipe, Logger } from '@nestjs/common'; +import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { AppModule } from './app.module'; +import { GlobalExceptionFilter } from './shared/filters/global-exception.filter'; +import { LoggingInterceptor } from './shared/interceptors/logging.interceptor'; + +async function bootstrap() { + const logger = new Logger('Bootstrap'); + const app = await NestFactory.create(AppModule); + + app.useGlobalFilters(new GlobalExceptionFilter()); + app.useGlobalInterceptors(new LoggingInterceptor()); + app.useGlobalPipes( + new ValidationPipe({ + whitelist: true, + transform: true, + forbidNonWhitelisted: true, + }), + ); + + const apiPrefix = process.env.API_PREFIX || 'api/v1'; + app.setGlobalPrefix(apiPrefix); + + const config = new DocumentBuilder() + .setTitle('Presence & Analytics Service API') + .setDescription('用户活跃度与在线状态服务') + .setVersion('1.0') + .addBearerAuth() + .build(); + const document = SwaggerModule.createDocument(app, config); + SwaggerModule.setup(`${apiPrefix}/docs`, app, document); + + const port = parseInt(process.env.APP_PORT || '3011', 10); + await app.listen(port); + + logger.log(`Presence Service running on: http://localhost:${port}/${apiPrefix}`); +} + +bootstrap(); diff --git a/packages/services/presence-service/src/shared/decorators/current-user.decorator.ts b/packages/services/presence-service/src/shared/decorators/current-user.decorator.ts new file mode 100644 index 0000000..45c5b85 --- /dev/null +++ b/packages/services/presence-service/src/shared/decorators/current-user.decorator.ts @@ -0,0 +1,9 @@ +import { createParamDecorator, ExecutionContext } from '@nestjs/common'; + +export const CurrentUser = createParamDecorator( + (data: string | undefined, ctx: ExecutionContext) => { + const request = ctx.switchToHttp().getRequest(); + const user = request.user; + return data ? user?.[data] : user; + }, +); diff --git a/packages/services/presence-service/src/shared/decorators/public.decorator.ts b/packages/services/presence-service/src/shared/decorators/public.decorator.ts new file mode 100644 index 0000000..b3845e1 --- /dev/null +++ b/packages/services/presence-service/src/shared/decorators/public.decorator.ts @@ -0,0 +1,4 @@ +import { SetMetadata } from '@nestjs/common'; + +export const IS_PUBLIC_KEY = 'isPublic'; +export const Public = () => SetMetadata(IS_PUBLIC_KEY, true); diff --git a/packages/services/presence-service/src/shared/exceptions/application.exception.ts b/packages/services/presence-service/src/shared/exceptions/application.exception.ts new file mode 100644 index 0000000..7f74d64 --- /dev/null +++ b/packages/services/presence-service/src/shared/exceptions/application.exception.ts @@ -0,0 +1,6 @@ +export class ApplicationException extends Error { + constructor(message: string) { + super(message); + this.name = 'ApplicationException'; + } +} diff --git a/packages/services/presence-service/src/shared/exceptions/domain.exception.ts b/packages/services/presence-service/src/shared/exceptions/domain.exception.ts new file mode 100644 index 0000000..76ead53 --- /dev/null +++ b/packages/services/presence-service/src/shared/exceptions/domain.exception.ts @@ -0,0 +1,6 @@ +export class DomainException extends Error { + constructor(message: string) { + super(message); + this.name = 'DomainException'; + } +} diff --git a/packages/services/presence-service/src/shared/filters/global-exception.filter.ts b/packages/services/presence-service/src/shared/filters/global-exception.filter.ts new file mode 100644 index 0000000..2f5a259 --- /dev/null +++ b/packages/services/presence-service/src/shared/filters/global-exception.filter.ts @@ -0,0 +1,61 @@ +import { + ExceptionFilter, + Catch, + ArgumentsHost, + HttpException, + HttpStatus, + Logger, +} from '@nestjs/common'; +import { Request, Response } from 'express'; +import { DomainException } from '../exceptions/domain.exception'; +import { ApplicationException } from '../exceptions/application.exception'; + +@Catch() +export class GlobalExceptionFilter implements ExceptionFilter { + private readonly logger = new Logger(GlobalExceptionFilter.name); + + catch(exception: unknown, host: ArgumentsHost): void { + const ctx = host.switchToHttp(); + const response = ctx.getResponse(); + const request = ctx.getRequest(); + + let statusCode = HttpStatus.INTERNAL_SERVER_ERROR; + let message = 'Internal server error'; + let error = 'Internal Server Error'; + + if (exception instanceof HttpException) { + statusCode = exception.getStatus(); + const exceptionResponse = exception.getResponse(); + message = typeof exceptionResponse === 'string' + ? exceptionResponse + : (exceptionResponse as any).message || message; + error = HttpStatus[statusCode] || error; + } else if (exception instanceof DomainException) { + statusCode = HttpStatus.UNPROCESSABLE_ENTITY; + message = exception.message; + error = 'Domain Error'; + } else if (exception instanceof ApplicationException) { + statusCode = HttpStatus.BAD_REQUEST; + message = exception.message; + error = 'Application Error'; + } else if (exception instanceof Error) { + message = process.env.NODE_ENV === 'production' ? 'Internal server error' : exception.message; + } + + if (statusCode >= 500) { + this.logger.error(`[${request.method}] ${request.url} - ${statusCode} - ${message}`, + exception instanceof Error ? exception.stack : undefined); + } else if (statusCode >= 400) { + this.logger.warn(`[${request.method}] ${request.url} - ${statusCode} - ${message}`); + } + + response.status(statusCode).json({ + statusCode, + timestamp: new Date().toISOString(), + path: request.url, + method: request.method, + message, + error, + }); + } +} diff --git a/packages/services/presence-service/src/shared/filters/index.ts b/packages/services/presence-service/src/shared/filters/index.ts new file mode 100644 index 0000000..c3ec44d --- /dev/null +++ b/packages/services/presence-service/src/shared/filters/index.ts @@ -0,0 +1 @@ +export * from './global-exception.filter'; diff --git a/packages/services/presence-service/src/shared/guards/admin.guard.ts b/packages/services/presence-service/src/shared/guards/admin.guard.ts new file mode 100644 index 0000000..8a68f9d --- /dev/null +++ b/packages/services/presence-service/src/shared/guards/admin.guard.ts @@ -0,0 +1,38 @@ +import { Injectable, CanActivate, ExecutionContext, UnauthorizedException, ForbiddenException } from '@nestjs/common'; +import { JwtService } from '@nestjs/jwt'; + +/** + * 管理员 Guard — 验证 JWT 中 roles 包含 'admin'。 + * 用于保护 online-count、online-history、dau 等聚合查询接口。 + */ +@Injectable() +export class AdminGuard implements CanActivate { + constructor(private readonly jwtService: JwtService) {} + + async canActivate(context: ExecutionContext): Promise { + const request = context.switchToHttp().getRequest(); + const token = this.extractTokenFromHeader(request); + if (!token) throw new UnauthorizedException('缺少认证令牌'); + + try { + const payload = await this.jwtService.verifyAsync(token); + const roles: string[] = payload.roles ?? []; + if (!roles.includes('admin')) throw new ForbiddenException('需要管理员权限'); + request.user = { + userId: payload.sub, + email: payload.email, + roles, + }; + } catch (e) { + if (e instanceof ForbiddenException) throw e; + throw new UnauthorizedException('令牌无效或已过期'); + } + + return true; + } + + private extractTokenFromHeader(request: any): string | undefined { + const [type, token] = request.headers.authorization?.split(' ') ?? []; + return type === 'Bearer' ? token : undefined; + } +} diff --git a/packages/services/presence-service/src/shared/guards/jwt-auth.guard.ts b/packages/services/presence-service/src/shared/guards/jwt-auth.guard.ts new file mode 100644 index 0000000..a9292c6 --- /dev/null +++ b/packages/services/presence-service/src/shared/guards/jwt-auth.guard.ts @@ -0,0 +1,44 @@ +import { Injectable, CanActivate, ExecutionContext, UnauthorizedException } from '@nestjs/common'; +import { Reflector } from '@nestjs/core'; +import { JwtService } from '@nestjs/jwt'; +import { IS_PUBLIC_KEY } from '../decorators/public.decorator'; + +@Injectable() +export class JwtAuthGuard implements CanActivate { + constructor( + private readonly jwtService: JwtService, + private readonly reflector: Reflector, + ) {} + + async canActivate(context: ExecutionContext): Promise { + const isPublic = this.reflector.getAllAndOverride(IS_PUBLIC_KEY, [ + context.getHandler(), + context.getClass(), + ]); + if (isPublic) return true; + + const request = context.switchToHttp().getRequest(); + const token = this.extractTokenFromHeader(request); + if (!token) throw new UnauthorizedException('缺少认证令牌'); + + try { + const payload = await this.jwtService.verifyAsync(token); + if (payload.type && payload.type !== 'access') throw new UnauthorizedException('无效的令牌类型'); + request.user = { + userId: payload.sub, + email: payload.email, + tenantId: payload.tenantId, + roles: payload.roles ?? [], + }; + } catch { + throw new UnauthorizedException('令牌无效或已过期'); + } + + return true; + } + + private extractTokenFromHeader(request: any): string | undefined { + const [type, token] = request.headers.authorization?.split(' ') ?? []; + return type === 'Bearer' ? token : undefined; + } +} diff --git a/packages/services/presence-service/src/shared/interceptors/index.ts b/packages/services/presence-service/src/shared/interceptors/index.ts new file mode 100644 index 0000000..c9e2a22 --- /dev/null +++ b/packages/services/presence-service/src/shared/interceptors/index.ts @@ -0,0 +1 @@ +export * from './logging.interceptor'; diff --git a/packages/services/presence-service/src/shared/interceptors/logging.interceptor.ts b/packages/services/presence-service/src/shared/interceptors/logging.interceptor.ts new file mode 100644 index 0000000..094ccef --- /dev/null +++ b/packages/services/presence-service/src/shared/interceptors/logging.interceptor.ts @@ -0,0 +1,29 @@ +import { Injectable, NestInterceptor, ExecutionContext, CallHandler, Logger } from '@nestjs/common'; +import { Observable } from 'rxjs'; +import { tap, catchError } from 'rxjs/operators'; +import { Request, Response } from 'express'; + +@Injectable() +export class LoggingInterceptor implements NestInterceptor { + private readonly logger = new Logger('HTTP'); + + intercept(context: ExecutionContext, next: CallHandler): Observable { + const ctx = context.switchToHttp(); + const request = ctx.getRequest(); + const response = ctx.getResponse(); + const startTime = Date.now(); + const { method, url } = request; + + this.logger.log(`→ ${method} ${url}`); + + return next.handle().pipe( + tap(() => { + this.logger.log(`← ${method} ${url} - ${response.statusCode} - ${Date.now() - startTime}ms`); + }), + catchError((error) => { + this.logger.error(`← ${method} ${url} - ${error.status || 500} - ${Date.now() - startTime}ms - ${error.message}`); + throw error; + }), + ); + } +} diff --git a/packages/services/presence-service/src/shared/utils/timezone.util.ts b/packages/services/presence-service/src/shared/utils/timezone.util.ts new file mode 100644 index 0000000..ae5f33a --- /dev/null +++ b/packages/services/presence-service/src/shared/utils/timezone.util.ts @@ -0,0 +1,21 @@ +import { format, startOfDay, endOfDay } from 'date-fns'; +import { utcToZonedTime, zonedTimeToUtc } from 'date-fns-tz'; + +const DEFAULT_TIMEZONE = 'Asia/Shanghai'; + +export function startOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date { + const zonedDate = utcToZonedTime(date, timezone); + const start = startOfDay(zonedDate); + return zonedTimeToUtc(start, timezone); +} + +export function endOfDayInTimezone(date: Date, timezone: string = DEFAULT_TIMEZONE): Date { + const zonedDate = utcToZonedTime(date, timezone); + const end = endOfDay(zonedDate); + return zonedTimeToUtc(end, timezone); +} + +export function formatToDateKey(date: Date, timezone: string = DEFAULT_TIMEZONE): string { + const zonedDate = utcToZonedTime(date, timezone); + return format(zonedDate, 'yyyy-MM-dd'); +} diff --git a/packages/services/presence-service/tsconfig.json b/packages/services/presence-service/tsconfig.json new file mode 100644 index 0000000..8231993 --- /dev/null +++ b/packages/services/presence-service/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "allowSyntheticDefaultImports": true, + "target": "ES2021", + "sourceMap": true, + "outDir": "./dist", + "baseUrl": "./", + "incremental": true, + "skipLibCheck": true, + "strictNullChecks": false, + "noImplicitAny": false, + "strictBindCallApply": false, + "forceConsistentCasingInFileNames": false, + "noFallthroughCasesInSwitch": false + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "test"] +} diff --git a/packages/shared/database/src/migrations/010-create-presence-tables.sql b/packages/shared/database/src/migrations/010-create-presence-tables.sql new file mode 100644 index 0000000..8f6abbb --- /dev/null +++ b/packages/shared/database/src/migrations/010-create-presence-tables.sql @@ -0,0 +1,60 @@ +-- IT0 Presence & Analytics Tables (public schema) +-- Telemetry event log, device profiles, DAU stats, online snapshots + +CREATE TABLE IF NOT EXISTS presence_event_log ( + id BIGSERIAL PRIMARY KEY, + user_id VARCHAR(36), + install_id VARCHAR(64) NOT NULL, + event_name VARCHAR(64) NOT NULL, + event_time TIMESTAMPTZ NOT NULL, + device_brand VARCHAR(64), + device_model VARCHAR(64), + device_os VARCHAR(32), + app_version VARCHAR(32), + locale VARCHAR(16), + properties JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_presence_event_log_event_time ON presence_event_log (event_time); +CREATE INDEX IF NOT EXISTS idx_presence_event_log_event_name ON presence_event_log (event_name); +CREATE INDEX IF NOT EXISTS idx_presence_event_log_event_name_time ON presence_event_log (event_name, event_time); +CREATE INDEX IF NOT EXISTS idx_presence_event_log_user_id ON presence_event_log (user_id); +CREATE INDEX IF NOT EXISTS idx_presence_event_log_device_brand ON presence_event_log (device_brand); +CREATE INDEX IF NOT EXISTS idx_presence_event_log_app_version ON presence_event_log (app_version); + +CREATE TABLE IF NOT EXISTS presence_device_profile ( + install_id VARCHAR(64) PRIMARY KEY, + user_id VARCHAR(36), + device_brand VARCHAR(64), + device_model VARCHAR(64), + device_os VARCHAR(32), + app_version VARCHAR(32), + locale VARCHAR(16), + first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + event_count INTEGER NOT NULL DEFAULT 1 +); + +CREATE INDEX IF NOT EXISTS idx_presence_device_profile_brand ON presence_device_profile (device_brand); +CREATE INDEX IF NOT EXISTS idx_presence_device_profile_app_version ON presence_device_profile (app_version); +CREATE INDEX IF NOT EXISTS idx_presence_device_profile_user_id ON presence_device_profile (user_id); +CREATE INDEX IF NOT EXISTS idx_presence_device_profile_last_seen ON presence_device_profile (last_seen_at DESC); + +CREATE TABLE IF NOT EXISTS presence_daily_active_users ( + day DATE PRIMARY KEY, + dau_count INTEGER NOT NULL, + dau_by_province JSONB, + dau_by_city JSONB, + calculated_at TIMESTAMPTZ NOT NULL, + version INTEGER NOT NULL DEFAULT 1 +); + +CREATE TABLE IF NOT EXISTS presence_online_snapshots ( + id BIGSERIAL PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL UNIQUE, + online_count INTEGER NOT NULL, + window_seconds INTEGER NOT NULL DEFAULT 300 +); + +CREATE INDEX IF NOT EXISTS idx_presence_online_snapshots_ts ON presence_online_snapshots (ts DESC); diff --git a/packages/shared/database/src/run-migrations.ts b/packages/shared/database/src/run-migrations.ts index b511cf9..a579214 100644 --- a/packages/shared/database/src/run-migrations.ts +++ b/packages/shared/database/src/run-migrations.ts @@ -70,6 +70,10 @@ async function runSharedSchema(client: Client) { log('Running 005-create-billing-tables.sql ...'); await runSqlFile(client, path.join(MIGRATIONS_DIR, '005-create-billing-tables.sql')); log('Billing tables created.'); + + log('Running 010-create-presence-tables.sql ...'); + await runSqlFile(client, path.join(MIGRATIONS_DIR, '010-create-presence-tables.sql')); + log('Presence tables created.'); } async function runTenantSchema(client: Client, tenantId: string) {