feat(reporting): 实现事件驱动的仪表板统计架构

## 概述
将 reporting-service Dashboard 从 HTTP API 调用改为事件驱动架构,
通过消费 Kafka 事件在本地维护统计数据,实现微服务间解耦。

## 架构变更
之前: Dashboard → HTTP → planting/authorization/identity-service
现在: 各服务 → Kafka → reporting-service → 本地统计表 → Dashboard

## 新增表
- RealtimeStats: 每日实时统计 (认种数/订单数/新用户/授权数)
- GlobalStats: 全局累计统计 (总认种/总用户/总公司数)

## 新增仓储
- IRealtimeStatsRepository: 实时统计接口及实现
- IGlobalStatsRepository: 全局统计接口及实现

## Kafka 消费者更新
- identity.UserAccountCreated: 累加用户统计
- identity.UserAccountAutoCreated: 累加用户统计
- authorization-events: 累加省/市公司统计
- planting.order.paid: 累加认种统计

## Dashboard 服务更新
- getStats(): 从 GlobalStats/RealtimeStats 读取,计算环比变化
- getTrendData(): 从 RealtimeStats 获取趋势数据

## 优势
- 消除跨服务 HTTP 调用延迟
- 统计数据实时更新
- 微服务间完全解耦

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-18 00:25:31 -08:00
parent 0310865834
commit e684068eae
14 changed files with 1940 additions and 0 deletions

View File

@ -0,0 +1,44 @@
-- CreateTable
CREATE TABLE "realtime_stats" (
"stats_id" BIGSERIAL NOT NULL,
"stats_date" DATE NOT NULL,
"daily_planting_count" INTEGER NOT NULL DEFAULT 0,
"daily_order_count" INTEGER NOT NULL DEFAULT 0,
"daily_planting_amount" DECIMAL(20,8) NOT NULL DEFAULT 0,
"daily_new_user_count" INTEGER NOT NULL DEFAULT 0,
"daily_province_auth_count" INTEGER NOT NULL DEFAULT 0,
"daily_city_auth_count" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "realtime_stats_pkey" PRIMARY KEY ("stats_id")
);
-- CreateTable
CREATE TABLE "global_stats" (
"stats_id" BIGSERIAL NOT NULL,
"stats_key" VARCHAR(20) NOT NULL DEFAULT 'global',
"total_planting_count" INTEGER NOT NULL DEFAULT 0,
"total_order_count" INTEGER NOT NULL DEFAULT 0,
"total_planting_amount" DECIMAL(20,8) NOT NULL DEFAULT 0,
"total_user_count" INTEGER NOT NULL DEFAULT 0,
"total_province_company_count" INTEGER NOT NULL DEFAULT 0,
"total_city_company_count" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "global_stats_pkey" PRIMARY KEY ("stats_id")
);
-- CreateIndex
CREATE UNIQUE INDEX "uk_realtime_stats_date" ON "realtime_stats"("stats_date");
-- CreateIndex
CREATE INDEX "idx_rs_date" ON "realtime_stats"("stats_date" DESC);
-- CreateIndex
CREATE UNIQUE INDEX "global_stats_stats_key_key" ON "global_stats"("stats_key");
-- Insert initial global stats record
INSERT INTO "global_stats" ("stats_key", "total_planting_count", "total_order_count", "total_planting_amount", "total_user_count", "total_province_company_count", "total_city_company_count", "updated_at")
VALUES ('global', 0, 0, 0, 0, 0, 0, CURRENT_TIMESTAMP);

View File

