feat(presence): 添加设备档案表,实现事件流水+设备快照分离

大厂标准架构(Amplitude/Mixpanel):
- analytics_event_log: 事件流水(append-only,每条事件一行)
- analytics_device_profile: 设备快照(每台设备一行,upsert 更新)

设备分布查询从 O(events) 降为 O(devices):
- SELECT COUNT(*), device_brand FROM analytics_device_profile GROUP BY device_brand
  不再需要 COUNT(DISTINCT install_id) 扫描全量事件表

ON CONFLICT (install_id) DO UPDATE:
- COALESCE 保留已有字段(不被 NULL 覆盖)
- last_seen_at 每次上报更新
- event_count 累加(可用于活跃度分析)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-05 19:31:33 -08:00
parent be415c1eb6
commit 893513ad78
7 changed files with 173 additions and 4 deletions

View File

@ -0,0 +1,22 @@
-- 设备档案表: 每台设备一行, upsert 更新
-- 大厂标准: 事件流水(event_log) + 设备快照(device_profile) 分离
-- 设备分布查询直接走此表O(devices) 而非 O(events)
CREATE TABLE "analytics_device_profile" (
"install_id" VARCHAR(64) NOT NULL,
"user_id" VARCHAR(20),
"device_brand" VARCHAR(64),
"device_model" VARCHAR(64),
"device_os" VARCHAR(32),
"app_version" VARCHAR(32),
"locale" VARCHAR(16),
"first_seen_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
"last_seen_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
"event_count" INTEGER NOT NULL DEFAULT 1,
CONSTRAINT "analytics_device_profile_pkey" PRIMARY KEY ("install_id")
);
CREATE INDEX "idx_device_profile_brand" ON "analytics_device_profile"("device_brand");
CREATE INDEX "idx_device_profile_app_version" ON "analytics_device_profile"("app_version");
CREATE INDEX "idx_device_profile_user_id" ON "analytics_device_profile"("user_id");
CREATE INDEX "idx_device_profile_last_seen" ON "analytics_device_profile"("last_seen_at" DESC);

View File

