refactor(evolution): use API instead of shared database tables
Breaking change: evolution-service no longer directly accesses conversations and messages tables. Changes: - Add internal API endpoints to conversation-service for service-to-service calls - Create ConversationClient in evolution-service to call conversation-service API - Remove ConversationORM and MessageORM from evolution-service - Update evolution.service to use ConversationClient This follows microservices best practices: - Each service owns its data - Services communicate via API, not shared tables TODO: Apply same pattern to system_experiences (knowledge-service) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
0992523876
commit
e1bcd0145e
|
|
@ -4,11 +4,12 @@ import { ConversationEntity } from '../domain/entities/conversation.entity';
|
||||||
import { MessageEntity } from '../domain/entities/message.entity';
|
import { MessageEntity } from '../domain/entities/message.entity';
|
||||||
import { ConversationService } from './conversation.service';
|
import { ConversationService } from './conversation.service';
|
||||||
import { ConversationController } from './conversation.controller';
|
import { ConversationController } from './conversation.controller';
|
||||||
|
import { InternalConversationController } from './internal.controller';
|
||||||
import { ConversationGateway } from './conversation.gateway';
|
import { ConversationGateway } from './conversation.gateway';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [TypeOrmModule.forFeature([ConversationEntity, MessageEntity])],
|
imports: [TypeOrmModule.forFeature([ConversationEntity, MessageEntity])],
|
||||||
controllers: [ConversationController],
|
controllers: [ConversationController, InternalConversationController],
|
||||||
providers: [ConversationService, ConversationGateway],
|
providers: [ConversationService, ConversationGateway],
|
||||||
exports: [ConversationService],
|
exports: [ConversationService],
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,105 @@
|
||||||
|
import { Controller, Get, Query, Param } from '@nestjs/common';
|
||||||
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
import { Repository, MoreThan, LessThan } from 'typeorm';
|
||||||
|
import { ConversationEntity } from '../domain/entities/conversation.entity';
|
||||||
|
import { MessageEntity } from '../domain/entities/message.entity';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 内部 API - 供其他微服务调用
|
||||||
|
* 不暴露给外部用户
|
||||||
|
*/
|
||||||
|
@Controller('internal/conversations')
|
||||||
|
export class InternalConversationController {
|
||||||
|
constructor(
|
||||||
|
@InjectRepository(ConversationEntity)
|
||||||
|
private conversationRepo: Repository<ConversationEntity>,
|
||||||
|
@InjectRepository(MessageEntity)
|
||||||
|
private messageRepo: Repository<MessageEntity>,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询对话列表(供 evolution-service 使用)
|
||||||
|
*/
|
||||||
|
@Get()
|
||||||
|
async findConversations(
|
||||||
|
@Query('status') status?: string,
|
||||||
|
@Query('hoursBack') hoursBack?: string,
|
||||||
|
@Query('minMessageCount') minMessageCount?: string,
|
||||||
|
@Query('limit') limit?: string,
|
||||||
|
) {
|
||||||
|
const queryBuilder = this.conversationRepo.createQueryBuilder('c');
|
||||||
|
|
||||||
|
if (status) {
|
||||||
|
queryBuilder.andWhere('c.status = :status', { status });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hoursBack) {
|
||||||
|
const cutoffTime = new Date();
|
||||||
|
cutoffTime.setHours(cutoffTime.getHours() - parseInt(hoursBack));
|
||||||
|
queryBuilder.andWhere('c.created_at > :cutoffTime', { cutoffTime });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (minMessageCount) {
|
||||||
|
queryBuilder.andWhere('c.message_count > :minCount', {
|
||||||
|
minCount: parseInt(minMessageCount),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
queryBuilder.orderBy('c.created_at', 'DESC');
|
||||||
|
|
||||||
|
if (limit) {
|
||||||
|
queryBuilder.take(parseInt(limit));
|
||||||
|
}
|
||||||
|
|
||||||
|
const conversations = await queryBuilder.getMany();
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
data: conversations,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取对话的消息(供 evolution-service 使用)
|
||||||
|
*/
|
||||||
|
@Get(':id/messages')
|
||||||
|
async getMessages(@Param('id') conversationId: string) {
|
||||||
|
const messages = await this.messageRepo.find({
|
||||||
|
where: { conversationId },
|
||||||
|
order: { createdAt: 'ASC' },
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
data: messages,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 统计对话数量(供 evolution-service 使用)
|
||||||
|
*/
|
||||||
|
@Get('count')
|
||||||
|
async countConversations(
|
||||||
|
@Query('status') status?: string,
|
||||||
|
@Query('daysBack') daysBack?: string,
|
||||||
|
) {
|
||||||
|
const queryBuilder = this.conversationRepo.createQueryBuilder('c');
|
||||||
|
|
||||||
|
if (status) {
|
||||||
|
queryBuilder.andWhere('c.status = :status', { status });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (daysBack) {
|
||||||
|
const cutoffTime = new Date();
|
||||||
|
cutoffTime.setDate(cutoffTime.getDate() - parseInt(daysBack));
|
||||||
|
queryBuilder.andWhere('c.updated_at > :cutoffTime', { cutoffTime });
|
||||||
|
}
|
||||||
|
|
||||||
|
const count = await queryBuilder.getCount();
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
data: { count },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,22 +3,20 @@ import { TypeOrmModule } from '@nestjs/typeorm';
|
||||||
import { EvolutionController } from './evolution.controller';
|
import { EvolutionController } from './evolution.controller';
|
||||||
import { EvolutionService } from './evolution.service';
|
import { EvolutionService } from './evolution.service';
|
||||||
import { ExperienceExtractorService } from '../infrastructure/claude/experience-extractor.service';
|
import { ExperienceExtractorService } from '../infrastructure/claude/experience-extractor.service';
|
||||||
import { ConversationORM } from '../infrastructure/database/entities/conversation.orm';
|
import { ConversationClient } from '../infrastructure/clients/conversation.client';
|
||||||
import { MessageORM } from '../infrastructure/database/entities/message.orm';
|
|
||||||
import { SystemExperienceORM } from '../infrastructure/database/entities/system-experience.orm';
|
import { SystemExperienceORM } from '../infrastructure/database/entities/system-experience.orm';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
TypeOrmModule.forFeature([
|
// 只保留自己 Bounded Context 的实体
|
||||||
ConversationORM,
|
TypeOrmModule.forFeature([SystemExperienceORM]),
|
||||||
MessageORM,
|
|
||||||
SystemExperienceORM,
|
|
||||||
]),
|
|
||||||
],
|
],
|
||||||
controllers: [EvolutionController],
|
controllers: [EvolutionController],
|
||||||
providers: [
|
providers: [
|
||||||
EvolutionService,
|
EvolutionService,
|
||||||
ExperienceExtractorService,
|
ExperienceExtractorService,
|
||||||
|
// 使用 API 客户端访问其他服务的数据
|
||||||
|
ConversationClient,
|
||||||
],
|
],
|
||||||
exports: [EvolutionService],
|
exports: [EvolutionService],
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,8 @@
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
import { Repository, MoreThan, LessThan } from 'typeorm';
|
import { Repository } from 'typeorm';
|
||||||
import { ExperienceExtractorService } from '../infrastructure/claude/experience-extractor.service';
|
import { ExperienceExtractorService } from '../infrastructure/claude/experience-extractor.service';
|
||||||
import { ConversationORM } from '../infrastructure/database/entities/conversation.orm';
|
import { ConversationClient, ConversationDto, MessageDto } from '../infrastructure/clients/conversation.client';
|
||||||
import { MessageORM } from '../infrastructure/database/entities/message.orm';
|
|
||||||
import { SystemExperienceORM } from '../infrastructure/database/entities/system-experience.orm';
|
import { SystemExperienceORM } from '../infrastructure/database/entities/system-experience.orm';
|
||||||
import { v4 as uuidv4 } from 'uuid';
|
import { v4 as uuidv4 } from 'uuid';
|
||||||
|
|
||||||
|
|
@ -26,10 +25,7 @@ export interface EvolutionTaskResult {
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class EvolutionService {
|
export class EvolutionService {
|
||||||
constructor(
|
constructor(
|
||||||
@InjectRepository(ConversationORM)
|
private conversationClient: ConversationClient,
|
||||||
private conversationRepo: Repository<ConversationORM>,
|
|
||||||
@InjectRepository(MessageORM)
|
|
||||||
private messageRepo: Repository<MessageORM>,
|
|
||||||
@InjectRepository(SystemExperienceORM)
|
@InjectRepository(SystemExperienceORM)
|
||||||
private experienceRepo: Repository<SystemExperienceORM>,
|
private experienceRepo: Repository<SystemExperienceORM>,
|
||||||
private experienceExtractor: ExperienceExtractorService,
|
private experienceExtractor: ExperienceExtractorService,
|
||||||
|
|
@ -60,18 +56,12 @@ export class EvolutionService {
|
||||||
console.log(`[Evolution] Starting task ${taskId}`);
|
console.log(`[Evolution] Starting task ${taskId}`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 1. 获取待分析的对话
|
// 1. 通过 API 获取待分析的对话
|
||||||
const cutoffTime = new Date();
|
const conversations = await this.conversationClient.findConversations({
|
||||||
cutoffTime.setHours(cutoffTime.getHours() - hoursBack);
|
status: 'ENDED',
|
||||||
|
hoursBack,
|
||||||
const conversations = await this.conversationRepo.find({
|
minMessageCount,
|
||||||
where: {
|
limit,
|
||||||
status: 'ENDED',
|
|
||||||
createdAt: MoreThan(cutoffTime),
|
|
||||||
messageCount: MoreThan(minMessageCount),
|
|
||||||
},
|
|
||||||
order: { createdAt: 'DESC' },
|
|
||||||
take: limit,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log(`[Evolution] Found ${conversations.length} conversations to analyze`);
|
console.log(`[Evolution] Found ${conversations.length} conversations to analyze`);
|
||||||
|
|
@ -81,11 +71,8 @@ export class EvolutionService {
|
||||||
|
|
||||||
for (const conversation of conversations) {
|
for (const conversation of conversations) {
|
||||||
try {
|
try {
|
||||||
// 获取对话消息
|
// 通过 API 获取对话消息
|
||||||
const messages = await this.messageRepo.find({
|
const messages = await this.conversationClient.getMessages(conversation.id);
|
||||||
where: { conversationId: conversation.id },
|
|
||||||
order: { createdAt: 'ASC' },
|
|
||||||
});
|
|
||||||
|
|
||||||
// 分析对话
|
// 分析对话
|
||||||
const analysis = await this.experienceExtractor.analyzeConversation({
|
const analysis = await this.experienceExtractor.analyzeConversation({
|
||||||
|
|
@ -125,8 +112,6 @@ export class EvolutionService {
|
||||||
const uniqueGaps = [...new Set(allKnowledgeGaps)];
|
const uniqueGaps = [...new Set(allKnowledgeGaps)];
|
||||||
result.knowledgeGapsFound = uniqueGaps.length;
|
result.knowledgeGapsFound = uniqueGaps.length;
|
||||||
|
|
||||||
// 可以在这里调用知识服务创建待处理的知识缺口任务
|
|
||||||
|
|
||||||
console.log(`[Evolution] Task ${taskId} completed:`, result);
|
console.log(`[Evolution] Task ${taskId} completed:`, result);
|
||||||
|
|
||||||
if (result.errors.length > 0) {
|
if (result.errors.length > 0) {
|
||||||
|
|
@ -219,14 +204,10 @@ export class EvolutionService {
|
||||||
this.experienceRepo.count({ where: { isActive: true } }),
|
this.experienceRepo.count({ where: { isActive: true } }),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
// 获取最近分析的对话数(过去7天)
|
// 通过 API 获取最近分析的对话数(过去7天)
|
||||||
const weekAgo = new Date();
|
const recentConversations = await this.conversationClient.countConversations({
|
||||||
weekAgo.setDate(weekAgo.getDate() - 7);
|
status: 'ENDED',
|
||||||
const recentConversations = await this.conversationRepo.count({
|
daysBack: 7,
|
||||||
where: {
|
|
||||||
status: 'ENDED',
|
|
||||||
updatedAt: MoreThan(weekAgo),
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// 获取经验类型分布
|
// 获取经验类型分布
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,129 @@
|
||||||
|
import { Injectable } from '@nestjs/common';
|
||||||
|
import { ConfigService } from '@nestjs/config';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 对话数据传输对象
|
||||||
|
*/
|
||||||
|
export interface ConversationDto {
|
||||||
|
id: string;
|
||||||
|
userId: string;
|
||||||
|
status: string;
|
||||||
|
title: string;
|
||||||
|
summary: string;
|
||||||
|
category: string;
|
||||||
|
messageCount: number;
|
||||||
|
rating: number;
|
||||||
|
hasConverted: boolean;
|
||||||
|
createdAt: Date;
|
||||||
|
updatedAt: Date;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息数据传输对象
|
||||||
|
*/
|
||||||
|
export interface MessageDto {
|
||||||
|
id: string;
|
||||||
|
conversationId: string;
|
||||||
|
role: 'user' | 'assistant' | 'system';
|
||||||
|
content: string;
|
||||||
|
createdAt: Date;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Conversation Service 客户端
|
||||||
|
* 用于调用 conversation-service 的内部 API
|
||||||
|
*/
|
||||||
|
@Injectable()
|
||||||
|
export class ConversationClient {
|
||||||
|
private readonly baseUrl: string;
|
||||||
|
|
||||||
|
constructor(private configService: ConfigService) {
|
||||||
|
this.baseUrl = this.configService.get<string>(
|
||||||
|
'CONVERSATION_SERVICE_URL',
|
||||||
|
'http://conversation-service:3004',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询对话列表
|
||||||
|
*/
|
||||||
|
async findConversations(options: {
|
||||||
|
status?: string;
|
||||||
|
hoursBack?: number;
|
||||||
|
minMessageCount?: number;
|
||||||
|
limit?: number;
|
||||||
|
}): Promise<ConversationDto[]> {
|
||||||
|
const params = new URLSearchParams();
|
||||||
|
|
||||||
|
if (options.status) params.append('status', options.status);
|
||||||
|
if (options.hoursBack) params.append('hoursBack', options.hoursBack.toString());
|
||||||
|
if (options.minMessageCount) params.append('minMessageCount', options.minMessageCount.toString());
|
||||||
|
if (options.limit) params.append('limit', options.limit.toString());
|
||||||
|
|
||||||
|
const url = `${this.baseUrl}/api/internal/conversations?${params.toString()}`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fetch(url);
|
||||||
|
const data = await response.json();
|
||||||
|
|
||||||
|
if (!data.success) {
|
||||||
|
throw new Error('Failed to fetch conversations');
|
||||||
|
}
|
||||||
|
|
||||||
|
return data.data;
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[ConversationClient] Error fetching conversations:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取对话的消息
|
||||||
|
*/
|
||||||
|
async getMessages(conversationId: string): Promise<MessageDto[]> {
|
||||||
|
const url = `${this.baseUrl}/api/internal/conversations/${conversationId}/messages`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fetch(url);
|
||||||
|
const data = await response.json();
|
||||||
|
|
||||||
|
if (!data.success) {
|
||||||
|
throw new Error('Failed to fetch messages');
|
||||||
|
}
|
||||||
|
|
||||||
|
return data.data;
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[ConversationClient] Error fetching messages:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 统计对话数量
|
||||||
|
*/
|
||||||
|
async countConversations(options: {
|
||||||
|
status?: string;
|
||||||
|
daysBack?: number;
|
||||||
|
}): Promise<number> {
|
||||||
|
const params = new URLSearchParams();
|
||||||
|
|
||||||
|
if (options.status) params.append('status', options.status);
|
||||||
|
if (options.daysBack) params.append('daysBack', options.daysBack.toString());
|
||||||
|
|
||||||
|
const url = `${this.baseUrl}/api/internal/conversations/count?${params.toString()}`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fetch(url);
|
||||||
|
const data = await response.json();
|
||||||
|
|
||||||
|
if (!data.success) {
|
||||||
|
throw new Error('Failed to count conversations');
|
||||||
|
}
|
||||||
|
|
||||||
|
return data.data.count;
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[ConversationClient] Error counting conversations:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,61 +0,0 @@
|
||||||
import {
|
|
||||||
Entity,
|
|
||||||
Column,
|
|
||||||
PrimaryColumn,
|
|
||||||
CreateDateColumn,
|
|
||||||
UpdateDateColumn,
|
|
||||||
} from 'typeorm';
|
|
||||||
|
|
||||||
@Entity('conversations')
|
|
||||||
export class ConversationORM {
|
|
||||||
@PrimaryColumn('uuid')
|
|
||||||
id: string;
|
|
||||||
|
|
||||||
@Column({ name: 'user_id', nullable: true })
|
|
||||||
userId: string;
|
|
||||||
|
|
||||||
@Column({ length: 20, default: 'ACTIVE' })
|
|
||||||
status: string;
|
|
||||||
|
|
||||||
@Column({ length: 255, nullable: true })
|
|
||||||
title: string;
|
|
||||||
|
|
||||||
@Column('text', { nullable: true })
|
|
||||||
summary: string;
|
|
||||||
|
|
||||||
@Column({ length: 50, nullable: true })
|
|
||||||
category: string;
|
|
||||||
|
|
||||||
@Column({ name: 'message_count', default: 0 })
|
|
||||||
messageCount: number;
|
|
||||||
|
|
||||||
@Column({ name: 'user_message_count', default: 0 })
|
|
||||||
userMessageCount: number;
|
|
||||||
|
|
||||||
@Column({ name: 'assistant_message_count', default: 0 })
|
|
||||||
assistantMessageCount: number;
|
|
||||||
|
|
||||||
@Column({ name: 'total_input_tokens', default: 0 })
|
|
||||||
totalInputTokens: number;
|
|
||||||
|
|
||||||
@Column({ name: 'total_output_tokens', default: 0 })
|
|
||||||
totalOutputTokens: number;
|
|
||||||
|
|
||||||
@Column({ type: 'smallint', nullable: true })
|
|
||||||
rating: number;
|
|
||||||
|
|
||||||
@Column('text', { nullable: true })
|
|
||||||
feedback: string;
|
|
||||||
|
|
||||||
@Column({ name: 'has_converted', default: false })
|
|
||||||
hasConverted: boolean;
|
|
||||||
|
|
||||||
@CreateDateColumn({ name: 'created_at' })
|
|
||||||
createdAt: Date;
|
|
||||||
|
|
||||||
@UpdateDateColumn({ name: 'updated_at' })
|
|
||||||
updatedAt: Date;
|
|
||||||
|
|
||||||
@Column({ name: 'ended_at', nullable: true })
|
|
||||||
endedAt: Date;
|
|
||||||
}
|
|
||||||
|
|
@ -1,36 +0,0 @@
|
||||||
import {
|
|
||||||
Entity,
|
|
||||||
Column,
|
|
||||||
PrimaryColumn,
|
|
||||||
CreateDateColumn,
|
|
||||||
} from 'typeorm';
|
|
||||||
|
|
||||||
@Entity('messages')
|
|
||||||
export class MessageORM {
|
|
||||||
@PrimaryColumn('uuid')
|
|
||||||
id: string;
|
|
||||||
|
|
||||||
@Column({ name: 'conversation_id' })
|
|
||||||
conversationId: string;
|
|
||||||
|
|
||||||
@Column({ length: 20 })
|
|
||||||
role: string;
|
|
||||||
|
|
||||||
@Column({ length: 30, default: 'TEXT' })
|
|
||||||
type: string;
|
|
||||||
|
|
||||||
@Column('text')
|
|
||||||
content: string;
|
|
||||||
|
|
||||||
@Column({ name: 'input_tokens', default: 0 })
|
|
||||||
inputTokens: number;
|
|
||||||
|
|
||||||
@Column({ name: 'output_tokens', default: 0 })
|
|
||||||
outputTokens: number;
|
|
||||||
|
|
||||||
@Column('jsonb', { nullable: true })
|
|
||||||
metadata: Record<string, unknown>;
|
|
||||||
|
|
||||||
@CreateDateColumn({ name: 'created_at' })
|
|
||||||
createdAt: Date;
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue