diff --git a/backend/services/presence-service/prisma/migrations/20250306010000_add_device_profile/migration.sql b/backend/services/presence-service/prisma/migrations/20250306010000_add_device_profile/migration.sql new file mode 100644 index 00000000..0e16b671 --- /dev/null +++ b/backend/services/presence-service/prisma/migrations/20250306010000_add_device_profile/migration.sql @@ -0,0 +1,22 @@ +-- 设备档案表: 每台设备一行, upsert 更新 +-- 大厂标准: 事件流水(event_log) + 设备快照(device_profile) 分离 +-- 设备分布查询直接走此表,O(devices) 而非 O(events) +CREATE TABLE "analytics_device_profile" ( + "install_id" VARCHAR(64) NOT NULL, + "user_id" VARCHAR(20), + "device_brand" VARCHAR(64), + "device_model" VARCHAR(64), + "device_os" VARCHAR(32), + "app_version" VARCHAR(32), + "locale" VARCHAR(16), + "first_seen_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + "last_seen_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + "event_count" INTEGER NOT NULL DEFAULT 1, + + CONSTRAINT "analytics_device_profile_pkey" PRIMARY KEY ("install_id") +); + +CREATE INDEX "idx_device_profile_brand" ON "analytics_device_profile"("device_brand"); +CREATE INDEX "idx_device_profile_app_version" ON "analytics_device_profile"("app_version"); +CREATE INDEX "idx_device_profile_user_id" ON "analytics_device_profile"("user_id"); +CREATE INDEX "idx_device_profile_last_seen" ON "analytics_device_profile"("last_seen_at" DESC); diff --git a/backend/services/presence-service/prisma/schema.prisma b/backend/services/presence-service/prisma/schema.prisma index 1d0a850b..aa731e3e 100644 --- a/backend/services/presence-service/prisma/schema.prisma +++ b/backend/services/presence-service/prisma/schema.prisma @@ -35,6 +35,27 @@ model EventLog { @@map("analytics_event_log") } +// 设备档案表 (每台设备一行, upsert 更新) +// 大厂标准: 事件流水 + 设备快照分离, 设备分布查询直接走此表 +model DeviceProfile { + installId String @id @map("install_id") @db.VarChar(64) + userId String? @map("user_id") @db.VarChar(20) + deviceBrand String? @map("device_brand") @db.VarChar(64) + deviceModel String? @map("device_model") @db.VarChar(64) + deviceOs String? @map("device_os") @db.VarChar(32) + appVersion String? @map("app_version") @db.VarChar(32) + locale String? @map("locale") @db.VarChar(16) + firstSeenAt DateTime @default(now()) @map("first_seen_at") @db.Timestamptz() + lastSeenAt DateTime @updatedAt @map("last_seen_at") @db.Timestamptz() + eventCount Int @default(1) @map("event_count") + + @@index([deviceBrand], name: "idx_device_profile_brand") + @@index([appVersion], name: "idx_device_profile_app_version") + @@index([userId], name: "idx_device_profile_user_id") + @@index([lastSeenAt(sort: Desc)], name: "idx_device_profile_last_seen") + @@map("analytics_device_profile") +} + // 日活统计表 model DailyActiveStats { day DateTime @id @map("day") @db.Date diff --git a/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts b/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts index 40535001..98e00822 100644 --- a/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts +++ b/backend/services/presence-service/src/application/commands/record-events/record-events.handler.ts @@ -2,6 +2,7 @@ import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; import { Inject, Injectable } from '@nestjs/common'; import { RecordEventsCommand, EventItemDto } from './record-events.command'; import { EventLog } from '../../../domain/entities/event-log.entity'; +import { DeviceProfile } from '../../../domain/entities/device-profile.entity'; import { InstallId } from '../../../domain/value-objects/install-id.vo'; import { EventName } from '../../../domain/value-objects/event-name.vo'; import { EventProperties } from '../../../domain/value-objects/event-properties.vo'; @@ -9,6 +10,10 @@ import { IEventLogRepository, EVENT_LOG_REPOSITORY, } from '../../../domain/repositories/event-log.repository.interface'; +import { + IDeviceProfileRepository, + DEVICE_PROFILE_REPOSITORY, +} from '../../../domain/repositories/device-profile.repository.interface'; import { RedisService } from '../../../infrastructure/redis/redis.service'; import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service'; import { MetricsService } from '../../../infrastructure/metrics/metrics.service'; @@ -27,6 +32,8 @@ export class RecordEventsHandler implements ICommandHandler constructor( @Inject(EVENT_LOG_REPOSITORY) private readonly eventLogRepository: IEventLogRepository, + @Inject(DEVICE_PROFILE_REPOSITORY) + private readonly deviceProfileRepository: IDeviceProfileRepository, private readonly redisService: RedisService, private readonly eventPublisher: EventPublisherService, private readonly metricsService: MetricsService, @@ -52,10 +59,28 @@ export class RecordEventsHandler implements ICommandHandler return { accepted: 0, failed: events.length, errors }; } - // 2. 批量写入数据库 + // 2. 批量写入事件流水 await this.eventLogRepository.batchInsert(validLogs); - // 3. 更新实时DAU (HyperLogLog) + // 3. Upsert 设备档案(大厂标准:事件流水 + 设备快照分离) + // 只有携带设备信息的事件才更新档案,每台设备一行,O(devices) 而非 O(events) + const deviceProfiles = validLogs + .filter((log) => log.installId.value) + .map((log) => + DeviceProfile.create({ + installId: log.installId.value, + userId: log.userId, + deviceBrand: log.deviceBrand, + deviceModel: log.deviceModel, + deviceOs: log.deviceOs, + appVersion: log.appVersion, + locale: log.locale, + eventCount: 1, + }), + ); + await this.deviceProfileRepository.upsertBatch(deviceProfiles); + + // 4. 更新实时DAU (HyperLogLog) const todayKey = formatToDateKey(new Date()); for (const log of validLogs) { if (log.eventName.isDauEvent()) { @@ -64,7 +89,7 @@ export class RecordEventsHandler implements ICommandHandler log.dauIdentifier, ); - // 4. 发布领域事件 + // 5. 发布领域事件 await this.eventPublisher.publish( SessionStartedEvent.EVENT_NAME, new SessionStartedEvent( @@ -81,7 +106,7 @@ export class RecordEventsHandler implements ICommandHandler } } - // 5. 记录 Prometheus 指标 + // 6. 记录 Prometheus 指标 for (const log of validLogs) { this.metricsService.recordEvent(log.eventName.value); } diff --git a/backend/services/presence-service/src/domain/entities/device-profile.entity.ts b/backend/services/presence-service/src/domain/entities/device-profile.entity.ts new file mode 100644 index 00000000..72304e90 --- /dev/null +++ b/backend/services/presence-service/src/domain/entities/device-profile.entity.ts @@ -0,0 +1,36 @@ +export interface DeviceProfileProps { + installId: string; + userId: string | null; + deviceBrand: string | null; + deviceModel: string | null; + deviceOs: string | null; + appVersion: string | null; + locale: string | null; + eventCount: number; +} + +export class DeviceProfile { + readonly installId: string; + readonly userId: string | null; + readonly deviceBrand: string | null; + readonly deviceModel: string | null; + readonly deviceOs: string | null; + readonly appVersion: string | null; + readonly locale: string | null; + readonly eventCount: number; + + private constructor(props: DeviceProfileProps) { + this.installId = props.installId; + this.userId = props.userId; + this.deviceBrand = props.deviceBrand; + this.deviceModel = props.deviceModel; + this.deviceOs = props.deviceOs; + this.appVersion = props.appVersion; + this.locale = props.locale; + this.eventCount = props.eventCount; + } + + static create(props: DeviceProfileProps): DeviceProfile { + return new DeviceProfile(props); + } +} diff --git a/backend/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts b/backend/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts new file mode 100644 index 00000000..8d4519d2 --- /dev/null +++ b/backend/services/presence-service/src/domain/repositories/device-profile.repository.interface.ts @@ -0,0 +1,12 @@ +import { DeviceProfile } from '../entities/device-profile.entity'; + +export const DEVICE_PROFILE_REPOSITORY = 'DEVICE_PROFILE_REPOSITORY'; + +export interface IDeviceProfileRepository { + /** + * 批量 upsert 设备档案 + * INSERT ... ON CONFLICT (install_id) DO UPDATE + * 每次事件上报时更新 last_seen_at / app_version / event_count + */ + upsertBatch(profiles: DeviceProfile[]): Promise; +} diff --git a/backend/services/presence-service/src/infrastructure/infrastructure.module.ts b/backend/services/presence-service/src/infrastructure/infrastructure.module.ts index 38dc3cba..67da0589 100644 --- a/backend/services/presence-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/presence-service/src/infrastructure/infrastructure.module.ts @@ -6,12 +6,14 @@ import { OnlineSnapshotMapper } from './persistence/mappers/online-snapshot.mapp import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl'; import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl'; import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl'; +import { DeviceProfileRepositoryImpl } from './persistence/repositories/device-profile.repository.impl'; import { RedisModule } from './redis/redis.module'; import { KafkaModule } from './kafka/kafka.module'; import { MetricsModule } from './metrics/metrics.module'; import { EVENT_LOG_REPOSITORY } from '../domain/repositories/event-log.repository.interface'; import { DAILY_ACTIVE_STATS_REPOSITORY } from '../domain/repositories/daily-active-stats.repository.interface'; import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapshot.repository.interface'; +import { DEVICE_PROFILE_REPOSITORY } from '../domain/repositories/device-profile.repository.interface'; @Module({ imports: [RedisModule, KafkaModule, MetricsModule], @@ -32,12 +34,17 @@ import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapsh provide: ONLINE_SNAPSHOT_REPOSITORY, useClass: OnlineSnapshotRepositoryImpl, }, + { + provide: DEVICE_PROFILE_REPOSITORY, + useClass: DeviceProfileRepositoryImpl, + }, ], exports: [ PrismaService, EVENT_LOG_REPOSITORY, DAILY_ACTIVE_STATS_REPOSITORY, ONLINE_SNAPSHOT_REPOSITORY, + DEVICE_PROFILE_REPOSITORY, RedisModule, KafkaModule, MetricsModule, diff --git a/backend/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts b/backend/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts new file mode 100644 index 00000000..54ef64c7 --- /dev/null +++ b/backend/services/presence-service/src/infrastructure/persistence/repositories/device-profile.repository.impl.ts @@ -0,0 +1,46 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IDeviceProfileRepository } from '../../../domain/repositories/device-profile.repository.interface'; +import { DeviceProfile } from '../../../domain/entities/device-profile.entity'; + +@Injectable() +export class DeviceProfileRepositoryImpl implements IDeviceProfileRepository { + constructor(private readonly prisma: PrismaService) {} + + async upsertBatch(profiles: DeviceProfile[]): Promise { + if (profiles.length === 0) return; + + // 同一批次中同一 install_id 可能重复,取最后一条 + const map = new Map(); + for (const p of profiles) { + map.set(p.installId, p); + } + const unique = Array.from(map.values()); + + // 使用原生 SQL 实现高效 upsert: + // ON CONFLICT (install_id) DO UPDATE + // - 覆盖设备字段(始终更新为最新值) + // - last_seen_at 更新为当前时间 + // - event_count 累加 + const now = new Date(); + for (const p of unique) { + await this.prisma.$executeRaw` + INSERT INTO analytics_device_profile + (install_id, user_id, device_brand, device_model, device_os, app_version, locale, + first_seen_at, last_seen_at, event_count) + VALUES + (${p.installId}, ${p.userId}, ${p.deviceBrand}, ${p.deviceModel}, + ${p.deviceOs}, ${p.appVersion}, ${p.locale}, ${now}, ${now}, ${p.eventCount}) + ON CONFLICT (install_id) DO UPDATE SET + user_id = COALESCE(EXCLUDED.user_id, analytics_device_profile.user_id), + device_brand = COALESCE(EXCLUDED.device_brand, analytics_device_profile.device_brand), + device_model = COALESCE(EXCLUDED.device_model, analytics_device_profile.device_model), + device_os = COALESCE(EXCLUDED.device_os, analytics_device_profile.device_os), + app_version = COALESCE(EXCLUDED.app_version, analytics_device_profile.app_version), + locale = COALESCE(EXCLUDED.locale, analytics_device_profile.locale), + last_seen_at = ${now}, + event_count = analytics_device_profile.event_count + ${p.eventCount} + `; + } + } +}