@ -35,6 +35,27 @@ model EventLog {
@@map("analytics_event_log") @@map("analytics_event_log")
} }
// 设备档案表 (每台设备一行, upsert 更新)
// 大厂标准: 事件流水 + 设备快照分离, 设备分布查询直接走此表
model DeviceProfile {
installId String @id @map("install_id") @db.VarChar(64)
userId String? @map("user_id") @db.VarChar(20)
deviceBrand String? @map("device_brand") @db.VarChar(64)
deviceModel String? @map("device_model") @db.VarChar(64)
deviceOs String? @map("device_os") @db.VarChar(32)
appVersion String? @map("app_version") @db.VarChar(32)
locale String? @map("locale") @db.VarChar(16)
firstSeenAt DateTime @default(now()) @map("first_seen_at") @db.Timestamptz()
lastSeenAt DateTime @updatedAt @map("last_seen_at") @db.Timestamptz()
eventCount Int @default(1) @map("event_count")
@@index([deviceBrand], name: "idx_device_profile_brand")
@@index([appVersion], name: "idx_device_profile_app_version")
@@index([userId], name: "idx_device_profile_user_id")
@@index([lastSeenAt(sort: Desc)], name: "idx_device_profile_last_seen")
@@map("analytics_device_profile")
}
// 日活统计表 // 日活统计表
model DailyActiveStats { model DailyActiveStats {
day DateTime @id @map("day") @db.Date day DateTime @id @map("day") @db.Date

View File

@ -2,6 +2,7 @@ import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import { RecordEventsCommand, EventItemDto } from './record-events.command'; import { RecordEventsCommand, EventItemDto } from './record-events.command';
import { EventLog } from '../../../domain/entities/event-log.entity'; import { EventLog } from '../../../domain/entities/event-log.entity';
import { DeviceProfile } from '../../../domain/entities/device-profile.entity';
import { InstallId } from '../../../domain/value-objects/install-id.vo'; import { InstallId } from '../../../domain/value-objects/install-id.vo';
import { EventName } from '../../../domain/value-objects/event-name.vo'; import { EventName } from '../../../domain/value-objects/event-name.vo';
import { EventProperties } from '../../../domain/value-objects/event-properties.vo'; import { EventProperties } from '../../../domain/value-objects/event-properties.vo';
@ -9,6 +10,10 @@ import {
IEventLogRepository, IEventLogRepository,
EVENT_LOG_REPOSITORY, EVENT_LOG_REPOSITORY,
} from '../../../domain/repositories/event-log.repository.interface'; } from '../../../domain/repositories/event-log.repository.interface';
import {
IDeviceProfileRepository,
DEVICE_PROFILE_REPOSITORY,
} from '../../../domain/repositories/device-profile.repository.interface';
import { RedisService } from '../../../infrastructure/redis/redis.service'; import { RedisService } from '../../../infrastructure/redis/redis.service';
import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service'; import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service';
import { MetricsService } from '../../../infrastructure/metrics/metrics.service'; import { MetricsService } from '../../../infrastructure/metrics/metrics.service';
@ -27,6 +32,8 @@ export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand>
constructor( constructor(
@Inject(EVENT_LOG_REPOSITORY) @Inject(EVENT_LOG_REPOSITORY)
private readonly eventLogRepository: IEventLogRepository, private readonly eventLogRepository: IEventLogRepository,
@Inject(DEVICE_PROFILE_REPOSITORY)
private readonly deviceProfileRepository: IDeviceProfileRepository,
private readonly redisService: RedisService, private readonly redisService: RedisService,
private readonly eventPublisher: EventPublisherService, private readonly eventPublisher: EventPublisherService,
private readonly metricsService: MetricsService, private readonly metricsService: MetricsService,
@ -52,10 +59,28 @@ export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand>
return { accepted: 0, failed: events.length, errors }; return { accepted: 0, failed: events.length, errors };
} }
// 2. 批量写入数据库 // 2. 批量写入事件流水
await this.eventLogRepository.batchInsert(validLogs); await this.eventLogRepository.batchInsert(validLogs);
// 3. 更新实时DAU (HyperLogLog) // 3. Upsert 设备档案(大厂标准:事件流水 + 设备快照分离)
// 只有携带设备信息的事件才更新档案每台设备一行O(devices) 而非 O(events)
const deviceProfiles = validLogs
.filter((log) => log.installId.value)
.map((log) =>
DeviceProfile.create({
installId: log.installId.value,
userId: log.userId,
deviceBrand: log.deviceBrand,
deviceModel: log.deviceModel,
deviceOs: log.deviceOs,
appVersion: log.appVersion,
locale: log.locale,
eventCount: 1,
}),
);
await this.deviceProfileRepository.upsertBatch(deviceProfiles);
// 4. 更新实时DAU (HyperLogLog)
const todayKey = formatToDateKey(new Date()); const todayKey = formatToDateKey(new Date());
for (const log of validLogs) { for (const log of validLogs) {
if (log.eventName.isDauEvent()) { if (log.eventName.isDauEvent()) {
@ -64,7 +89,7 @@ export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand>
log.dauIdentifier, log.dauIdentifier,
); );
// 4. 发布领域事件 // 5. 发布领域事件
await this.eventPublisher.publish( await this.eventPublisher.publish(
SessionStartedEvent.EVENT_NAME, SessionStartedEvent.EVENT_NAME,
new SessionStartedEvent( new SessionStartedEvent(
@ -81,7 +106,7 @@ export class RecordEventsHandler implements ICommandHandler<RecordEventsCommand>
} }
} }
// 5. 记录 Prometheus 指标 // 6. 记录 Prometheus 指标
for (const log of validLogs) { for (const log of validLogs) {
this.metricsService.recordEvent(log.eventName.value); this.metricsService.recordEvent(log.eventName.value);
} }

View File

@ -0,0 +1,36 @@
export interface DeviceProfileProps {
installId: string;
userId: string | null;
deviceBrand: string | null;
deviceModel: string | null;
deviceOs: string | null;
appVersion: string | null;
locale: string | null;
eventCount: number;
}
export class DeviceProfile {
readonly installId: string;
readonly userId: string | null;
readonly deviceBrand: string | null;
readonly deviceModel: string | null;
readonly deviceOs: string | null;
readonly appVersion: string | null;
readonly locale: string | null;
readonly eventCount: number;
private constructor(props: DeviceProfileProps) {
this.installId = props.installId;
this.userId = props.userId;
this.deviceBrand = props.deviceBrand;
this.deviceModel = props.deviceModel;
this.deviceOs = props.deviceOs;
this.appVersion = props.appVersion;
this.locale = props.locale;
this.eventCount = props.eventCount;
}
static create(props: DeviceProfileProps): DeviceProfile {
return new DeviceProfile(props);
}
}

View File

@ -0,0 +1,12 @@
import { DeviceProfile } from '../entities/device-profile.entity';
export const DEVICE_PROFILE_REPOSITORY = 'DEVICE_PROFILE_REPOSITORY';
export interface IDeviceProfileRepository {
/**
* upsert
* INSERT ... ON CONFLICT (install_id) DO UPDATE
* last_seen_at / app_version / event_count
*/
upsertBatch(profiles: DeviceProfile[]): Promise<void>;
}

View File

@ -6,12 +6,14 @@ import { OnlineSnapshotMapper } from './persistence/mappers/online-snapshot.mapp
import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl'; import { EventLogRepositoryImpl } from './persistence/repositories/event-log.repository.impl';
import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl'; import { DailyActiveStatsRepositoryImpl } from './persistence/repositories/daily-active-stats.repository.impl';
import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl'; import { OnlineSnapshotRepositoryImpl } from './persistence/repositories/online-snapshot.repository.impl';
import { DeviceProfileRepositoryImpl } from './persistence/repositories/device-profile.repository.impl';
import { RedisModule } from './redis/redis.module'; import { RedisModule } from './redis/redis.module';
import { KafkaModule } from './kafka/kafka.module'; import { KafkaModule } from './kafka/kafka.module';
import { MetricsModule } from './metrics/metrics.module'; import { MetricsModule } from './metrics/metrics.module';
import { EVENT_LOG_REPOSITORY } from '../domain/repositories/event-log.repository.interface'; import { EVENT_LOG_REPOSITORY } from '../domain/repositories/event-log.repository.interface';
import { DAILY_ACTIVE_STATS_REPOSITORY } from '../domain/repositories/daily-active-stats.repository.interface'; import { DAILY_ACTIVE_STATS_REPOSITORY } from '../domain/repositories/daily-active-stats.repository.interface';
import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapshot.repository.interface'; import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapshot.repository.interface';
import { DEVICE_PROFILE_REPOSITORY } from '../domain/repositories/device-profile.repository.interface';
@Module({ @Module({
imports: [RedisModule, KafkaModule, MetricsModule], imports: [RedisModule, KafkaModule, MetricsModule],
@ -32,12 +34,17 @@ import { ONLINE_SNAPSHOT_REPOSITORY } from '../domain/repositories/online-snapsh
provide: ONLINE_SNAPSHOT_REPOSITORY, provide: ONLINE_SNAPSHOT_REPOSITORY,
useClass: OnlineSnapshotRepositoryImpl, useClass: OnlineSnapshotRepositoryImpl,
}, },
{
provide: DEVICE_PROFILE_REPOSITORY,
useClass: DeviceProfileRepositoryImpl,
},
], ],
exports: [ exports: [
PrismaService, PrismaService,
EVENT_LOG_REPOSITORY, EVENT_LOG_REPOSITORY,
DAILY_ACTIVE_STATS_REPOSITORY, DAILY_ACTIVE_STATS_REPOSITORY,
ONLINE_SNAPSHOT_REPOSITORY, ONLINE_SNAPSHOT_REPOSITORY,
DEVICE_PROFILE_REPOSITORY,
RedisModule, RedisModule,
KafkaModule, KafkaModule,
MetricsModule, MetricsModule,

View File

@ -0,0 +1,46 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { IDeviceProfileRepository } from '../../../domain/repositories/device-profile.repository.interface';
import { DeviceProfile } from '../../../domain/entities/device-profile.entity';
@Injectable()
export class DeviceProfileRepositoryImpl implements IDeviceProfileRepository {
constructor(private readonly prisma: PrismaService) {}
async upsertBatch(profiles: DeviceProfile[]): Promise<void> {
if (profiles.length === 0) return;
// 同一批次中同一 install_id 可能重复,取最后一条
const map = new Map<string, DeviceProfile>();
for (const p of profiles) {
map.set(p.installId, p);
}
const unique = Array.from(map.values());
// 使用原生 SQL 实现高效 upsert:
// ON CONFLICT (install_id) DO UPDATE
// - 覆盖设备字段(始终更新为最新值)
// - last_seen_at 更新为当前时间
// - event_count 累加
const now = new Date();
for (const p of unique) {
await this.prisma.$executeRaw`
INSERT INTO analytics_device_profile
(install_id, user_id, device_brand, device_model, device_os, app_version, locale,
first_seen_at, last_seen_at, event_count)
VALUES
(${p.installId}, ${p.userId}, ${p.deviceBrand}, ${p.deviceModel},
${p.deviceOs}, ${p.appVersion}, ${p.locale}, ${now}, ${now}, ${p.eventCount})
ON CONFLICT (install_id) DO UPDATE SET
user_id = COALESCE(EXCLUDED.user_id, analytics_device_profile.user_id),
device_brand = COALESCE(EXCLUDED.device_brand, analytics_device_profile.device_brand),
device_model = COALESCE(EXCLUDED.device_model, analytics_device_profile.device_model),
device_os = COALESCE(EXCLUDED.device_os, analytics_device_profile.device_os),
app_version = COALESCE(EXCLUDED.app_version, analytics_device_profile.app_version),
locale = COALESCE(EXCLUDED.locale, analytics_device_profile.locale),
last_seen_at = ${now},
event_count = analytics_device_profile.event_count + ${p.eventCount}
`;
}
}
}