diff --git a/backend/kong/kong.yml b/backend/kong/kong.yml index 4f3ac27..623ef94 100644 --- a/backend/kong/kong.yml +++ b/backend/kong/kong.yml @@ -43,6 +43,22 @@ services: paths: - /api/v1/admin/system strip_path: false + - name: telemetry-routes + paths: + - /api/v1/telemetry + strip_path: false + - name: admin-telemetry-routes + paths: + - /api/v1/admin/telemetry + strip_path: false + - name: app-version-routes + paths: + - /api/v1/app/version + strip_path: false + - name: admin-version-routes + paths: + - /api/v1/admin/versions + strip_path: false # --- issuer-service (NestJS :3002) --- - name: issuer-service diff --git a/backend/migrations/032_create_telemetry_events.sql b/backend/migrations/032_create_telemetry_events.sql new file mode 100644 index 0000000..6b9f2df --- /dev/null +++ b/backend/migrations/032_create_telemetry_events.sql @@ -0,0 +1,16 @@ +-- Telemetry event log (append-only) +-- High-volume table for client-side event collection +CREATE TABLE IF NOT EXISTS telemetry_events ( + id BIGSERIAL PRIMARY KEY, + user_id UUID REFERENCES users(id) ON DELETE SET NULL, + install_id VARCHAR(128) NOT NULL, + event_name VARCHAR(64) NOT NULL, + event_time TIMESTAMPTZ NOT NULL, + properties JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_telemetry_events_time ON telemetry_events(event_time); +CREATE INDEX idx_telemetry_events_name_time ON telemetry_events(event_name, event_time); +CREATE INDEX idx_telemetry_events_user ON telemetry_events(user_id); +CREATE INDEX idx_telemetry_events_install ON telemetry_events(install_id); diff --git a/backend/migrations/033_create_daily_active_stats.sql b/backend/migrations/033_create_daily_active_stats.sql new file mode 100644 index 0000000..4ed998e --- /dev/null +++ b/backend/migrations/033_create_daily_active_stats.sql @@ -0,0 +1,10 @@ +-- Daily Active User statistics +-- Aggregated from telemetry_events, recalculated nightly +CREATE TABLE IF NOT EXISTS daily_active_stats ( + day DATE PRIMARY KEY, + dau_count INT NOT NULL DEFAULT 0, + dau_by_platform JSONB DEFAULT '{}', + dau_by_region JSONB DEFAULT '{}', + calculated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + version INT NOT NULL DEFAULT 1 +); diff --git a/backend/migrations/034_create_online_snapshots.sql b/backend/migrations/034_create_online_snapshots.sql new file mode 100644 index 0000000..cc7335b --- /dev/null +++ b/backend/migrations/034_create_online_snapshots.sql @@ -0,0 +1,10 @@ +-- Online user count snapshots +-- Recorded every minute for historical trend analysis +CREATE TABLE IF NOT EXISTS online_snapshots ( + id BIGSERIAL PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL UNIQUE, + online_count INT NOT NULL DEFAULT 0, + window_seconds INT NOT NULL DEFAULT 180 +); + +CREATE INDEX idx_online_snapshots_ts ON online_snapshots(ts DESC); diff --git a/backend/migrations/035_create_app_versions.sql b/backend/migrations/035_create_app_versions.sql new file mode 100644 index 0000000..8126952 --- /dev/null +++ b/backend/migrations/035_create_app_versions.sql @@ -0,0 +1,26 @@ +-- Mobile app version management for OTA updates +-- Supports Android (APK) and iOS (IPA) +CREATE TABLE IF NOT EXISTS app_versions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + platform VARCHAR(10) NOT NULL CHECK (platform IN ('ANDROID', 'IOS')), + version_code INT NOT NULL, + version_name VARCHAR(32) NOT NULL, + build_number VARCHAR(64) NOT NULL, + download_url TEXT NOT NULL, + file_size BIGINT NOT NULL DEFAULT 0, + file_sha256 VARCHAR(64) NOT NULL, + min_os_version VARCHAR(16), + changelog TEXT NOT NULL DEFAULT '', + is_force_update BOOLEAN NOT NULL DEFAULT FALSE, + is_enabled BOOLEAN NOT NULL DEFAULT TRUE, + release_date TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by UUID, + updated_by UUID, + version INT NOT NULL DEFAULT 1 +); + +CREATE INDEX idx_app_versions_platform ON app_versions(platform, is_enabled); +CREATE INDEX idx_app_versions_code ON app_versions(platform, version_code DESC); +CREATE UNIQUE INDEX idx_app_versions_platform_code ON app_versions(platform, version_code); diff --git a/backend/services/user-service/package.json b/backend/services/user-service/package.json index 0b1ebb2..6dd2804 100644 --- a/backend/services/user-service/package.json +++ b/backend/services/user-service/package.json @@ -18,6 +18,7 @@ "@nestjs/jwt": "^10.2.0", "@nestjs/passport": "^10.0.3", "@nestjs/swagger": "^7.2.0", + "@nestjs/schedule": "^4.0.0", "@nestjs/throttler": "^5.1.0", "typeorm": "^0.3.19", "pg": "^8.11.3", @@ -28,6 +29,7 @@ "class-transformer": "^0.5.1", "ioredis": "^5.3.2", "kafkajs": "^2.2.4", + "minio": "^8.0.0", "reflect-metadata": "^0.2.1", "rxjs": "^7.8.1" }, @@ -37,6 +39,7 @@ "@types/node": "^20.11.0", "@types/passport-jwt": "^4.0.1", "@types/bcryptjs": "^2.4.6", + "@types/multer": "^1.4.11", "typescript": "^5.3.0", "jest": "^29.7.0", "ts-jest": "^29.1.0", diff --git a/backend/services/user-service/src/application/services/app-version.service.ts b/backend/services/user-service/src/application/services/app-version.service.ts new file mode 100644 index 0000000..543316b --- /dev/null +++ b/backend/services/user-service/src/application/services/app-version.service.ts @@ -0,0 +1,128 @@ +import { Injectable, NotFoundException, ConflictException, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { AppVersion, Platform } from '../../domain/entities/app-version.entity'; + +@Injectable() +export class AppVersionService { + private readonly logger = new Logger(AppVersionService.name); + + constructor( + @InjectRepository(AppVersion) private readonly versionRepo: Repository, + ) {} + + /** Check for update - mobile client API */ + async checkUpdate(platform: Platform, currentVersionCode: number) { + const latest = await this.versionRepo.findOne({ + where: { platform, isEnabled: true }, + order: { versionCode: 'DESC' }, + }); + + if (!latest || latest.versionCode <= currentVersionCode) { + return { needUpdate: false }; + } + + return { + needUpdate: true, + forceUpdate: latest.isForceUpdate && latest.isEnabled, + version: latest.versionName, + versionCode: latest.versionCode, + downloadUrl: latest.downloadUrl, + fileSize: Number(latest.fileSize), + fileSizeFriendly: this.formatFileSize(BigInt(latest.fileSize)), + sha256: latest.fileSha256, + updateLog: latest.changelog, + minOsVersion: latest.minOsVersion, + releaseDate: latest.releaseDate?.toISOString() || null, + }; + } + + /** List versions (admin) */ + async listVersions(platform?: Platform, includeDisabled = false) { + const where: any = {}; + if (platform) where.platform = platform; + if (!includeDisabled) where.isEnabled = true; + + return this.versionRepo.find({ + where, + order: { versionCode: 'DESC' }, + }); + } + + /** Get version detail */ + async getVersion(id: string) { + const version = await this.versionRepo.findOne({ where: { id } }); + if (!version) throw new NotFoundException('Version not found'); + return version; + } + + /** Create version (admin) */ + async createVersion(data: { + platform: Platform; + versionCode: number; + versionName: string; + buildNumber: string; + downloadUrl: string; + fileSize: string; + fileSha256: string; + changelog: string; + isForceUpdate: boolean; + minOsVersion?: string; + releaseDate?: Date; + createdBy?: string; + }) { + // Check duplicate + const existing = await this.versionRepo.findOne({ + where: { platform: data.platform, versionCode: data.versionCode }, + }); + if (existing) { + throw new ConflictException( + `Version code ${data.versionCode} already exists for ${data.platform}`, + ); + } + + const version = this.versionRepo.create({ + ...data, + isEnabled: true, + }); + return this.versionRepo.save(version); + } + + /** Update version (admin) */ + async updateVersion(id: string, data: Partial<{ + downloadUrl: string; + fileSize: string; + fileSha256: string; + changelog: string; + isForceUpdate: boolean; + minOsVersion: string; + releaseDate: Date; + updatedBy: string; + }>) { + const version = await this.getVersion(id); + Object.assign(version, data); + return this.versionRepo.save(version); + } + + /** Toggle enable/disable */ + async toggleVersion(id: string, isEnabled: boolean) { + await this.getVersion(id); // Verify exists + await this.versionRepo.update(id, { isEnabled }); + return { success: true }; + } + + /** Delete version */ + async deleteVersion(id: string) { + await this.getVersion(id); // Verify exists + await this.versionRepo.delete(id); + return { success: true }; + } + + private formatFileSize(bytes: bigint): string { + const n = Number(bytes); + if (n < 1024) return `${n} B`; + if (n < 1024 * 1024) return `${(n / 1024).toFixed(1)} KB`; + if (n < 1024 * 1024 * 1024) return `${(n / (1024 * 1024)).toFixed(1)} MB`; + return `${(n / (1024 * 1024 * 1024)).toFixed(2)} GB`; + } +} diff --git a/backend/services/user-service/src/application/services/file-storage.service.ts b/backend/services/user-service/src/application/services/file-storage.service.ts new file mode 100644 index 0000000..f429db9 --- /dev/null +++ b/backend/services/user-service/src/application/services/file-storage.service.ts @@ -0,0 +1,71 @@ +import { Injectable, Logger } from '@nestjs/common'; +import * as crypto from 'crypto'; +import { Client as MinioClient } from 'minio'; + +const BUCKET = 'app-releases'; + +@Injectable() +export class FileStorageService { + private readonly logger = new Logger(FileStorageService.name); + private readonly minio: MinioClient; + + constructor() { + this.minio = new MinioClient({ + endPoint: process.env.MINIO_ENDPOINT || 'localhost', + port: parseInt(process.env.MINIO_PORT || '9000', 10), + useSSL: process.env.MINIO_USE_SSL === 'true', + accessKey: process.env.MINIO_ACCESS_KEY || 'minioadmin', + secretKey: process.env.MINIO_SECRET_KEY || 'minioadmin', + }); + } + + /** Upload file to MinIO and return metadata */ + async uploadFile( + buffer: Buffer, + originalName: string, + platform: string, + versionName: string, + ) { + // Ensure bucket exists + const exists = await this.minio.bucketExists(BUCKET); + if (!exists) { + await this.minio.makeBucket(BUCKET); + } + + // Compute SHA256 + const sha256 = crypto.createHash('sha256').update(buffer).digest('hex'); + + // Generate object name + const ext = originalName.split('.').pop() || 'bin'; + const timestamp = Date.now(); + const random = crypto.randomBytes(4).toString('hex'); + const objectName = `${platform}/${versionName}/${timestamp}-${random}.${ext}`; + + // Upload + await this.minio.putObject(BUCKET, objectName, buffer, buffer.length, { + 'Content-Type': + ext === 'apk' + ? 'application/vnd.android.package-archive' + : 'application/octet-stream', + }); + + // Generate presigned download URL (24h) + const downloadUrl = await this.minio.presignedGetObject( + BUCKET, + objectName, + 24 * 3600, + ); + + return { + objectName, + downloadUrl, + fileSize: buffer.length.toString(), + sha256, + }; + } + + /** Generate presigned URL for download */ + async getDownloadUrl(objectName: string): Promise { + return this.minio.presignedGetObject(BUCKET, objectName, 24 * 3600); + } +} diff --git a/backend/services/user-service/src/application/services/telemetry-scheduler.service.ts b/backend/services/user-service/src/application/services/telemetry-scheduler.service.ts new file mode 100644 index 0000000..271228c --- /dev/null +++ b/backend/services/user-service/src/application/services/telemetry-scheduler.service.ts @@ -0,0 +1,139 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity'; +import { DailyActiveStats } from '../../domain/entities/daily-active-stats.entity'; +import { TelemetryEvent } from '../../domain/entities/telemetry-event.entity'; +import { PresenceRedisService } from '../../infrastructure/redis/presence-redis.service'; + +@Injectable() +export class TelemetrySchedulerService { + private readonly logger = new Logger(TelemetrySchedulerService.name); + + constructor( + @InjectRepository(OnlineSnapshot) private readonly snapshotRepo: Repository, + @InjectRepository(DailyActiveStats) private readonly dauRepo: Repository, + @InjectRepository(TelemetryEvent) private readonly eventRepo: Repository, + private readonly presenceRedis: PresenceRedisService, + ) {} + + /** Record online snapshot every minute */ + @Cron(CronExpression.EVERY_MINUTE) + async recordOnlineSnapshot() { + try { + const count = await this.presenceRedis.countOnline(); + const snapshot = this.snapshotRepo.create({ + ts: new Date(), + onlineCount: count, + windowSeconds: this.presenceRedis.getWindowSeconds(), + }); + await this.snapshotRepo.save(snapshot); + } catch (err) { + this.logger.error(`Failed to record online snapshot: ${err.message}`); + } + } + + /** Clean expired Redis presence data every hour */ + @Cron(CronExpression.EVERY_HOUR) + async cleanupExpiredPresence() { + try { + const removed = await this.presenceRedis.cleanupExpired(); + if (removed > 0) { + this.logger.log(`Cleaned up ${removed} expired presence entries`); + } + } catch (err) { + this.logger.error(`Failed to cleanup presence: ${err.message}`); + } + } + + /** Calculate yesterday's DAU at 1:00 AM */ + @Cron('0 1 * * *') + async calculateYesterdayDau() { + try { + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + const dayStr = yesterday.toISOString().slice(0, 10); + await this.calculateDauForDate(dayStr); + this.logger.log(`Calculated DAU for ${dayStr}`); + } catch (err) { + this.logger.error(`Failed to calculate DAU: ${err.message}`); + } + } + + /** Rolling DAU update every hour */ + @Cron('30 * * * *') + async calculateTodayDauRolling() { + try { + const today = new Date().toISOString().slice(0, 10); + await this.calculateDauForDate(today); + } catch (err) { + this.logger.error(`Failed to calculate rolling DAU: ${err.message}`); + } + } + + private async calculateDauForDate(dayStr: string) { + const startTime = new Date(`${dayStr}T00:00:00Z`); + const endTime = new Date(`${dayStr}T23:59:59.999Z`); + + // Count distinct users/installIds for app_session_start events + const result = await this.eventRepo + .createQueryBuilder('e') + .select("COUNT(DISTINCT COALESCE(e.user_id::text, e.install_id))", 'dauCount') + .where("e.event_name = 'app_session_start'") + .andWhere('e.event_time >= :startTime AND e.event_time < :endTime', { + startTime, + endTime: new Date(endTime.getTime() + 1), + }) + .getRawOne(); + + // Platform breakdown + const platformResult = await this.eventRepo + .createQueryBuilder('e') + .select("e.properties->>'platform'", 'platform') + .addSelect("COUNT(DISTINCT COALESCE(e.user_id::text, e.install_id))", 'count') + .where("e.event_name = 'app_session_start'") + .andWhere('e.event_time >= :startTime AND e.event_time < :endTime', { + startTime, + endTime: new Date(endTime.getTime() + 1), + }) + .groupBy("e.properties->>'platform'") + .getRawMany(); + + // Region breakdown + const regionResult = await this.eventRepo + .createQueryBuilder('e') + .select("e.properties->>'region'", 'region') + .addSelect("COUNT(DISTINCT COALESCE(e.user_id::text, e.install_id))", 'count') + .where("e.event_name = 'app_session_start'") + .andWhere('e.event_time >= :startTime AND e.event_time < :endTime', { + startTime, + endTime: new Date(endTime.getTime() + 1), + }) + .andWhere("e.properties->>'region' IS NOT NULL") + .groupBy("e.properties->>'region'") + .getRawMany(); + + const dauByPlatform: Record = {}; + for (const r of platformResult) { + if (r.platform) dauByPlatform[r.platform] = parseInt(r.count, 10); + } + + const dauByRegion: Record = {}; + for (const r of regionResult) { + if (r.region) dauByRegion[r.region] = parseInt(r.count, 10); + } + + // Upsert + await this.dauRepo.upsert( + { + day: dayStr, + dauCount: parseInt(result.dauCount, 10) || 0, + dauByPlatform, + dauByRegion, + calculatedAt: new Date(), + }, + ['day'], + ); + } +} diff --git a/backend/services/user-service/src/application/services/telemetry.service.ts b/backend/services/user-service/src/application/services/telemetry.service.ts new file mode 100644 index 0000000..cea8c18 --- /dev/null +++ b/backend/services/user-service/src/application/services/telemetry.service.ts @@ -0,0 +1,125 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { TelemetryEvent } from '../../domain/entities/telemetry-event.entity'; +import { OnlineSnapshot } from '../../domain/entities/online-snapshot.entity'; +import { DailyActiveStats } from '../../domain/entities/daily-active-stats.entity'; +import { PresenceRedisService } from '../../infrastructure/redis/presence-redis.service'; + +@Injectable() +export class TelemetryService { + private readonly logger = new Logger(TelemetryService.name); + + constructor( + @InjectRepository(TelemetryEvent) private readonly eventRepo: Repository, + @InjectRepository(OnlineSnapshot) private readonly snapshotRepo: Repository, + @InjectRepository(DailyActiveStats) private readonly dauRepo: Repository, + private readonly presenceRedis: PresenceRedisService, + ) {} + + /** Batch insert telemetry events */ + async recordEvents(events: Array<{ + userId?: string; + installId: string; + eventName: string; + clientTs: number; + properties?: Record; + }>): Promise<{ recorded: number }> { + const entities = events.map((e) => { + const event = new TelemetryEvent(); + event.userId = e.userId || null; + event.installId = e.installId; + event.eventName = e.eventName; + event.eventTime = new Date(e.clientTs * 1000); + event.properties = e.properties || {}; + return event; + }); + + await this.eventRepo.save(entities); + + // Update HyperLogLog for DAU on session_start events + const today = new Date().toISOString().slice(0, 10); + for (const e of events) { + if (e.eventName === 'app_session_start') { + const identifier = e.userId || e.installId; + await this.presenceRedis.addDauIdentifier(today, identifier); + } + } + + return { recorded: entities.length }; + } + + /** Record heartbeat */ + async recordHeartbeat(userId: string, installId: string, appVersion: string): Promise { + await this.presenceRedis.updatePresence(userId); + } + + /** Get current online count */ + async getOnlineCount(): Promise<{ count: number; windowSeconds: number; queriedAt: string }> { + const count = await this.presenceRedis.countOnline(); + return { + count, + windowSeconds: this.presenceRedis.getWindowSeconds(), + queriedAt: new Date().toISOString(), + }; + } + + /** Get online history with interval aggregation */ + async getOnlineHistory(startTime: Date, endTime: Date, interval: '1m' | '5m' | '1h' = '5m') { + const snapshots = await this.snapshotRepo + .createQueryBuilder('s') + .where('s.ts >= :startTime AND s.ts <= :endTime', { startTime, endTime }) + .orderBy('s.ts', 'ASC') + .getMany(); + + // Aggregate by interval + const intervalMs = interval === '1m' ? 60000 : interval === '5m' ? 300000 : 3600000; + const buckets = new Map(); + + for (const snap of snapshots) { + const ts = snap.ts.getTime(); + const bucketKey = Math.floor(ts / intervalMs) * intervalMs; + if (!buckets.has(bucketKey)) buckets.set(bucketKey, []); + buckets.get(bucketKey)!.push(snap.onlineCount); + } + + const dataPoints = Array.from(buckets.entries()).map(([ts, counts]) => ({ + timestamp: new Date(ts).toISOString(), + onlineCount: Math.round(counts.reduce((a, b) => a + b, 0) / counts.length), + windowSeconds: 180, + })); + + const allCounts = dataPoints.map((d) => d.onlineCount); + const maxOnline = allCounts.length > 0 ? Math.max(...allCounts) : 0; + const minOnline = allCounts.length > 0 ? Math.min(...allCounts) : 0; + const avgOnline = allCounts.length > 0 ? Math.round(allCounts.reduce((a, b) => a + b, 0) / allCounts.length) : 0; + + return { + data: dataPoints, + interval, + startTime: startTime.toISOString(), + endTime: endTime.toISOString(), + total: dataPoints.length, + summary: { maxOnline, minOnline, avgOnline }, + }; + } + + /** Get DAU stats for date range */ + async getDauStats(startDate: string, endDate: string) { + const stats = await this.dauRepo + .createQueryBuilder('d') + .where('d.day >= :startDate AND d.day <= :endDate', { startDate, endDate }) + .orderBy('d.day', 'ASC') + .getMany(); + + return { + data: stats.map((s) => ({ + day: s.day, + dauCount: s.dauCount, + byPlatform: s.dauByPlatform, + byRegion: s.dauByRegion, + })), + total: stats.length, + }; + } +} diff --git a/backend/services/user-service/src/domain/entities/app-version.entity.ts b/backend/services/user-service/src/domain/entities/app-version.entity.ts new file mode 100644 index 0000000..1bd0fe2 --- /dev/null +++ b/backend/services/user-service/src/domain/entities/app-version.entity.ts @@ -0,0 +1,63 @@ +import { + Entity, Column, PrimaryGeneratedColumn, CreateDateColumn, + UpdateDateColumn, VersionColumn, Index, +} from 'typeorm'; + +export enum Platform { + ANDROID = 'ANDROID', + IOS = 'IOS', +} + +@Entity('app_versions') +@Index('idx_app_versions_platform', ['platform', 'isEnabled']) +@Index('idx_app_versions_code', ['platform', 'versionCode']) +export class AppVersion { + @PrimaryGeneratedColumn('uuid') id: string; + + @Column({ type: 'varchar', length: 10 }) + platform: Platform; + + @Column({ name: 'version_code', type: 'int' }) + versionCode: number; + + @Column({ name: 'version_name', length: 32 }) + versionName: string; + + @Column({ name: 'build_number', length: 64 }) + buildNumber: string; + + @Column({ name: 'download_url', type: 'text' }) + downloadUrl: string; + + @Column({ name: 'file_size', type: 'bigint', default: 0 }) + fileSize: string; // bigint as string in TypeORM + + @Column({ name: 'file_sha256', length: 64 }) + fileSha256: string; + + @Column({ name: 'min_os_version', length: 16, nullable: true }) + minOsVersion: string | null; + + @Column({ type: 'text', default: '' }) + changelog: string; + + @Column({ name: 'is_force_update', type: 'boolean', default: false }) + isForceUpdate: boolean; + + @Column({ name: 'is_enabled', type: 'boolean', default: true }) + isEnabled: boolean; + + @Column({ name: 'release_date', type: 'timestamptz', nullable: true }) + releaseDate: Date | null; + + @CreateDateColumn({ name: 'created_at', type: 'timestamptz' }) createdAt: Date; + @UpdateDateColumn({ name: 'updated_at', type: 'timestamptz' }) updatedAt: Date; + + @Column({ name: 'created_by', type: 'uuid', nullable: true }) + createdBy: string | null; + + @Column({ name: 'updated_by', type: 'uuid', nullable: true }) + updatedBy: string | null; + + @VersionColumn({ default: 1 }) version: number; +} diff --git a/backend/services/user-service/src/domain/entities/daily-active-stats.entity.ts b/backend/services/user-service/src/domain/entities/daily-active-stats.entity.ts new file mode 100644 index 0000000..acc5597 --- /dev/null +++ b/backend/services/user-service/src/domain/entities/daily-active-stats.entity.ts @@ -0,0 +1,22 @@ +import { Entity, Column, PrimaryColumn, VersionColumn } from 'typeorm'; + +@Entity('daily_active_stats') +export class DailyActiveStats { + @PrimaryColumn({ type: 'date' }) + day: string; + + @Column({ name: 'dau_count', type: 'int', default: 0 }) + dauCount: number; + + @Column({ name: 'dau_by_platform', type: 'jsonb', default: '{}' }) + dauByPlatform: Record; + + @Column({ name: 'dau_by_region', type: 'jsonb', default: '{}' }) + dauByRegion: Record; + + @Column({ name: 'calculated_at', type: 'timestamptz' }) + calculatedAt: Date; + + @VersionColumn({ default: 1 }) + version: number; +} diff --git a/backend/services/user-service/src/domain/entities/online-snapshot.entity.ts b/backend/services/user-service/src/domain/entities/online-snapshot.entity.ts new file mode 100644 index 0000000..9932830 --- /dev/null +++ b/backend/services/user-service/src/domain/entities/online-snapshot.entity.ts @@ -0,0 +1,16 @@ +import { Entity, Column, PrimaryGeneratedColumn } from 'typeorm'; + +@Entity('online_snapshots') +export class OnlineSnapshot { + @PrimaryGeneratedColumn({ type: 'bigint' }) + id: string; + + @Column({ type: 'timestamptz', unique: true }) + ts: Date; + + @Column({ name: 'online_count', type: 'int', default: 0 }) + onlineCount: number; + + @Column({ name: 'window_seconds', type: 'int', default: 180 }) + windowSeconds: number; +} diff --git a/backend/services/user-service/src/domain/entities/telemetry-event.entity.ts b/backend/services/user-service/src/domain/entities/telemetry-event.entity.ts new file mode 100644 index 0000000..e8e2c43 --- /dev/null +++ b/backend/services/user-service/src/domain/entities/telemetry-event.entity.ts @@ -0,0 +1,26 @@ +import { Entity, Column, PrimaryGeneratedColumn, CreateDateColumn, Index } from 'typeorm'; + +@Entity('telemetry_events') +@Index('idx_telemetry_events_name_time', ['eventName', 'eventTime']) +export class TelemetryEvent { + @PrimaryGeneratedColumn({ type: 'bigint' }) + id: string; + + @Column({ name: 'user_id', type: 'uuid', nullable: true }) + userId: string | null; + + @Column({ name: 'install_id', length: 128 }) + installId: string; + + @Column({ name: 'event_name', length: 64 }) + eventName: string; + + @Column({ name: 'event_time', type: 'timestamptz' }) + eventTime: Date; + + @Column({ type: 'jsonb', default: '{}' }) + properties: Record; + + @CreateDateColumn({ name: 'created_at', type: 'timestamptz' }) + createdAt: Date; +} diff --git a/backend/services/user-service/src/infrastructure/redis/presence-redis.service.ts b/backend/services/user-service/src/infrastructure/redis/presence-redis.service.ts new file mode 100644 index 0000000..3ce78c7 --- /dev/null +++ b/backend/services/user-service/src/infrastructure/redis/presence-redis.service.ts @@ -0,0 +1,59 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import Redis from 'ioredis'; + +const ONLINE_KEY = 'genex:presence:online'; +const DAU_KEY_PREFIX = 'genex:dau:'; +const ONLINE_WINDOW = 180; // 3 minutes + +@Injectable() +export class PresenceRedisService implements OnModuleDestroy { + private readonly redis: Redis; + + constructor() { + this.redis = new Redis({ + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379', 10), + password: process.env.REDIS_PASSWORD || undefined, + db: parseInt(process.env.REDIS_DB || '0', 10), + }); + } + + async onModuleDestroy() { + await this.redis.quit(); + } + + /** Update user heartbeat timestamp */ + async updatePresence(userId: string): Promise { + const now = Math.floor(Date.now() / 1000); + await this.redis.zadd(ONLINE_KEY, now, userId); + } + + /** Count users online within window */ + async countOnline(): Promise { + const threshold = Math.floor(Date.now() / 1000) - ONLINE_WINDOW; + return this.redis.zcount(ONLINE_KEY, threshold, '+inf'); + } + + /** Add user/installId to HyperLogLog DAU */ + async addDauIdentifier(date: string, identifier: string): Promise { + const key = `${DAU_KEY_PREFIX}${date}`; + await this.redis.pfadd(key, identifier); + // Auto-expire after 48 hours + await this.redis.expire(key, 48 * 3600); + } + + /** Get approximate DAU from HyperLogLog */ + async getApproxDau(date: string): Promise { + return this.redis.pfcount(`${DAU_KEY_PREFIX}${date}`); + } + + /** Clean up expired presence data (>24h old) */ + async cleanupExpired(): Promise { + const cutoff = Math.floor(Date.now() / 1000) - 24 * 3600; + return this.redis.zremrangebyscore(ONLINE_KEY, '-inf', cutoff); + } + + getWindowSeconds(): number { + return ONLINE_WINDOW; + } +} diff --git a/backend/services/user-service/src/interface/http/controllers/admin-telemetry.controller.ts b/backend/services/user-service/src/interface/http/controllers/admin-telemetry.controller.ts new file mode 100644 index 0000000..8a9308e --- /dev/null +++ b/backend/services/user-service/src/interface/http/controllers/admin-telemetry.controller.ts @@ -0,0 +1,77 @@ +import { Controller, Get, Query, UseGuards } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { JwtAuthGuard, Roles, RolesGuard, UserRole } from '@genex/common'; +import { TelemetryService } from '../../../application/services/telemetry.service'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { TelemetryEvent } from '../../../domain/entities/telemetry-event.entity'; +import { PresenceRedisService } from '../../../infrastructure/redis/presence-redis.service'; + +@ApiTags('admin-telemetry') +@Controller('admin/telemetry') +@UseGuards(JwtAuthGuard, RolesGuard) +@Roles(UserRole.ADMIN) +@ApiBearerAuth() +export class AdminTelemetryController { + constructor( + private readonly telemetryService: TelemetryService, + @InjectRepository(TelemetryEvent) private readonly eventRepo: Repository, + private readonly presenceRedis: PresenceRedisService, + ) {} + + @Get('dau') + @ApiOperation({ summary: 'Query DAU statistics' }) + async getDauStats(@Query('startDate') startDate: string, @Query('endDate') endDate: string) { + const result = await this.telemetryService.getDauStats(startDate, endDate); + return { code: 0, data: result }; + } + + @Get('events') + @ApiOperation({ summary: 'Query telemetry events' }) + async listEvents( + @Query('page') page = 1, + @Query('limit') limit = 20, + @Query('eventName') eventName?: string, + @Query('userId') userId?: string, + ) { + const qb = this.eventRepo.createQueryBuilder('e'); + if (eventName) qb.andWhere('e.event_name = :eventName', { eventName }); + if (userId) qb.andWhere('e.user_id = :userId', { userId }); + + qb.orderBy('e.event_time', 'DESC') + .skip((+page - 1) * +limit) + .take(+limit); + + const [items, total] = await qb.getManyAndCount(); + return { code: 0, data: { items, total, page: +page, limit: +limit } }; + } + + @Get('realtime') + @ApiOperation({ summary: 'Get realtime analytics dashboard data' }) + async getRealtimeData() { + const today = new Date().toISOString().slice(0, 10); + const [onlineCount, approxDau] = await Promise.all([ + this.presenceRedis.countOnline(), + this.presenceRedis.getApproxDau(today), + ]); + + // Events today + const todayStart = new Date(`${today}T00:00:00Z`); + const eventsToday = await this.eventRepo + .createQueryBuilder('e') + .select('COUNT(*)', 'count') + .where('e.event_time >= :todayStart', { todayStart }) + .getRawOne(); + + return { + code: 0, + data: { + onlineUsers: onlineCount, + dauToday: approxDau, + eventsToday: parseInt(eventsToday.count, 10), + windowSeconds: 180, + queriedAt: new Date().toISOString(), + }, + }; + } +} diff --git a/backend/services/user-service/src/interface/http/controllers/admin-version.controller.ts b/backend/services/user-service/src/interface/http/controllers/admin-version.controller.ts new file mode 100644 index 0000000..501f6f2 --- /dev/null +++ b/backend/services/user-service/src/interface/http/controllers/admin-version.controller.ts @@ -0,0 +1,153 @@ +import { + Controller, Get, Post, Put, Patch, Delete, + Param, Query, Body, UseGuards, UseInterceptors, UploadedFile, Req, +} from '@nestjs/common'; +import { FileInterceptor } from '@nestjs/platform-express'; +import { ApiTags, ApiOperation, ApiBearerAuth, ApiConsumes } from '@nestjs/swagger'; +import { JwtAuthGuard, Roles, RolesGuard, UserRole } from '@genex/common'; +import { AppVersionService } from '../../../application/services/app-version.service'; +import { FileStorageService } from '../../../application/services/file-storage.service'; +import { Platform } from '../../../domain/entities/app-version.entity'; + +@ApiTags('Admin - App Versions') +@Controller('admin/versions') +@UseGuards(JwtAuthGuard, RolesGuard) +@Roles(UserRole.ADMIN) +@ApiBearerAuth() +export class AdminVersionController { + constructor( + private readonly versionService: AppVersionService, + private readonly fileStorage: FileStorageService, + ) {} + + @Get() + @ApiOperation({ summary: 'List app versions' }) + async listVersions( + @Query('platform') platform?: string, + @Query('includeDisabled') includeDisabled?: string, + ) { + const platformEnum = platform + ? (platform.toUpperCase() as Platform) + : undefined; + const versions = await this.versionService.listVersions( + platformEnum, + includeDisabled === 'true', + ); + return { code: 0, data: versions }; + } + + @Get(':id') + @ApiOperation({ summary: 'Get version details' }) + async getVersion(@Param('id') id: string) { + const version = await this.versionService.getVersion(id); + return { code: 0, data: version }; + } + + @Post() + @ApiOperation({ summary: 'Create version manually' }) + async createVersion( + @Body() body: { + platform: string; + versionCode: number; + versionName: string; + buildNumber: string; + downloadUrl: string; + fileSize: string; + fileSha256: string; + changelog: string; + isForceUpdate: boolean; + minOsVersion?: string; + releaseDate?: string; + }, + @Req() req: any, + ) { + const version = await this.versionService.createVersion({ + ...body, + platform: body.platform.toUpperCase() as Platform, + releaseDate: body.releaseDate ? new Date(body.releaseDate) : undefined, + createdBy: req.user?.sub, + }); + return { code: 0, data: version }; + } + + @Post('upload') + @UseInterceptors(FileInterceptor('file')) + @ApiConsumes('multipart/form-data') + @ApiOperation({ summary: 'Upload APK/IPA and create version' }) + async uploadVersion( + @UploadedFile() file: Express.Multer.File, + @Body() body: { + platform: string; + versionCode?: string; + versionName?: string; + buildNumber?: string; + changelog?: string; + isForceUpdate?: string; + minOsVersion?: string; + releaseDate?: string; + }, + @Req() req: any, + ) { + const platform = body.platform.toUpperCase() as Platform; + const versionCode = body.versionCode + ? parseInt(body.versionCode, 10) + : Date.now(); + const versionName = body.versionName || '1.0.0'; + + // Upload to MinIO + const uploadResult = await this.fileStorage.uploadFile( + file.buffer, + file.originalname, + platform, + versionName, + ); + + const version = await this.versionService.createVersion({ + platform, + versionCode, + versionName, + buildNumber: body.buildNumber || versionCode.toString(), + downloadUrl: uploadResult.downloadUrl, + fileSize: uploadResult.fileSize, + fileSha256: uploadResult.sha256, + changelog: body.changelog || '', + isForceUpdate: body.isForceUpdate === 'true', + minOsVersion: body.minOsVersion, + releaseDate: body.releaseDate ? new Date(body.releaseDate) : undefined, + createdBy: req.user?.sub, + }); + + return { code: 0, data: version }; + } + + @Put(':id') + @ApiOperation({ summary: 'Update version' }) + async updateVersion( + @Param('id') id: string, + @Body() body: any, + @Req() req: any, + ) { + const version = await this.versionService.updateVersion(id, { + ...body, + updatedBy: req.user?.sub, + }); + return { code: 0, data: version }; + } + + @Patch(':id/toggle') + @ApiOperation({ summary: 'Enable/disable version' }) + async toggleVersion( + @Param('id') id: string, + @Body() body: { isEnabled: boolean }, + ) { + const result = await this.versionService.toggleVersion(id, body.isEnabled); + return { code: 0, data: result }; + } + + @Delete(':id') + @ApiOperation({ summary: 'Delete version' }) + async deleteVersion(@Param('id') id: string) { + const result = await this.versionService.deleteVersion(id); + return { code: 0, data: result }; + } +} diff --git a/backend/services/user-service/src/interface/http/controllers/app-version.controller.ts b/backend/services/user-service/src/interface/http/controllers/app-version.controller.ts new file mode 100644 index 0000000..651e1ed --- /dev/null +++ b/backend/services/user-service/src/interface/http/controllers/app-version.controller.ts @@ -0,0 +1,26 @@ +import { Controller, Get, Query } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiQuery } from '@nestjs/swagger'; +import { AppVersionService } from '../../../application/services/app-version.service'; +import { Platform } from '../../../domain/entities/app-version.entity'; + +@ApiTags('App Version') +@Controller('app/version') +export class AppVersionController { + constructor(private readonly versionService: AppVersionService) {} + + @Get('check') + @ApiOperation({ summary: 'Check for app update (mobile client)' }) + @ApiQuery({ name: 'platform', enum: ['android', 'ios', 'ANDROID', 'IOS'] }) + @ApiQuery({ name: 'current_version_code', type: Number }) + async checkUpdate( + @Query('platform') platform: string, + @Query('current_version_code') currentVersionCode: string, + ) { + const platformEnum = platform.toUpperCase() as Platform; + const result = await this.versionService.checkUpdate( + platformEnum, + parseInt(currentVersionCode, 10), + ); + return { code: 0, data: result }; + } +} diff --git a/backend/services/user-service/src/interface/http/controllers/telemetry.controller.ts b/backend/services/user-service/src/interface/http/controllers/telemetry.controller.ts new file mode 100644 index 0000000..c62760f --- /dev/null +++ b/backend/services/user-service/src/interface/http/controllers/telemetry.controller.ts @@ -0,0 +1,60 @@ +import { Controller, Post, Get, Body, Query, UseGuards, Req } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { JwtAuthGuard } from '@genex/common'; +import { TelemetryService } from '../../../application/services/telemetry.service'; + +@ApiTags('telemetry') +@Controller('telemetry') +export class TelemetryController { + constructor(private readonly telemetryService: TelemetryService) {} + + @Post('events') + @ApiOperation({ summary: 'Batch report telemetry events (no auth required)' }) + async batchEvents(@Body() body: { + events: Array<{ + eventName: string; + userId?: string; + installId: string; + clientTs: number; + properties?: Record; + }>; + }) { + const result = await this.telemetryService.recordEvents(body.events); + return { code: 0, data: result }; + } + + @Post('heartbeat') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: 'Report heartbeat for online detection' }) + async heartbeat(@Req() req: any, @Body() body: { installId: string; appVersion: string }) { + await this.telemetryService.recordHeartbeat(req.user.sub, body.installId, body.appVersion); + return { code: 0, data: { success: true } }; + } + + @Get('online-count') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: 'Get current online user count' }) + async getOnlineCount() { + const result = await this.telemetryService.getOnlineCount(); + return { code: 0, data: result }; + } + + @Get('online-history') + @UseGuards(JwtAuthGuard) + @ApiBearerAuth() + @ApiOperation({ summary: 'Get online user history trend' }) + async getOnlineHistory( + @Query('startTime') startTime: string, + @Query('endTime') endTime: string, + @Query('interval') interval?: '1m' | '5m' | '1h', + ) { + const result = await this.telemetryService.getOnlineHistory( + new Date(startTime), + new Date(endTime), + interval || '5m', + ); + return { code: 0, data: result }; + } +} diff --git a/backend/services/user-service/src/user.module.ts b/backend/services/user-service/src/user.module.ts index 75e1214..6a39bd6 100644 --- a/backend/services/user-service/src/user.module.ts +++ b/backend/services/user-service/src/user.module.ts @@ -2,12 +2,17 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { JwtModule } from '@nestjs/jwt'; import { PassportModule } from '@nestjs/passport'; +import { ScheduleModule } from '@nestjs/schedule'; import { User } from './domain/entities/user.entity'; import { KycSubmission } from './domain/entities/kyc-submission.entity'; import { Wallet } from './domain/entities/wallet.entity'; import { Transaction } from './domain/entities/transaction.entity'; import { Message } from './domain/entities/message.entity'; +import { AppVersion } from './domain/entities/app-version.entity'; +import { TelemetryEvent } from './domain/entities/telemetry-event.entity'; +import { OnlineSnapshot } from './domain/entities/online-snapshot.entity'; +import { DailyActiveStats } from './domain/entities/daily-active-stats.entity'; import { UserRepository } from './infrastructure/persistence/user.repository'; import { KycRepository } from './infrastructure/persistence/kyc.repository'; @@ -15,6 +20,8 @@ import { WalletRepository } from './infrastructure/persistence/wallet.repository import { TransactionRepository } from './infrastructure/persistence/transaction.repository'; import { MessageRepository } from './infrastructure/persistence/message.repository'; +import { PresenceRedisService } from './infrastructure/redis/presence-redis.service'; + import { UserProfileService } from './application/services/user-profile.service'; import { KycService } from './application/services/kyc.service'; import { WalletService } from './application/services/wallet.service'; @@ -23,6 +30,10 @@ import { AdminDashboardService } from './application/services/admin-dashboard.se import { AdminUserService } from './application/services/admin-user.service'; import { AdminSystemService } from './application/services/admin-system.service'; import { AdminAnalyticsService } from './application/services/admin-analytics.service'; +import { TelemetryService } from './application/services/telemetry.service'; +import { TelemetrySchedulerService } from './application/services/telemetry-scheduler.service'; +import { AppVersionService } from './application/services/app-version.service'; +import { FileStorageService } from './application/services/file-storage.service'; import { UserController } from './interface/http/controllers/user.controller'; import { KycController } from './interface/http/controllers/kyc.controller'; @@ -32,23 +43,36 @@ import { AdminDashboardController } from './interface/http/controllers/admin-das import { AdminUserController } from './interface/http/controllers/admin-user.controller'; import { AdminSystemController } from './interface/http/controllers/admin-system.controller'; import { AdminAnalyticsController } from './interface/http/controllers/admin-analytics.controller'; +import { TelemetryController } from './interface/http/controllers/telemetry.controller'; +import { AdminTelemetryController } from './interface/http/controllers/admin-telemetry.controller'; +import { AppVersionController } from './interface/http/controllers/app-version.controller'; +import { AdminVersionController } from './interface/http/controllers/admin-version.controller'; @Module({ imports: [ - TypeOrmModule.forFeature([User, KycSubmission, Wallet, Transaction, Message]), + TypeOrmModule.forFeature([ + User, KycSubmission, Wallet, Transaction, Message, AppVersion, + TelemetryEvent, OnlineSnapshot, DailyActiveStats, + ]), PassportModule.register({ defaultStrategy: 'jwt' }), JwtModule.register({ secret: process.env.JWT_ACCESS_SECRET || 'dev-access-secret', }), + ScheduleModule.forRoot(), ], controllers: [ UserController, KycController, WalletController, MessageController, AdminDashboardController, AdminUserController, AdminSystemController, AdminAnalyticsController, + TelemetryController, AdminTelemetryController, + AppVersionController, AdminVersionController, ], providers: [ UserRepository, KycRepository, WalletRepository, TransactionRepository, MessageRepository, + PresenceRedisService, UserProfileService, KycService, WalletService, MessageService, AdminDashboardService, AdminUserService, AdminSystemService, AdminAnalyticsService, + TelemetryService, TelemetrySchedulerService, + AppVersionService, FileStorageService, ], exports: [UserProfileService, WalletService, MessageService], }) diff --git a/docs/guides/07-遥测与版本管理开发指南.md b/docs/guides/07-遥测与版本管理开发指南.md new file mode 100644 index 0000000..0820d04 --- /dev/null +++ b/docs/guides/07-遥测与版本管理开发指南.md @@ -0,0 +1,382 @@ +# 07 - 遥测 (Telemetry) 与移动端版本管理开发指南 + +> 参考项目: rwadurian/backend/services/presence-service + admin-service +> 目标: 为 Genex 券金融平台增加 **用户遥测分析** 和 **移动端 OTA 版本管理** 能力 + +--- + +## 一、功能概览 + +### 1.1 遥测系统 (Telemetry) + +| 能力 | 说明 | +|------|------| +| 心跳检测 | 客户端定时上报心跳,服务端通过 Redis Sorted Set 实时计算在线用户数 | +| 事件采集 | 批量上报客户端事件(会话开始、页面浏览、操作行为等),写入 PostgreSQL | +| DAU 统计 | 基于 `app_session_start` 事件按 userId/installId 去重,支持省市维度 | +| 在线快照 | 每分钟记录在线用户数快照,支持 1m/5m/1h 区间聚合查询 | +| Prometheus | 暴露 `/metrics` 端点供 Grafana 抓取(心跳数、事件数、在线人数、DAU) | + +### 1.2 版本管理 (App Version / OTA Update) + +| 能力 | 说明 | +|------|------| +| 版本 CRUD | 管理员创建/编辑/删除/启禁用版本记录 | +| APK/IPA 上传 | 上传安装包,自动解析 versionCode/versionName/minSdkVersion | +| 强制更新 | `isForceUpdate` 标志,客户端据此决定是否阻断使用 | +| 检查更新 API | 移动端调用,返回是否有更新、下载地址、SHA256 校验 | +| 断点续传下载 | HTTP Range 206 支持,大文件友好 | +| 文件完整性 | SHA256 哈希校验,防篡改 | + +--- + +## 二、Genex 项目适配方案 + +### 2.1 架构决策 + +rwadurian 项目将遥测放在独立的 `presence-service`,版本管理放在 `admin-service`。 +Genex 项目的适配方案: + +| 功能 | 归属服务 | 理由 | +|------|---------|------| +| **遥测 (Telemetry)** | **user-service (扩展)** | Genex 用户量 MVP 阶段较小,无需独立服务;遥测与用户强关联 | +| **版本管理 (App Version)** | **user-service (扩展)** | 版本管理 API 量少,admin 端已在 user-service 中 | + +> 后续用户量增长可拆分为独立 presence-service + +### 2.2 与参考项目的差异 + +| 维度 | rwadurian | Genex 适配 | +|------|-----------|-----------| +| ORM | Prisma | TypeORM (与现有一致) | +| 架构 | 独立服务 + CQRS | 扩展 user-service,标准 Service/Controller | +| 文件存储 | 本地 `./uploads` | **MinIO** (已有基础设施) | +| 事件总线 | Kafka | **Kafka** (已有 @genex/kafka-client) | +| 缓存 | Redis | **Redis** (已有) | +| APK 解析 | adbkit-apkreader | 同方案 | +| IPA 解析 | unzipper + bplist-parser | 同方案 | +| Prometheus | prom-client | 同方案 | +| 在线检测窗口 | 180s (3 min) | 180s | +| DAU 计算 | 每天凌晨 1:00 + 每小时滚动 | 同方案 | +| 下载端点 | Express 流式 | NestJS StreamableFile + Range | + +### 2.3 数据库表 (新增 4 张) + +#### `telemetry_events` — 事件日志 (append-only) + +```sql +CREATE TABLE IF NOT EXISTS telemetry_events ( + id BIGSERIAL PRIMARY KEY, + user_id UUID REFERENCES users(id), + install_id VARCHAR(128) NOT NULL, + event_name VARCHAR(64) NOT NULL, + event_time TIMESTAMPTZ NOT NULL, + properties JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_telemetry_events_time ON telemetry_events(event_time); +CREATE INDEX idx_telemetry_events_name_time ON telemetry_events(event_name, event_time); +CREATE INDEX idx_telemetry_events_user ON telemetry_events(user_id); +``` + +#### `daily_active_stats` — DAU 日统计 + +```sql +CREATE TABLE IF NOT EXISTS daily_active_stats ( + day DATE PRIMARY KEY, + dau_count INT NOT NULL DEFAULT 0, + dau_by_platform JSONB DEFAULT '{}', + dau_by_region JSONB DEFAULT '{}', + calculated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + version INT NOT NULL DEFAULT 1 +); +``` + +#### `online_snapshots` — 在线快照 + +```sql +CREATE TABLE IF NOT EXISTS online_snapshots ( + id BIGSERIAL PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL UNIQUE, + online_count INT NOT NULL DEFAULT 0, + window_seconds INT NOT NULL DEFAULT 180 +); +CREATE INDEX idx_online_snapshots_ts ON online_snapshots(ts DESC); +``` + +#### `app_versions` — 应用版本 + +```sql +CREATE TABLE IF NOT EXISTS app_versions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + platform VARCHAR(10) NOT NULL CHECK (platform IN ('ANDROID', 'IOS')), + version_code INT NOT NULL, + version_name VARCHAR(32) NOT NULL, + build_number VARCHAR(64) NOT NULL, + download_url TEXT NOT NULL, + file_size BIGINT NOT NULL DEFAULT 0, + file_sha256 VARCHAR(64) NOT NULL, + min_os_version VARCHAR(16), + changelog TEXT NOT NULL DEFAULT '', + is_force_update BOOLEAN NOT NULL DEFAULT FALSE, + is_enabled BOOLEAN NOT NULL DEFAULT TRUE, + release_date TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by UUID, + updated_by UUID +); +CREATE INDEX idx_app_versions_platform ON app_versions(platform, is_enabled); +CREATE INDEX idx_app_versions_code ON app_versions(platform, version_code DESC); +``` + +### 2.4 Redis 数据结构 + +| Key | 类型 | 用途 | +|-----|------|------| +| `genex:presence:online` | Sorted Set | member=userId, score=heartbeat Unix时间戳 | +| `genex:dau:{YYYY-MM-DD}` | HyperLogLog | 实时近似 DAU(userId/installId) | +| `genex:dau:{YYYY-MM-DD}:ttl` | - | 自动过期 48h | + +### 2.5 Kafka Topics (新增) + +| Topic | 生产者 | 消费者 | 用途 | +|-------|--------|--------|------| +| `telemetry.session.started` | user-service | notification-service | 会话开始事件 | +| `telemetry.heartbeat` | user-service | - | 心跳事件(可选) | + +### 2.6 MinIO Bucket + +| Bucket | 用途 | +|--------|------| +| `app-releases` | 存放 APK/IPA 安装包 | + +--- + +## 三、API 设计 + +### 3.1 遥测 API + +``` +POST /api/v1/telemetry/events — 批量上报事件 (无需认证,支持匿名) +POST /api/v1/telemetry/heartbeat — 心跳上报 (需认证) +GET /api/v1/telemetry/online-count — 当前在线人数 (需认证) +GET /api/v1/telemetry/online-history — 在线历史趋势 (需认证) + +# Admin (遥测分析 — 独立路径避免与 issuer 的 analytics 冲突) +GET /api/v1/admin/telemetry/dau — DAU 统计查询 +GET /api/v1/admin/telemetry/events — 事件列表查询 +GET /api/v1/admin/telemetry/realtime — 实时数据面板 +``` + +### 3.2 版本管理 API + +``` +# 移动端 (无需/轻认证) +GET /api/v1/app/version/check — 检查更新 +GET /api/v1/app/version/download/:id — 下载安装包 (支持断点续传) + +# Admin +GET /api/v1/admin/versions — 版本列表 +GET /api/v1/admin/versions/:id — 版本详情 +POST /api/v1/admin/versions — 创建版本 (手动填写) +POST /api/v1/admin/versions/upload — 上传 APK/IPA 自动创建 +POST /api/v1/admin/versions/parse — 解析安装包 (不保存) +PUT /api/v1/admin/versions/:id — 更新版本信息 +PATCH /api/v1/admin/versions/:id/toggle — 启用/禁用 +DELETE /api/v1/admin/versions/:id — 删除版本 +``` + +### 3.3 检查更新响应示例 + +```json +{ + "code": 0, + "data": { + "needUpdate": true, + "forceUpdate": true, + "version": "2.1.0", + "versionCode": 210, + "downloadUrl": "https://minio.gogenex.com/app-releases/ANDROID-2.1.0.apk", + "fileSize": 52428800, + "fileSizeFriendly": "50.0 MB", + "sha256": "a1b2c3d4e5f6...", + "updateLog": "1. 新增券转让功能\n2. 修复钱包余额显示\n3. 性能优化", + "releaseDate": "2026-02-12T10:00:00Z" + } +} +``` + +### 3.4 强制更新决策流程 + +``` +客户端 → GET /app/version/check?platform=android¤t_version_code=200 + ↓ +服务端: 查找该平台最新 enabled 版本 + ↓ + ┌── 无更新版本 → { needUpdate: false } + │ + └── 有更新版本 (latestCode > currentCode) + ↓ + isForceUpdate && isEnabled? + ├── YES → { needUpdate: true, forceUpdate: true, ... } + └── NO → { needUpdate: true, forceUpdate: false, ... } + ↓ +客户端: + forceUpdate=true → 弹窗阻断,只能更新 + forceUpdate=false → 提示可选更新 +``` + +--- + +## 四、实现文件清单 + +### 4.1 遥测模块 (在 user-service 中扩展) + +``` +services/user-service/src/ +├── domain/entities/ +│ ├── telemetry-event.entity.ts # 事件日志实体 +│ ├── online-snapshot.entity.ts # 在线快照实体 +│ └── daily-active-stats.entity.ts # DAU 统计实体 +├── application/services/ +│ ├── telemetry.service.ts # 事件采集 + 心跳 + DAU +│ └── telemetry-scheduler.service.ts # 定时任务 (快照/DAU/清理) +├── infrastructure/ +│ └── redis/ +│ └── presence-redis.service.ts # Redis 在线检测操作 +└── interface/http/ + ├── controllers/ + │ ├── telemetry.controller.ts # 遥测 API + │ └── admin-telemetry.controller.ts # Admin 遥测分析 API + └── dto/ + ├── batch-events.dto.ts + ├── heartbeat.dto.ts + └── query-dau.dto.ts +``` + +### 4.2 版本管理模块 (在 user-service 中扩展) + +``` +services/user-service/src/ +├── domain/entities/ +│ └── app-version.entity.ts # 版本实体 +├── application/services/ +│ ├── app-version.service.ts # 版本 CRUD + 检查更新 +│ └── file-storage.service.ts # MinIO 文件上传/下载 +├── infrastructure/ +│ └── parsers/ +│ └── package-parser.service.ts # APK/IPA 解析 +└── interface/http/ + ├── controllers/ + │ ├── app-version.controller.ts # 移动端检查更新 + 下载 + │ └── admin-version.controller.ts # Admin 版本管理 + └── dto/ + ├── check-update.dto.ts + ├── create-version.dto.ts + └── upload-version.dto.ts +``` + +### 4.3 数据库迁移 + +``` +migrations/ +├── 032_create_telemetry_events.sql +├── 033_create_daily_active_stats.sql +├── 034_create_online_snapshots.sql +└── 035_create_app_versions.sql +``` + +### 4.4 Kong 路由 (新增) + +```yaml +# user-service 新增路由 +- name: telemetry-routes + paths: + - /api/v1/telemetry + strip_path: false +- name: app-version-routes + paths: + - /api/v1/app/version + strip_path: false +- name: admin-version-routes + paths: + - /api/v1/admin/versions + strip_path: false +``` + +--- + +## 五、关键实现要点 + +### 5.1 心跳在线检测 + +- 客户端每 60s 发送心跳 +- 服务端用 Redis `ZADD genex:presence:online userId timestamp` +- 在线判定窗口: 180s (3 分钟内有心跳即在线) +- 在线人数: `ZCOUNT genex:presence:online (now-180) +inf` +- 每小时清理 24 小时前的过期数据: `ZREMRANGEBYSCORE` + +### 5.2 DAU 计算策略 + +- **实时近似**: Redis HyperLogLog `PFADD genex:dau:2026-02-12 userId` +- **精确计算**: 每天凌晨 1:00 从 `telemetry_events` 查询 `app_session_start` +- **去重优先级**: userId > installId +- **地理维度**: 从 event properties 中提取 province/city + +### 5.3 文件存储 (MinIO) + +- Bucket: `app-releases` +- 对象命名: `{platform}/{versionName}/{timestamp}-{random}.{ext}` +- 预签名 URL: 下载时生成 24h 有效的签名链接 +- SHA256: 上传时计算,存入 `app_versions.file_sha256` + +### 5.4 APK/IPA 解析 + +- **Android**: `adbkit-apkreader` 读取 AndroidManifest.xml + - 提取: packageName, versionCode, versionName, minSdkVersion +- **iOS**: `unzipper` 解压 → `bplist-parser` 读取 Info.plist + - 提取: CFBundleIdentifier, CFBundleVersion, CFBundleShortVersionString, MinimumOSVersion + +### 5.5 断点续传 + +- 响应头: `Accept-Ranges: bytes` +- 请求头: `Range: bytes=1048576-` +- 状态码: 206 Partial Content +- 流式读取: `fs.createReadStream(path, { start, end })` + +--- + +## 六、与现有系统集成 + +### 6.1 Admin-web 管理面板 + +admin-web 已有 Dashboard 页面,新增: +- **遥测面板**: DAU 趋势图、在线用户数、事件分析 +- **版本管理页**: 版本列表、上传、强制更新开关 + +### 6.2 genex-mobile / miniapp + +Flutter/Taro 客户端需要: +- 启动时发送 `app_session_start` 事件 +- 定时心跳 (60s) +- 启动时调用检查更新 API +- 根据 `forceUpdate` 决定是否阻断 + +### 6.3 notification-service + +消费 `telemetry.session.started` 事件: +- 新用户首次登录 → 发送欢迎通知 +- 用户回归 (>7 天未登录) → 发送召回通知 + +--- + +## 七、Prometheus 指标 (可选) + +| 指标名 | 类型 | 标签 | 说明 | +|--------|------|------|------| +| `genex_online_users` | Gauge | - | 当前在线用户数 | +| `genex_dau` | Gauge | date | 每日活跃用户数 | +| `genex_heartbeat_total` | Counter | app_version | 心跳总数 | +| `genex_events_total` | Counter | event_name | 事件总数 | +| `genex_event_batch_duration` | Histogram | - | 批量事件处理耗时 |