@ -0,0 +1,364 @@
-- CreateTable
CREATE TABLE "report_definitions" (
"definition_id" BIGSERIAL NOT NULL,
"report_type" VARCHAR(50) NOT NULL,
"report_name" VARCHAR(200) NOT NULL,
"report_code" VARCHAR(50) NOT NULL,
"description" TEXT,
"parameters" JSONB NOT NULL,
"schedule_cron" VARCHAR(100),
"schedule_timezone" VARCHAR(50) DEFAULT 'Asia/Shanghai',
"schedule_enabled" BOOLEAN NOT NULL DEFAULT false,
"output_formats" TEXT[],
"is_active" BOOLEAN NOT NULL DEFAULT true,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
"last_generated_at" TIMESTAMP(3),
CONSTRAINT "report_definitions_pkey" PRIMARY KEY ("definition_id")
);
-- CreateTable
CREATE TABLE "report_snapshots" (
"snapshot_id" BIGSERIAL NOT NULL,
"report_type" VARCHAR(50) NOT NULL,
"report_code" VARCHAR(50) NOT NULL,
"report_period" VARCHAR(20) NOT NULL,
"period_key" VARCHAR(30) NOT NULL,
"snapshot_data" JSONB NOT NULL,
"summary_data" JSONB,
"data_sources" TEXT[],
"data_freshness" INTEGER NOT NULL DEFAULT 0,
"filter_params" JSONB,
"row_count" INTEGER NOT NULL DEFAULT 0,
"period_start_at" TIMESTAMP(3) NOT NULL,
"period_end_at" TIMESTAMP(3) NOT NULL,
"generated_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"expires_at" TIMESTAMP(3),
CONSTRAINT "report_snapshots_pkey" PRIMARY KEY ("snapshot_id")
);
-- CreateTable
CREATE TABLE "report_files" (
"file_id" BIGSERIAL NOT NULL,
"snapshot_id" BIGINT NOT NULL,
"file_name" VARCHAR(500) NOT NULL,
"file_path" VARCHAR(1000) NOT NULL,
"file_url" VARCHAR(1000),
"file_size" BIGINT NOT NULL,
"file_format" VARCHAR(20) NOT NULL,
"mime_type" VARCHAR(100) NOT NULL,
"download_count" INTEGER NOT NULL DEFAULT 0,
"last_download_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"expires_at" TIMESTAMP(3),
CONSTRAINT "report_files_pkey" PRIMARY KEY ("file_id")
);
-- CreateTable
CREATE TABLE "analytics_metrics" (
"metric_id" BIGSERIAL NOT NULL,
"metric_type" VARCHAR(50) NOT NULL,
"metric_code" VARCHAR(50) NOT NULL,
"dimension_time" DATE,
"dimension_region" VARCHAR(100),
"dimension_user_type" VARCHAR(50),
"dimension_right_type" VARCHAR(50),
"metric_value" DECIMAL(20,8) NOT NULL,
"metric_data" JSONB,
"calculated_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "analytics_metrics_pkey" PRIMARY KEY ("metric_id")
);
-- CreateTable
CREATE TABLE "planting_daily_stats" (
"stat_id" BIGSERIAL NOT NULL,
"stat_date" DATE NOT NULL,
"province_code" VARCHAR(10),
"city_code" VARCHAR(10),
"order_count" INTEGER NOT NULL DEFAULT 0,
"tree_count" INTEGER NOT NULL DEFAULT 0,
"total_amount" DECIMAL(20,8) NOT NULL DEFAULT 0,
"new_user_count" INTEGER NOT NULL DEFAULT 0,
"active_user_count" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "planting_daily_stats_pkey" PRIMARY KEY ("stat_id")
);
-- CreateTable
CREATE TABLE "community_stats" (
"stat_id" BIGSERIAL NOT NULL,
"community_id" BIGINT NOT NULL,
"community_name" VARCHAR(200) NOT NULL,
"parent_community_id" BIGINT,
"stat_date" DATE NOT NULL,
"total_planting" INTEGER NOT NULL DEFAULT 0,
"daily_planting" INTEGER NOT NULL DEFAULT 0,
"weekly_planting" INTEGER NOT NULL DEFAULT 0,
"monthly_planting" INTEGER NOT NULL DEFAULT 0,
"member_count" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "community_stats_pkey" PRIMARY KEY ("stat_id")
);
-- CreateTable
CREATE TABLE "system_account_monthly_stats" (
"stat_id" BIGSERIAL NOT NULL,
"account_id" BIGINT NOT NULL,
"account_type" VARCHAR(30) NOT NULL,
"account_name" VARCHAR(200) NOT NULL,
"region_code" VARCHAR(10) NOT NULL,
"stat_month" VARCHAR(7) NOT NULL,
"monthly_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0,
"cumulative_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0,
"monthly_mining" DECIMAL(20,8) NOT NULL DEFAULT 0,
"cumulative_mining" DECIMAL(20,8) NOT NULL DEFAULT 0,
"monthly_commission" DECIMAL(20,8) NOT NULL DEFAULT 0,
"cumulative_commission" DECIMAL(20,8) NOT NULL DEFAULT 0,
"monthly_planting_bonus" DECIMAL(20,8) NOT NULL DEFAULT 0,
"cumulative_planting_bonus" DECIMAL(20,8) NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "system_account_monthly_stats_pkey" PRIMARY KEY ("stat_id")
);
-- CreateTable
CREATE TABLE "system_account_income_records" (
"record_id" BIGSERIAL NOT NULL,
"account_id" BIGINT NOT NULL,
"account_type" VARCHAR(30) NOT NULL,
"income_type" VARCHAR(50) NOT NULL,
"income_amount" DECIMAL(20,8) NOT NULL,
"currency" VARCHAR(10) NOT NULL,
"source_type" VARCHAR(50) NOT NULL,
"source_id" VARCHAR(100),
"source_user_id" BIGINT,
"source_address" VARCHAR(200),
"transaction_no" VARCHAR(100),
"memo" TEXT,
"occurred_at" TIMESTAMP(3) NOT NULL,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "system_account_income_records_pkey" PRIMARY KEY ("record_id")
);
-- CreateTable
CREATE TABLE "report_events" (
"event_id" BIGSERIAL NOT NULL,
"event_type" VARCHAR(50) NOT NULL,
"aggregate_id" VARCHAR(100) NOT NULL,
"aggregate_type" VARCHAR(50) NOT NULL,
"event_data" JSONB NOT NULL,
"user_id" BIGINT,
"occurred_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"version" INTEGER NOT NULL DEFAULT 1,
CONSTRAINT "report_events_pkey" PRIMARY KEY ("event_id")
);
-- CreateTable
CREATE TABLE "dashboard_stats_snapshots" (
"snapshot_id" BIGSERIAL NOT NULL,
"snapshot_date" DATE NOT NULL,
"total_planting_count" INTEGER NOT NULL DEFAULT 0,
"total_planting_change" DECIMAL(5,2) NOT NULL DEFAULT 0,
"active_user_count" INTEGER NOT NULL DEFAULT 0,
"active_user_change" DECIMAL(5,2) NOT NULL DEFAULT 0,
"province_company_count" INTEGER NOT NULL DEFAULT 0,
"province_company_change" DECIMAL(5,2) NOT NULL DEFAULT 0,
"city_company_count" INTEGER NOT NULL DEFAULT 0,
"city_company_change" DECIMAL(5,2) NOT NULL DEFAULT 0,
"region_distribution" JSONB,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "dashboard_stats_snapshots_pkey" PRIMARY KEY ("snapshot_id")
);
-- CreateTable
CREATE TABLE "dashboard_trend_data" (
"trend_id" BIGSERIAL NOT NULL,
"trend_date" DATE NOT NULL,
"planting_count" INTEGER NOT NULL DEFAULT 0,
"order_count" INTEGER NOT NULL DEFAULT 0,
"new_user_count" INTEGER NOT NULL DEFAULT 0,
"active_user_count" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "dashboard_trend_data_pkey" PRIMARY KEY ("trend_id")
);
-- CreateTable
CREATE TABLE "system_activities" (
"activity_id" BIGSERIAL NOT NULL,
"activity_type" VARCHAR(50) NOT NULL,
"title" VARCHAR(200) NOT NULL,
"description" VARCHAR(500) NOT NULL,
"icon" VARCHAR(10) NOT NULL DEFAULT '📌',
"related_user_id" BIGINT,
"related_entity_id" VARCHAR(100),
"related_entity_type" VARCHAR(50),
"metadata" JSONB,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "system_activities_pkey" PRIMARY KEY ("activity_id")
);
-- CreateIndex
CREATE UNIQUE INDEX "report_definitions_report_code_key" ON "report_definitions"("report_code");
-- CreateIndex
CREATE INDEX "idx_def_type" ON "report_definitions"("report_type");
-- CreateIndex
CREATE INDEX "idx_def_active" ON "report_definitions"("is_active");
-- CreateIndex
CREATE INDEX "idx_def_scheduled" ON "report_definitions"("schedule_enabled");
-- CreateIndex
CREATE INDEX "idx_snapshot_type" ON "report_snapshots"("report_type");
-- CreateIndex
CREATE INDEX "idx_snapshot_code" ON "report_snapshots"("report_code");
-- CreateIndex
CREATE INDEX "idx_snapshot_period" ON "report_snapshots"("period_key");
-- CreateIndex
CREATE INDEX "idx_snapshot_generated" ON "report_snapshots"("generated_at" DESC);
-- CreateIndex
CREATE INDEX "idx_snapshot_expires" ON "report_snapshots"("expires_at");
-- CreateIndex
CREATE UNIQUE INDEX "report_snapshots_report_code_period_key_key" ON "report_snapshots"("report_code", "period_key");
-- CreateIndex
CREATE INDEX "idx_file_snapshot" ON "report_files"("snapshot_id");
-- CreateIndex
CREATE INDEX "idx_file_format" ON "report_files"("file_format");
-- CreateIndex
CREATE INDEX "idx_file_created" ON "report_files"("created_at" DESC);
-- CreateIndex
CREATE INDEX "idx_metric_type" ON "analytics_metrics"("metric_type");
-- CreateIndex
CREATE INDEX "idx_metric_code" ON "analytics_metrics"("metric_code");
-- CreateIndex
CREATE INDEX "idx_metric_time" ON "analytics_metrics"("dimension_time");
-- CreateIndex
CREATE INDEX "idx_metric_region" ON "analytics_metrics"("dimension_region");
-- CreateIndex
CREATE UNIQUE INDEX "analytics_metrics_metric_code_dimension_time_dimension_regi_key" ON "analytics_metrics"("metric_code", "dimension_time", "dimension_region", "dimension_user_type", "dimension_right_type");
-- CreateIndex
CREATE INDEX "idx_pds_date" ON "planting_daily_stats"("stat_date");
-- CreateIndex
CREATE INDEX "idx_pds_province" ON "planting_daily_stats"("province_code");
-- CreateIndex
CREATE INDEX "idx_pds_city" ON "planting_daily_stats"("city_code");
-- CreateIndex
CREATE UNIQUE INDEX "planting_daily_stats_stat_date_province_code_city_code_key" ON "planting_daily_stats"("stat_date", "province_code", "city_code");
-- CreateIndex
CREATE INDEX "idx_cs_community" ON "community_stats"("community_id");
-- CreateIndex
CREATE INDEX "idx_cs_name" ON "community_stats"("community_name");
-- CreateIndex
CREATE INDEX "idx_cs_date" ON "community_stats"("stat_date");
-- CreateIndex
CREATE INDEX "idx_cs_parent" ON "community_stats"("parent_community_id");
-- CreateIndex
CREATE UNIQUE INDEX "community_stats_community_id_stat_date_key" ON "community_stats"("community_id", "stat_date");
-- CreateIndex
CREATE INDEX "idx_sams_type" ON "system_account_monthly_stats"("account_type");
-- CreateIndex
CREATE INDEX "idx_sams_month" ON "system_account_monthly_stats"("stat_month");
-- CreateIndex
CREATE INDEX "idx_sams_region" ON "system_account_monthly_stats"("region_code");
-- CreateIndex
CREATE UNIQUE INDEX "system_account_monthly_stats_account_id_stat_month_key" ON "system_account_monthly_stats"("account_id", "stat_month");
-- CreateIndex
CREATE INDEX "idx_sair_account" ON "system_account_income_records"("account_id");
-- CreateIndex
CREATE INDEX "idx_sair_type" ON "system_account_income_records"("account_type");
-- CreateIndex
CREATE INDEX "idx_sair_income_type" ON "system_account_income_records"("income_type");
-- CreateIndex
CREATE INDEX "idx_sair_source_type" ON "system_account_income_records"("source_type");
-- CreateIndex
CREATE INDEX "idx_sair_address" ON "system_account_income_records"("source_address");
-- CreateIndex
CREATE INDEX "idx_sair_txno" ON "system_account_income_records"("transaction_no");
-- CreateIndex
CREATE INDEX "idx_sair_occurred" ON "system_account_income_records"("occurred_at" DESC);
-- CreateIndex
CREATE INDEX "idx_report_event_aggregate" ON "report_events"("aggregate_type", "aggregate_id");
-- CreateIndex
CREATE INDEX "idx_report_event_type" ON "report_events"("event_type");
-- CreateIndex
CREATE INDEX "idx_report_event_occurred" ON "report_events"("occurred_at");
-- CreateIndex
CREATE INDEX "idx_dss_date" ON "dashboard_stats_snapshots"("snapshot_date" DESC);
-- CreateIndex
CREATE UNIQUE INDEX "dashboard_stats_snapshots_snapshot_date_key" ON "dashboard_stats_snapshots"("snapshot_date");
-- CreateIndex
CREATE INDEX "idx_dtd_date" ON "dashboard_trend_data"("trend_date" DESC);
-- CreateIndex
CREATE UNIQUE INDEX "dashboard_trend_data_trend_date_key" ON "dashboard_trend_data"("trend_date");
-- CreateIndex
CREATE INDEX "idx_sa_type" ON "system_activities"("activity_type");
-- CreateIndex
CREATE INDEX "idx_sa_created" ON "system_activities"("created_at" DESC);
-- CreateIndex
CREATE INDEX "idx_sa_user" ON "system_activities"("related_user_id");
-- CreateIndex
CREATE INDEX "idx_sa_entity" ON "system_activities"("related_entity_type", "related_entity_id");
-- AddForeignKey
ALTER TABLE "report_files" ADD CONSTRAINT "report_files_snapshot_id_fkey" FOREIGN KEY ("snapshot_id") REFERENCES "report_snapshots"("snapshot_id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@ -0,0 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
provider = "postgresql"

View File

@ -320,3 +320,157 @@ model ReportEvent {
@@index([eventType], name: "idx_report_event_type")
@@index([occurredAt], name: "idx_report_event_occurred")
}
// ============================================
// 仪表板统计快照表
// 缓存仪表板统计数据,避免实时计算
// ============================================
model DashboardStatsSnapshot {
id BigInt @id @default(autoincrement()) @map("snapshot_id")
// === 快照日期 ===
snapshotDate DateTime @map("snapshot_date") @db.Date
// === 核心统计数据 ===
totalPlantingCount Int @default(0) @map("total_planting_count")
totalPlantingChange Decimal @default(0) @map("total_planting_change") @db.Decimal(5, 2)
activeUserCount Int @default(0) @map("active_user_count")
activeUserChange Decimal @default(0) @map("active_user_change") @db.Decimal(5, 2)
provinceCompanyCount Int @default(0) @map("province_company_count")
provinceCompanyChange Decimal @default(0) @map("province_company_change") @db.Decimal(5, 2)
cityCompanyCount Int @default(0) @map("city_company_count")
cityCompanyChange Decimal @default(0) @map("city_company_change") @db.Decimal(5, 2)
// === 区域分布数据 (JSON) ===
regionDistribution Json? @map("region_distribution")
// === 时间戳 ===
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("dashboard_stats_snapshots")
@@unique([snapshotDate], name: "uk_dashboard_snapshot_date")
@@index([snapshotDate(sort: Desc)], name: "idx_dss_date")
}
// ============================================
// 仪表板趋势数据表
// 存储每日趋势数据点
// ============================================
model DashboardTrendData {
id BigInt @id @default(autoincrement()) @map("trend_id")
// === 数据日期 ===
trendDate DateTime @map("trend_date") @db.Date
// === 趋势指标 ===
plantingCount Int @default(0) @map("planting_count")
orderCount Int @default(0) @map("order_count")
newUserCount Int @default(0) @map("new_user_count")
activeUserCount Int @default(0) @map("active_user_count")
// === 时间戳 ===
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("dashboard_trend_data")
@@unique([trendDate], name: "uk_trend_date")
@@index([trendDate(sort: Desc)], name: "idx_dtd_date")
}
// ============================================
// 实时统计表 - 通过 Kafka 事件累加
// Dashboard 直接从此表读取数据
// ============================================
model RealtimeStats {
id BigInt @id @default(autoincrement()) @map("stats_id")
// === 统计日期 ===
statsDate DateTime @map("stats_date") @db.Date
// === 认种统计 (来自 planting.order.paid) ===
dailyPlantingCount Int @default(0) @map("daily_planting_count")
dailyOrderCount Int @default(0) @map("daily_order_count")
dailyPlantingAmount Decimal @default(0) @map("daily_planting_amount") @db.Decimal(20, 8)
// === 用户统计 (来自 identity.UserAccountCreated) ===
dailyNewUserCount Int @default(0) @map("daily_new_user_count")
// === 授权统计 (来自 authorization-events) ===
dailyProvinceAuthCount Int @default(0) @map("daily_province_auth_count")
dailyCityAuthCount Int @default(0) @map("daily_city_auth_count")
// === 时间戳 ===
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("realtime_stats")
@@unique([statsDate], name: "uk_realtime_stats_date")
@@index([statsDate(sort: Desc)], name: "idx_rs_date")
}
// ============================================
// 全局累计统计表 - 存储总量数据
// ============================================
model GlobalStats {
id BigInt @id @default(autoincrement()) @map("stats_id")
// === 统计键 (单行记录) ===
statsKey String @unique @default("global") @map("stats_key") @db.VarChar(20)
// === 累计认种统计 ===
totalPlantingCount Int @default(0) @map("total_planting_count")
totalOrderCount Int @default(0) @map("total_order_count")
totalPlantingAmount Decimal @default(0) @map("total_planting_amount") @db.Decimal(20, 8)
// === 累计用户统计 ===
totalUserCount Int @default(0) @map("total_user_count")
// === 累计授权统计 ===
totalProvinceCompanyCount Int @default(0) @map("total_province_company_count")
totalCityCompanyCount Int @default(0) @map("total_city_company_count")
// === 时间戳 ===
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("global_stats")
}
// ============================================
// 系统活动日志表
// 记录系统中的重要活动事件
// ============================================
model SystemActivity {
id BigInt @id @default(autoincrement()) @map("activity_id")
// === 活动类型 ===
// user_register: 用户注册
// company_authorization: 公司授权
// planting_order: 认种订单
// system_update: 系统更新
// report_generated: 报表生成
activityType String @map("activity_type") @db.VarChar(50)
// === 活动信息 ===
title String @map("title") @db.VarChar(200)
description String @map("description") @db.VarChar(500)
icon String @default("📌") @map("icon") @db.VarChar(10)
// === 关联信息 ===
relatedUserId BigInt? @map("related_user_id")
relatedEntityId String? @map("related_entity_id") @db.VarChar(100)
relatedEntityType String? @map("related_entity_type") @db.VarChar(50)
// === 元数据 ===
metadata Json? @map("metadata")
// === 时间戳 ===
createdAt DateTime @default(now()) @map("created_at")
@@map("system_activities")
@@index([activityType], name: "idx_sa_type")
@@index([createdAt(sort: Desc)], name: "idx_sa_created")
@@index([relatedUserId], name: "idx_sa_user")
@@index([relatedEntityType, relatedEntityId], name: "idx_sa_entity")
}

View File

@ -0,0 +1,417 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import { DashboardPeriod, DashboardPeriodDays } from '../../domain/value-objects';
import {
DashboardStatItemDto,
DashboardStatsResponseDto,
} from '../../api/dto/response/dashboard-stats.dto';
import {
DashboardTrendDataDto,
DashboardTrendResponseDto,
TrendDataPointDto,
} from '../../api/dto/response/dashboard-trend.dto';
import {
DashboardActivityItemDto,
DashboardActivitiesResponseDto,
ActivityType,
} from '../../api/dto/response/dashboard-activity.dto';
import {
RegionDistributionItemDto,
DashboardRegionResponseDto,
} from '../../api/dto/response/dashboard-region.dto';
import {
IDashboardStatsSnapshotRepository,
IDashboardTrendDataRepository,
ISystemActivityRepository,
IRealtimeStatsRepository,
IGlobalStatsRepository,
DASHBOARD_STATS_SNAPSHOT_REPOSITORY,
DASHBOARD_TREND_DATA_REPOSITORY,
SYSTEM_ACTIVITY_REPOSITORY,
REALTIME_STATS_REPOSITORY,
GLOBAL_STATS_REPOSITORY,
} from '../../domain/repositories';
@Injectable()
export class DashboardApplicationService {
private readonly logger = new Logger(DashboardApplicationService.name);
constructor(
@Inject(DASHBOARD_STATS_SNAPSHOT_REPOSITORY)
private readonly statsSnapshotRepo: IDashboardStatsSnapshotRepository,
@Inject(DASHBOARD_TREND_DATA_REPOSITORY)
private readonly trendDataRepo: IDashboardTrendDataRepository,
@Inject(SYSTEM_ACTIVITY_REPOSITORY)
private readonly activityRepo: ISystemActivityRepository,
@Inject(REALTIME_STATS_REPOSITORY)
private readonly realtimeStatsRepo: IRealtimeStatsRepository,
@Inject(GLOBAL_STATS_REPOSITORY)
private readonly globalStatsRepo: IGlobalStatsRepository,
) {}
/**
*
* GlobalStats RealtimeStats Kafka
*/
async getStats(): Promise<DashboardStatsResponseDto> {
this.logger.debug('Fetching dashboard stats from local tables');
try {
// 从本地表获取全局统计和今日统计
const [globalStats, todayStats, yesterdayStats] = await Promise.all([
this.globalStatsRepo.getGlobalStats(),
this.realtimeStatsRepo.findByDate(new Date()),
this.realtimeStatsRepo.findByDate(this.getYesterday()),
]);
// 计算环比变化
const todayPlanting = todayStats?.dailyPlantingCount || 0;
const yesterdayPlanting = yesterdayStats?.dailyPlantingCount || 0;
const plantingChange = this.calculateChangePercent(todayPlanting, yesterdayPlanting);
const todayNewUsers = todayStats?.dailyNewUserCount || 0;
const yesterdayNewUsers = yesterdayStats?.dailyNewUserCount || 0;
const userChange = this.calculateChangePercent(todayNewUsers, yesterdayNewUsers);
const todayProvinceAuth = todayStats?.dailyProvinceAuthCount || 0;
const yesterdayProvinceAuth = yesterdayStats?.dailyProvinceAuthCount || 0;
const provinceChange = this.calculateChangePercent(todayProvinceAuth, yesterdayProvinceAuth);
const todayCityAuth = todayStats?.dailyCityAuthCount || 0;
const yesterdayCityAuth = yesterdayStats?.dailyCityAuthCount || 0;
const cityChange = this.calculateChangePercent(todayCityAuth, yesterdayCityAuth);
const stats: DashboardStatItemDto[] = [
{
title: '总认种量',
value: globalStats.totalPlantingCount,
suffix: '棵',
change: {
value: Math.abs(plantingChange),
trend: plantingChange >= 0 ? 'up' : 'down',
},
color: '#1565C0',
},
{
title: '总用户数',
value: globalStats.totalUserCount,
suffix: '人',
change: {
value: Math.abs(userChange),
trend: userChange >= 0 ? 'up' : 'down',
},
color: '#4CAF50',
},
{
title: '省级公司',
value: globalStats.totalProvinceCompanyCount,
suffix: '家',
change: {
value: Math.abs(provinceChange),
trend: provinceChange >= 0 ? 'up' : 'down',
},
color: '#F5A623',
},
{
title: '市级公司',
value: globalStats.totalCityCompanyCount,
suffix: '家',
change: {
value: Math.abs(cityChange),
trend: cityChange >= 0 ? 'up' : 'down',
},
color: '#9C27B0',
},
];
return { stats };
} catch (error) {
this.logger.error('Failed to fetch dashboard stats from local tables', error);
// 返回默认值
return {
stats: [
{ title: '总认种量', value: 0, suffix: '棵', change: { value: 0, trend: 'up' }, color: '#1565C0' },
{ title: '总用户数', value: 0, suffix: '人', change: { value: 0, trend: 'up' }, color: '#4CAF50' },
{ title: '省级公司', value: 0, suffix: '家', change: { value: 0, trend: 'up' }, color: '#F5A623' },
{ title: '市级公司', value: 0, suffix: '家', change: { value: 0, trend: 'up' }, color: '#9C27B0' },
],
};
}
}
/**
*
*/
private calculateChangePercent(current: number, previous: number): number {
if (previous === 0) {
return current > 0 ? 100 : 0;
}
return Math.round(((current - previous) / previous) * 100 * 10) / 10;
}
/**
*
*/
private getYesterday(): Date {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
return yesterday;
}
/**
*
* RealtimeStats
*/
async getTrendData(period: DashboardPeriod): Promise<DashboardTrendResponseDto> {
this.logger.debug(`Fetching dashboard trend data for period: ${period}`);
const days = DashboardPeriodDays[period];
// 从 RealtimeStats 获取最近N天的数据
const realtimeStats = await this.realtimeStatsRepo.findRecentDays(days);
let data: TrendDataPointDto[];
if (realtimeStats.length > 0) {
// 使用真实数据
data = realtimeStats.map((item) => ({
date: this.formatDate(item.statsDate),
value: item.dailyPlantingCount,
}));
} else {
// 如果没有数据,尝试从旧的 trendDataRepo 获取
const trendData = await this.trendDataRepo.findRecentDays(days);
if (trendData.length > 0) {
data = trendData.map((item) => ({
date: this.formatDate(item.trendDate),
value: item.plantingCount,
}));
} else {
// 如果没有数据,生成模拟数据
this.logger.debug('No trend data found, generating mock data');
data = this.generateTrendData(days);
}
}
return {
trend: {
period,
data,
},
};
}
/**
*
*
*/
async getActivities(limit: number): Promise<DashboardActivitiesResponseDto> {
this.logger.debug(`Fetching dashboard activities, limit: ${limit}`);
// 从数据库获取活动记录
const dbActivities = await this.activityRepo.findRecent(limit);
let activities: DashboardActivityItemDto[];
if (dbActivities.length > 0) {
// 使用真实数据
activities = dbActivities.map((activity) => ({
id: String(activity.id),
type: activity.activityType as ActivityType,
icon: activity.icon || '📌',
title: activity.title,
description: activity.description,
timestamp: this.formatRelativeTime(
Math.floor((Date.now() - (activity.createdAt?.getTime() || Date.now())) / 60000),
),
createdAt: activity.createdAt?.toISOString() || new Date().toISOString(),
}));
} else {
// 如果没有数据,返回模拟数据
this.logger.debug('No activities found, generating mock data');
activities = this.generateMockActivities(limit);
}
return { activities };
}
/**
*
*/
async recordActivity(
activityType: ActivityType | string,
title: string,
description: string,
options?: {
icon?: string;
relatedUserId?: bigint;
relatedEntityId?: string;
relatedEntityType?: string;
metadata?: Record<string, unknown>;
},
): Promise<void> {
await this.activityRepo.create({
activityType,
title,
description,
icon: options?.icon,
relatedUserId: options?.relatedUserId,
relatedEntityId: options?.relatedEntityId,
relatedEntityType: options?.relatedEntityType,
metadata: options?.metadata,
});
}
/**
*
*/
async getRegionDistribution(): Promise<DashboardRegionResponseDto> {
this.logger.debug('Fetching region distribution');
// TODO: 从 planting-service 获取真实区域分布
// 目前返回模拟数据
const regions: RegionDistributionItemDto[] = [
{ region: '华东地区', percentage: 35, color: '#1565C0' },
{ region: '华南地区', percentage: 25, color: '#4CAF50' },
{ region: '华北地区', percentage: 20, color: '#F5A623' },
{ region: '华中地区', percentage: 12, color: '#9C27B0' },
{ region: '其他地区', percentage: 8, color: '#607D8B' },
];
return { regions };
}
/**
*
*/
private generateTrendData(days: number): TrendDataPointDto[] {
const data: TrendDataPointDto[] = [];
const now = new Date();
for (let i = days - 1; i >= 0; i--) {
const date = new Date(now);
date.setDate(date.getDate() - i);
data.push({
date: this.formatDate(date),
value: Math.floor(Math.random() * 300) + 100, // 100-400随机值
});
}
return data;
}
/**
*
*/
private generateMockActivities(limit: number): DashboardActivityItemDto[] {
const activityTemplates: Array<{
type: ActivityType;
icon: string;
title: string;
descriptionTemplate: string;
}> = [
{
type: 'user_register',
icon: '👤',
title: '新用户注册',
descriptionTemplate: '用户 {name} 完成注册',
},
{
type: 'company_authorization',
icon: '🏢',
title: '公司授权',
descriptionTemplate: '{province}省公司完成授权',
},
{
type: 'planting_order',
icon: '🌳',
title: '认种订单',
descriptionTemplate: '用户 {name} 认种了 {count} 棵榴莲树',
},
{
type: 'system_update',
icon: '⚙️',
title: '系统更新',
descriptionTemplate: '{feature} 已更新',
},
{
type: 'report_generated',
icon: '📊',
title: '报表生成',
descriptionTemplate: '{month}月份运营报表已生成',
},
];
const names = ['张三', '李四', '王五', '赵六', '钱七'];
const provinces = ['广东', '浙江', '江苏', '山东', '四川'];
const features = ['龙虎榜规则', '结算规则', '授权流程'];
const months = ['1', '2', '3', '11', '12'];
const activities: DashboardActivityItemDto[] = [];
const now = new Date();
for (let i = 0; i < limit; i++) {
const template = activityTemplates[i % activityTemplates.length];
const minutesAgo = (i + 1) * 15 + Math.floor(Math.random() * 30);
const createdAt = new Date(now.getTime() - minutesAgo * 60 * 1000);
let description = template.descriptionTemplate;
description = description.replace('{name}', names[Math.floor(Math.random() * names.length)]);
description = description.replace('{province}', provinces[Math.floor(Math.random() * provinces.length)]);
description = description.replace('{count}', String(Math.floor(Math.random() * 10) + 1));
description = description.replace('{feature}', features[Math.floor(Math.random() * features.length)]);
description = description.replace('{month}', months[Math.floor(Math.random() * months.length)]);
activities.push({
id: String(i + 1),
type: template.type,
icon: template.icon,
title: template.title,
description,
timestamp: this.formatRelativeTime(minutesAgo),
createdAt: createdAt.toISOString(),
});
}
return activities;
}
/**
* MM-DD
*/
private formatDate(date: Date): string {
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
return `${month}-${day}`;
}
/**
*
*/
private formatRelativeTime(minutesAgo: number): string {
if (minutesAgo < 60) {
return `${minutesAgo}分钟前`;
}
const hoursAgo = Math.floor(minutesAgo / 60);
if (hoursAgo < 24) {
return `${hoursAgo}小时前`;
}
const daysAgo = Math.floor(hoursAgo / 24);
return `${daysAgo}天前`;
}
/**
*
*/
private getStartOfMonth(): Date {
const now = new Date();
return new Date(now.getFullYear(), now.getMonth(), 1);
}
/**
* 00:00:00
*/
private getStartOfDay(date: Date): Date {
return new Date(date.getFullYear(), date.getMonth(), date.getDate());
}
}

View File

@ -0,0 +1,57 @@
import { Decimal } from '@prisma/client/runtime/library';
/**
*
*
*/
export const GLOBAL_STATS_REPOSITORY = Symbol('GLOBAL_STATS_REPOSITORY');
export interface GlobalStatsData {
id?: bigint;
statsKey: string;
totalPlantingCount: number;
totalOrderCount: number;
totalPlantingAmount: Decimal;
totalUserCount: number;
totalProvinceCompanyCount: number;
totalCityCompanyCount: number;
createdAt?: Date;
updatedAt?: Date;
}
export interface IGlobalStatsRepository {
/**
*
*/
getGlobalStats(): Promise<GlobalStatsData>;
/**
*
*/
incrementPlanting(treeCount: number, amount: Decimal): Promise<GlobalStatsData>;
/**
*
*/
incrementOrder(): Promise<GlobalStatsData>;
/**
*
*/
incrementUser(): Promise<GlobalStatsData>;
/**
*
*/
incrementProvinceCompany(): Promise<GlobalStatsData>;
/**
*
*/
incrementCityCompany(): Promise<GlobalStatsData>;
/**
*
*/
updateStats(updates: Partial<GlobalStatsData>): Promise<GlobalStatsData>;
}

View File

@ -1,3 +1,8 @@
export * from './report-definition.repository.interface';
export * from './report-snapshot.repository.interface';
export * from './report-file.repository.interface';
export * from './dashboard-stats-snapshot.repository.interface';
export * from './dashboard-trend-data.repository.interface';
export * from './system-activity.repository.interface';
export * from './realtime-stats.repository.interface';
export * from './global-stats.repository.interface';

View File

@ -0,0 +1,71 @@
import { Decimal } from '@prisma/client/runtime/library';
/**
*
* Kafka
*/
export const REALTIME_STATS_REPOSITORY = Symbol('REALTIME_STATS_REPOSITORY');
export interface RealtimeStatsData {
id?: bigint;
statsDate: Date;
dailyPlantingCount: number;
dailyOrderCount: number;
dailyPlantingAmount: Decimal;
dailyNewUserCount: number;
dailyProvinceAuthCount: number;
dailyCityAuthCount: number;
createdAt?: Date;
updatedAt?: Date;
}
export interface IRealtimeStatsRepository {
/**
*
*/
getOrCreateByDate(date: Date): Promise<RealtimeStatsData>;
/**
*
*/
findByDate(date: Date): Promise<RealtimeStatsData | null>;
/**
*
*/
incrementPlanting(
date: Date,
treeCount: number,
amount: Decimal,
): Promise<RealtimeStatsData>;
/**
*
*/
incrementOrder(date: Date): Promise<RealtimeStatsData>;
/**
*
*/
incrementNewUser(date: Date): Promise<RealtimeStatsData>;
/**
*
*/
incrementProvinceAuth(date: Date): Promise<RealtimeStatsData>;
/**
*
*/
incrementCityAuth(date: Date): Promise<RealtimeStatsData>;
/**
* N天的统计数据
*/
findRecentDays(days: number): Promise<RealtimeStatsData[]>;
/**
*
*/
findByDateRange(startDate: Date, endDate: Date): Promise<RealtimeStatsData[]>;
}

View File

@ -3,13 +3,25 @@ import { PrismaService } from './persistence/prisma/prisma.service';
import { ReportDefinitionRepository } from './persistence/repositories/report-definition.repository.impl';
import { ReportSnapshotRepository } from './persistence/repositories/report-snapshot.repository.impl';
import { ReportFileRepository } from './persistence/repositories/report-file.repository.impl';
import { DashboardStatsSnapshotRepository } from './persistence/repositories/dashboard-stats-snapshot.repository.impl';
import { DashboardTrendDataRepository } from './persistence/repositories/dashboard-trend-data.repository.impl';
import { SystemActivityRepository } from './persistence/repositories/system-activity.repository.impl';
import { RealtimeStatsRepository } from './persistence/repositories/realtime-stats.repository.impl';
import { GlobalStatsRepository } from './persistence/repositories/global-stats.repository.impl';
import {
REPORT_DEFINITION_REPOSITORY,
REPORT_SNAPSHOT_REPOSITORY,
REPORT_FILE_REPOSITORY,
DASHBOARD_STATS_SNAPSHOT_REPOSITORY,
DASHBOARD_TREND_DATA_REPOSITORY,
SYSTEM_ACTIVITY_REPOSITORY,
REALTIME_STATS_REPOSITORY,
GLOBAL_STATS_REPOSITORY,
} from '../domain/repositories';
import { LeaderboardServiceClient } from './external/leaderboard-service/leaderboard-service.client';
import { PlantingServiceClient } from './external/planting-service/planting-service.client';
import { AuthorizationServiceClient } from './external/authorization-service/authorization-service.client';
import { IdentityServiceClient } from './external/identity-service/identity-service.client';
import { ExportModule } from './export/export.module';
import { RedisModule } from './redis/redis.module';
@ -29,16 +41,45 @@ import { RedisModule } from './redis/redis.module';
provide: REPORT_FILE_REPOSITORY,
useClass: ReportFileRepository,
},
{
provide: DASHBOARD_STATS_SNAPSHOT_REPOSITORY,
useClass: DashboardStatsSnapshotRepository,
},
{
provide: DASHBOARD_TREND_DATA_REPOSITORY,
useClass: DashboardTrendDataRepository,
},
{
provide: SYSTEM_ACTIVITY_REPOSITORY,
useClass: SystemActivityRepository,
},
{
provide: REALTIME_STATS_REPOSITORY,
useClass: RealtimeStatsRepository,
},
{
provide: GLOBAL_STATS_REPOSITORY,
useClass: GlobalStatsRepository,
},
LeaderboardServiceClient,
PlantingServiceClient,
AuthorizationServiceClient,
IdentityServiceClient,
],
exports: [
PrismaService,
REPORT_DEFINITION_REPOSITORY,
REPORT_SNAPSHOT_REPOSITORY,
REPORT_FILE_REPOSITORY,
DASHBOARD_STATS_SNAPSHOT_REPOSITORY,
DASHBOARD_TREND_DATA_REPOSITORY,
SYSTEM_ACTIVITY_REPOSITORY,
REALTIME_STATS_REPOSITORY,
GLOBAL_STATS_REPOSITORY,
LeaderboardServiceClient,
PlantingServiceClient,
AuthorizationServiceClient,
IdentityServiceClient,
ExportModule,
RedisModule,
],

View File

@ -0,0 +1,355 @@
import { Controller, Logger, Inject } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { Decimal } from '@prisma/client/runtime/library';
import {
ISystemActivityRepository,
SYSTEM_ACTIVITY_REPOSITORY,
ActivityType,
IRealtimeStatsRepository,
REALTIME_STATS_REPOSITORY,
IGlobalStatsRepository,
GLOBAL_STATS_REPOSITORY,
} from '../../domain/repositories';
/**
* Outbox (B方案)
*/
interface OutboxMeta {
id: string;
aggregateId: string;
eventType: string;
}
/**
* (identity-service)
* Topic: identity.UserAccountCreated identity.UserAccountAutoCreated
*/
interface UserAccountCreatedEvent {
eventId: string;
eventType: string;
occurredAt: string;
aggregateId: string;
aggregateType: string;
payload: {
userId: string;
accountSequence: string;
phoneNumber?: string;
nickname?: string;
referralCode: string;
inviterSequence?: string;
registeredAt: string;
};
_outbox?: OutboxMeta;
}
/**
* / (authorization-service)
* Topic: authorization-events
*/
interface AuthorizationRoleEvent {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
authorizationId?: string;
userId: string;
accountSequence: string;
roleType: string;
regionCode: string;
regionName: string;
status?: string;
authorizedAt?: string;
};
_outbox?: OutboxMeta;
}
/**
* (planting-service)
* Topic: planting.order.paid
*/
interface PlantingOrderPaidEvent {
eventId: string;
eventType: string;
occurredAt: string;
aggregateId: string;
payload: {
orderId: string;
orderNumber: string;
userId: string;
treeCount: number;
provinceCode?: string;
cityCode?: string;
totalAmount: string;
paidAt: string;
};
_outbox?: OutboxMeta;
}
@Controller()
export class ActivityEventConsumerController {
private readonly logger = new Logger(ActivityEventConsumerController.name);
constructor(
@Inject(SYSTEM_ACTIVITY_REPOSITORY)
private readonly activityRepo: ISystemActivityRepository,
@Inject(REALTIME_STATS_REPOSITORY)
private readonly realtimeStatsRepo: IRealtimeStatsRepository,
@Inject(GLOBAL_STATS_REPOSITORY)
private readonly globalStatsRepo: IGlobalStatsRepository,
) {}
/**
* (identity-service)
* Topic: identity.UserAccountCreated
*/
@MessagePattern('identity.UserAccountCreated')
async handleUserAccountCreated(@Payload() message: UserAccountCreatedEvent) {
this.logger.log(`Received identity.UserAccountCreated event`);
try {
const { payload } = message;
// 记录活动日志
await this.activityRepo.create({
activityType: 'user_register' as ActivityType,
title: '新用户注册',
description: `用户 ${this.maskPhone(payload.phoneNumber)} 完成注册`,
icon: '👤',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'user',
relatedEntityId: payload.userId,
metadata: {
accountSequence: payload.accountSequence,
nickname: payload.nickname,
referralCode: payload.referralCode,
inviterSequence: payload.inviterSequence,
registeredAt: payload.registeredAt,
},
});
// 累加统计数据
const today = new Date();
await Promise.all([
this.realtimeStatsRepo.incrementNewUser(today),
this.globalStatsRepo.incrementUser(),
]);
this.logger.log(`Activity and stats recorded for user registration: ${payload.userId}`);
} catch (error) {
this.logger.error(`Error recording user registration activity:`, error);
}
}
/**
* (identity-service)
* Topic: identity.UserAccountAutoCreated
*/
@MessagePattern('identity.UserAccountAutoCreated')
async handleUserAccountAutoCreated(@Payload() message: UserAccountCreatedEvent) {
this.logger.log(`Received identity.UserAccountAutoCreated event`);
try {
const { payload } = message;
// 记录活动日志
await this.activityRepo.create({
activityType: 'user_register' as ActivityType,
title: '新用户自动注册',
description: `用户通过推荐链接完成注册`,
icon: '👤',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'user',
relatedEntityId: payload.userId,
metadata: {
accountSequence: payload.accountSequence,
nickname: payload.nickname,
referralCode: payload.referralCode,
inviterSequence: payload.inviterSequence,
registeredAt: payload.registeredAt,
autoCreated: true,
},
});
// 累加统计数据
const today = new Date();
await Promise.all([
this.realtimeStatsRepo.incrementNewUser(today),
this.globalStatsRepo.incrementUser(),
]);
this.logger.log(`Activity and stats recorded for auto user registration: ${payload.userId}`);
} catch (error) {
this.logger.error(`Error recording auto user registration activity:`, error);
}
}
/**
* (authorization-service)
* Topic: authorization-events
*/
@MessagePattern('authorization-events')
async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) {
this.logger.log(`Received authorization-events: ${message.eventType}`);
try {
const { payload, eventType } = message;
// 根据事件类型决定是否记录活动
if (eventType.includes('authorized') || eventType.includes('Authorized')) {
const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);
// 记录活动日志
await this.activityRepo.create({
activityType: 'company_authorization' as ActivityType,
title: '授权成功',
description: `${payload.regionName} ${roleTypeLabel} 完成授权`,
icon: '🏢',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'authorization',
relatedEntityId: payload.authorizationId || message.aggregateId,
metadata: {
roleType: payload.roleType,
regionCode: payload.regionCode,
regionName: payload.regionName,
accountSequence: payload.accountSequence,
authorizedAt: payload.authorizedAt || message.occurredAt,
},
});
// 累加统计数据 - 区分省公司和市公司
const today = new Date();
const isProvinceCompany =
payload.roleType === 'PROVINCE_COMPANY' ||
payload.roleType === 'AUTH_PROVINCE_COMPANY';
const isCityCompany =
payload.roleType === 'CITY_COMPANY' ||
payload.roleType === 'AUTH_CITY_COMPANY';
if (isProvinceCompany) {
await Promise.all([
this.realtimeStatsRepo.incrementProvinceAuth(today),
this.globalStatsRepo.incrementProvinceCompany(),
]);
this.logger.log(`Province company stats incremented: ${payload.regionCode}`);
} else if (isCityCompany) {
await Promise.all([
this.realtimeStatsRepo.incrementCityAuth(today),
this.globalStatsRepo.incrementCityCompany(),
]);
this.logger.log(`City company stats incremented: ${payload.regionCode}`);
}
this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`);
}
} catch (error) {
this.logger.error(`Error recording authorization activity:`, error);
}
}
/**
* (planting-service)
* Topic: planting.order.paid
*/
@MessagePattern('planting.order.paid')
async handlePlantingOrderPaid(@Payload() message: PlantingOrderPaidEvent) {
this.logger.log(`Received planting.order.paid event`);
try {
const { payload } = message;
// 记录活动日志
await this.activityRepo.create({
activityType: 'planting_order' as ActivityType,
title: '认种订单',
description: `用户认种了 ${payload.treeCount} 棵榴莲树`,
icon: '🌳',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'order',
relatedEntityId: payload.orderId,
metadata: {
orderNumber: payload.orderNumber,
treeCount: payload.treeCount,
totalAmount: payload.totalAmount,
provinceCode: payload.provinceCode,
cityCode: payload.cityCode,
paidAt: payload.paidAt,
},
});
// 累加统计数据
const today = new Date();
const amount = new Decimal(payload.totalAmount || '0');
await Promise.all([
this.realtimeStatsRepo.incrementPlanting(today, payload.treeCount, amount),
this.globalStatsRepo.incrementPlanting(payload.treeCount, amount),
]);
this.logger.log(`Activity and stats recorded for planting order: ${payload.orderId}`);
} catch (error) {
this.logger.error(`Error recording planting order activity:`, error);
}
}
/**
*
* Topic: reporting.report.generated
*/
@MessagePattern('reporting.report.generated')
async handleReportGenerated(
@Payload()
message: {
reportCode: string;
reportName: string;
reportType: string;
periodKey: string;
generatedAt: string;
},
) {
this.logger.log(`Received reporting.report.generated event`);
try {
await this.activityRepo.create({
activityType: 'report_generated' as ActivityType,
title: '报表生成',
description: `${message.reportName} 已生成`,
icon: '📊',
relatedEntityType: 'report',
relatedEntityId: message.reportCode,
metadata: {
reportType: message.reportType,
periodKey: message.periodKey,
generatedAt: message.generatedAt,
},
});
this.logger.log(`Activity recorded for report generation: ${message.reportCode}`);
} catch (error) {
this.logger.error(`Error recording report generation activity:`, error);
}
}
/**
*
*/
private getRoleTypeLabel(roleType: string): string {
const labels: Record<string, string> = {
COMMUNITY: '社区',
AUTH_PROVINCE_COMPANY: '授权省公司',
PROVINCE_COMPANY: '正式省公司',
AUTH_CITY_COMPANY: '授权市公司',
CITY_COMPANY: '正式市公司',
};
return labels[roleType] || roleType;
}
/**
*
*/
private maskPhone(phone?: string): string {
if (!phone || phone.length < 7) {
return '***用户';
}
return phone.slice(0, 3) + '****' + phone.slice(-4);
}
}

View File

@ -0,0 +1,2 @@
export * from './kafka.module';
export * from './activity-event-consumer.controller';

View File

@ -0,0 +1,63 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ActivityEventConsumerController } from './activity-event-consumer.controller';
import { PrismaService } from '../persistence/prisma/prisma.service';
import { SystemActivityRepository } from '../persistence/repositories/system-activity.repository.impl';
import { RealtimeStatsRepository } from '../persistence/repositories/realtime-stats.repository.impl';
import { GlobalStatsRepository } from '../persistence/repositories/global-stats.repository.impl';
import {
SYSTEM_ACTIVITY_REPOSITORY,
REALTIME_STATS_REPOSITORY,
GLOBAL_STATS_REPOSITORY,
} from '../../domain/repositories';
@Module({
imports: [
ClientsModule.registerAsync([
{
name: 'KAFKA_SERVICE',
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
transport: Transport.KAFKA,
options: {
client: {
clientId: configService.get<string>(
'KAFKA_CLIENT_ID',
'reporting-service',
),
brokers: configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(','),
},
consumer: {
groupId: configService.get<string>(
'KAFKA_GROUP_ID',
'reporting-service-group',
),
},
},
}),
inject: [ConfigService],
},
]),
],
controllers: [ActivityEventConsumerController],
providers: [
PrismaService,
{
provide: SYSTEM_ACTIVITY_REPOSITORY,
useClass: SystemActivityRepository,
},
{
provide: REALTIME_STATS_REPOSITORY,
useClass: RealtimeStatsRepository,
},
{
provide: GLOBAL_STATS_REPOSITORY,
useClass: GlobalStatsRepository,
},
],
exports: [ClientsModule],
})
export class KafkaModule {}

View File

@ -0,0 +1,160 @@
import { Injectable, Logger } from '@nestjs/common';
import { Decimal } from '@prisma/client/runtime/library';
import { PrismaService } from '../prisma/prisma.service';
import {
IGlobalStatsRepository,
GlobalStatsData,
} from '../../../domain/repositories';
@Injectable()
export class GlobalStatsRepository implements IGlobalStatsRepository {
private readonly logger = new Logger(GlobalStatsRepository.name);
private readonly GLOBAL_KEY = 'global';
constructor(private readonly prisma: PrismaService) {}
async getGlobalStats(): Promise<GlobalStatsData> {
const stats = await this.prisma.globalStats.findUnique({
where: { statsKey: this.GLOBAL_KEY },
});
if (stats) {
return this.toDomain(stats);
}
// 如果不存在,创建初始记录
const created = await this.prisma.globalStats.create({
data: { statsKey: this.GLOBAL_KEY },
});
return this.toDomain(created);
}
async incrementPlanting(
treeCount: number,
amount: Decimal,
): Promise<GlobalStatsData> {
this.logger.debug(
`Incrementing global planting: trees=${treeCount}, amount=${amount}`,
);
const result = await this.prisma.globalStats.upsert({
where: { statsKey: this.GLOBAL_KEY },
create: {
statsKey: this.GLOBAL_KEY,
totalPlantingCount: treeCount,
totalOrderCount: 1,
totalPlantingAmount: amount,
},
update: {
totalPlantingCount: { increment: treeCount },
totalOrderCount: { increment: 1 },
totalPlantingAmount: { increment: amount },
},
});
return this.toDomain(result);
}
async incrementOrder(): Promise<GlobalStatsData> {
const result = await this.prisma.globalStats.upsert({
where: { statsKey: this.GLOBAL_KEY },
create: {
statsKey: this.GLOBAL_KEY,
totalOrderCount: 1,
},
update: {
totalOrderCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async incrementUser(): Promise<GlobalStatsData> {
this.logger.debug('Incrementing global user count');
const result = await this.prisma.globalStats.upsert({
where: { statsKey: this.GLOBAL_KEY },
create: {
statsKey: this.GLOBAL_KEY,
totalUserCount: 1,
},
update: {
totalUserCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async incrementProvinceCompany(): Promise<GlobalStatsData> {
this.logger.debug('Incrementing global province company count');
const result = await this.prisma.globalStats.upsert({
where: { statsKey: this.GLOBAL_KEY },
create: {
statsKey: this.GLOBAL_KEY,
totalProvinceCompanyCount: 1,
},
update: {
totalProvinceCompanyCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async incrementCityCompany(): Promise<GlobalStatsData> {
this.logger.debug('Incrementing global city company count');
const result = await this.prisma.globalStats.upsert({
where: { statsKey: this.GLOBAL_KEY },
create: {
statsKey: this.GLOBAL_KEY,
totalCityCompanyCount: 1,
},
update: {
totalCityCompanyCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async updateStats(updates: Partial<GlobalStatsData>): Promise<GlobalStatsData> {
const result = await this.prisma.globalStats.update({
where: { statsKey: this.GLOBAL_KEY },
data: {
totalPlantingCount: updates.totalPlantingCount,
totalOrderCount: updates.totalOrderCount,
totalPlantingAmount: updates.totalPlantingAmount,
totalUserCount: updates.totalUserCount,
totalProvinceCompanyCount: updates.totalProvinceCompanyCount,
totalCityCompanyCount: updates.totalCityCompanyCount,
},
});
return this.toDomain(result);
}
private toDomain(
record: Awaited<ReturnType<typeof this.prisma.globalStats.findFirst>>,
): GlobalStatsData {
if (!record) {
throw new Error('Record is null');
}
return {
id: record.id,
statsKey: record.statsKey,
totalPlantingCount: record.totalPlantingCount,
totalOrderCount: record.totalOrderCount,
totalPlantingAmount: record.totalPlantingAmount,
totalUserCount: record.totalUserCount,
totalProvinceCompanyCount: record.totalProvinceCompanyCount,
totalCityCompanyCount: record.totalCityCompanyCount,
createdAt: record.createdAt,
updatedAt: record.updatedAt,
};
}
}

View File

@ -0,0 +1,204 @@
import { Injectable, Logger } from '@nestjs/common';
import { Decimal } from '@prisma/client/runtime/library';
import { PrismaService } from '../prisma/prisma.service';
import {
IRealtimeStatsRepository,
RealtimeStatsData,
} from '../../../domain/repositories';
@Injectable()
export class RealtimeStatsRepository implements IRealtimeStatsRepository {
private readonly logger = new Logger(RealtimeStatsRepository.name);
constructor(private readonly prisma: PrismaService) {}
async getOrCreateByDate(date: Date): Promise<RealtimeStatsData> {
const statsDate = this.normalizeDate(date);
const existing = await this.prisma.realtimeStats.findUnique({
where: { statsDate },
});
if (existing) {
return this.toDomain(existing);
}
const created = await this.prisma.realtimeStats.create({
data: { statsDate },
});
return this.toDomain(created);
}
async findByDate(date: Date): Promise<RealtimeStatsData | null> {
const statsDate = this.normalizeDate(date);
const found = await this.prisma.realtimeStats.findUnique({
where: { statsDate },
});
return found ? this.toDomain(found) : null;
}
async incrementPlanting(
date: Date,
treeCount: number,
amount: Decimal,
): Promise<RealtimeStatsData> {
const statsDate = this.normalizeDate(date);
this.logger.debug(
`Incrementing planting: date=${statsDate.toISOString()}, trees=${treeCount}, amount=${amount}`,
);
const result = await this.prisma.realtimeStats.upsert({
where: { statsDate },
create: {
statsDate,
dailyPlantingCount: treeCount,
dailyOrderCount: 1,
dailyPlantingAmount: amount,
},
update: {
dailyPlantingCount: { increment: treeCount },
dailyOrderCount: { increment: 1 },
dailyPlantingAmount: { increment: amount },
},
});
return this.toDomain(result);
}
async incrementOrder(date: Date): Promise<RealtimeStatsData> {
const statsDate = this.normalizeDate(date);
const result = await this.prisma.realtimeStats.upsert({
where: { statsDate },
create: {
statsDate,
dailyOrderCount: 1,
},
update: {
dailyOrderCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async incrementNewUser(date: Date): Promise<RealtimeStatsData> {
const statsDate = this.normalizeDate(date);
this.logger.debug(`Incrementing new user: date=${statsDate.toISOString()}`);
const result = await this.prisma.realtimeStats.upsert({
where: { statsDate },
create: {
statsDate,
dailyNewUserCount: 1,
},
update: {
dailyNewUserCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async incrementProvinceAuth(date: Date): Promise<RealtimeStatsData> {
const statsDate = this.normalizeDate(date);
this.logger.debug(
`Incrementing province auth: date=${statsDate.toISOString()}`,
);
const result = await this.prisma.realtimeStats.upsert({
where: { statsDate },
create: {
statsDate,
dailyProvinceAuthCount: 1,
},
update: {
dailyProvinceAuthCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async incrementCityAuth(date: Date): Promise<RealtimeStatsData> {
const statsDate = this.normalizeDate(date);
this.logger.debug(`Incrementing city auth: date=${statsDate.toISOString()}`);
const result = await this.prisma.realtimeStats.upsert({
where: { statsDate },
create: {
statsDate,
dailyCityAuthCount: 1,
},
update: {
dailyCityAuthCount: { increment: 1 },
},
});
return this.toDomain(result);
}
async findRecentDays(days: number): Promise<RealtimeStatsData[]> {
const startDate = new Date();
startDate.setDate(startDate.getDate() - days + 1);
startDate.setHours(0, 0, 0, 0);
const results = await this.prisma.realtimeStats.findMany({
where: {
statsDate: { gte: startDate },
},
orderBy: { statsDate: 'asc' },
});
return results.map((r) => this.toDomain(r));
}
async findByDateRange(
startDate: Date,
endDate: Date,
): Promise<RealtimeStatsData[]> {
const results = await this.prisma.realtimeStats.findMany({
where: {
statsDate: {
gte: this.normalizeDate(startDate),
lte: this.normalizeDate(endDate),
},
},
orderBy: { statsDate: 'asc' },
});
return results.map((r) => this.toDomain(r));
}
/**
* 0
*/
private normalizeDate(date: Date): Date {
const normalized = new Date(date);
normalized.setHours(0, 0, 0, 0);
return normalized;
}
private toDomain(
record: Awaited<ReturnType<typeof this.prisma.realtimeStats.findFirst>>,
): RealtimeStatsData {
if (!record) {
throw new Error('Record is null');
}
return {
id: record.id,
statsDate: record.statsDate,
dailyPlantingCount: record.dailyPlantingCount,
dailyOrderCount: record.dailyOrderCount,
dailyPlantingAmount: record.dailyPlantingAmount,
dailyNewUserCount: record.dailyNewUserCount,
dailyProvinceAuthCount: record.dailyProvinceAuthCount,
dailyCityAuthCount: record.dailyCityAuthCount,
createdAt: record.createdAt,
updatedAt: record.updatedAt,
};
}
}