From 2403ce56364e355b8242155c5328565e2443e178 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 24 Feb 2026 19:04:35 -0800 Subject: [PATCH] feat: multi-turn conversation context management with session history UI Implement DB-based conversation message storage (engine-agnostic) that works across both Claude API and Agent SDK engines. Add ChatGPT/Claude-style conversation history drawer in Flutter with date-grouped session list, session switching, and new chat functionality. Backend: entity, repository, context service, migration 004, session/message API endpoints. Flutter: ConversationDrawer, sessionId flow from backend response via SessionInfoEvent, session list/switch/delete support. Co-Authored-By: Claude Opus 4.6 --- .../datasources/chat_remote_datasource.dart | 54 +++- .../repositories/chat_repository_impl.dart | 6 + .../chat/domain/entities/stream_event.dart | 6 + .../chat/presentation/pages/chat_page.dart | 14 +- .../providers/chat_providers.dart | 69 +++- .../widgets/conversation_drawer.dart | 302 ++++++++++++++++++ .../agent-service/src/agent.module.ts | 6 + .../entities/conversation-message.entity.ts | 34 ++ .../ports/outbound/agent-engine.port.ts | 11 +- .../services/conversation-context.service.ts | 170 ++++++++++ .../claude-agent-sdk-engine.ts | 41 ++- .../engines/claude-api/claude-api-engine.ts | 15 +- .../claude-code-cli/claude-code-engine.ts | 6 +- .../engines/custom/custom-engine.ts | 1 + .../repositories/message.repository.ts | 68 ++++ .../rest/controllers/agent.controller.ts | 81 ++++- .../rest/controllers/session.controller.ts | 64 +++- .../002-create-tenant-schema-template.sql | 19 ++ .../004-add-conversation-messages.sql | 24 ++ .../shared/database/src/run-migrations.ts | 7 + 20 files changed, 947 insertions(+), 51 deletions(-) create mode 100644 it0_app/lib/features/chat/presentation/widgets/conversation_drawer.dart create mode 100644 packages/services/agent-service/src/domain/entities/conversation-message.entity.ts create mode 100644 packages/services/agent-service/src/domain/services/conversation-context.service.ts create mode 100644 packages/services/agent-service/src/infrastructure/repositories/message.repository.ts create mode 100644 packages/shared/database/src/migrations/004-add-conversation-messages.sql diff --git a/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart b/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart index bc5c295..9fa198c 100644 --- a/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart +++ b/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart @@ -1,6 +1,7 @@ import 'package:dio/dio.dart'; import '../../../../core/config/api_endpoints.dart'; import '../models/chat_message_model.dart'; +import '../../presentation/widgets/conversation_drawer.dart'; /// Remote datasource for chat operations against the IT0 backend API. class ChatRemoteDatasource { @@ -19,7 +20,7 @@ class ChatRemoteDatasource { ApiEndpoints.tasks, data: { 'prompt': message, - 'sessionId': sessionId, + if (sessionId != 'new') 'sessionId': sessionId, if (attachments != null && attachments.isNotEmpty) 'attachments': attachments, }, @@ -51,6 +52,57 @@ class ChatRemoteDatasource { .toList(); } + /// Lists all sessions for the current tenant. + Future> listSessions() async { + final response = await _dio.get(ApiEndpoints.sessions); + final data = response.data; + + List sessions; + if (data is List) { + sessions = data; + } else if (data is Map && data.containsKey('sessions')) { + sessions = data['sessions'] as List; + } else { + sessions = []; + } + + final list = sessions + .map((s) => SessionSummary.fromJson(s as Map)) + .toList(); + + // Sort by most recent first + list.sort((a, b) => b.updatedAt.compareTo(a.updatedAt)); + return list; + } + + /// Gets conversation messages for a session (from conversation_messages table). + Future> getSessionMessages(String sessionId) async { + final response = await _dio.get( + '${ApiEndpoints.sessions}/$sessionId/messages', + queryParameters: {'limit': '50'}, + ); + + final data = response.data; + List messages; + + if (data is Map && data.containsKey('messages')) { + messages = data['messages'] as List; + } else if (data is List) { + messages = data; + } else { + messages = []; + } + + return messages + .map((m) => ChatMessageModel.fromJson(m as Map)) + .toList(); + } + + /// Deletes a session and all its messages. + Future deleteSession(String sessionId) async { + await _dio.delete('${ApiEndpoints.sessions}/$sessionId'); + } + /// Approves a pending command for a given task. Future approveCommand(String taskId) async { await _dio.post('${ApiEndpoints.approvals}/$taskId/approve'); diff --git a/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart b/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart index 347aca1..aecdf0a 100644 --- a/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart +++ b/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart @@ -41,6 +41,9 @@ class ChatRepositoryImpl implements ChatRepository { sessionId; final taskId = response['taskId'] as String? ?? response['task_id'] as String?; + // Emit the real sessionId so the notifier can capture it + yield SessionInfoEvent(returnedSessionId); + // Connect to the agent WebSocket and subscribe to the session final token = await _getAccessToken(); await _webSocketClient.connect('/ws/agent', token: token); @@ -91,6 +94,9 @@ class ChatRepositoryImpl implements ChatRepository { sessionId; final taskId = response['taskId'] as String? ?? response['task_id'] as String?; + // Emit the real sessionId so the notifier can capture it + yield SessionInfoEvent(returnedSessionId); + final voiceToken = await _getAccessToken(); await _webSocketClient.connect('/ws/agent', token: voiceToken); _webSocketClient.send({ diff --git a/it0_app/lib/features/chat/domain/entities/stream_event.dart b/it0_app/lib/features/chat/domain/entities/stream_event.dart index d1c5fe3..8d837a7 100644 --- a/it0_app/lib/features/chat/domain/entities/stream_event.dart +++ b/it0_app/lib/features/chat/domain/entities/stream_event.dart @@ -50,3 +50,9 @@ class StandingOrderConfirmedEvent extends StreamEvent { final String orderName; StandingOrderConfirmedEvent(this.orderId, this.orderName); } + +/// Carries the real sessionId assigned by the backend. +class SessionInfoEvent extends StreamEvent { + final String sessionId; + SessionInfoEvent(this.sessionId); +} diff --git a/it0_app/lib/features/chat/presentation/pages/chat_page.dart b/it0_app/lib/features/chat/presentation/pages/chat_page.dart index 85921ff..6728808 100644 --- a/it0_app/lib/features/chat/presentation/pages/chat_page.dart +++ b/it0_app/lib/features/chat/presentation/pages/chat_page.dart @@ -6,6 +6,7 @@ import '../providers/chat_providers.dart'; import '../widgets/timeline_event_node.dart'; import '../widgets/stream_text_widget.dart'; import '../widgets/approval_action_card.dart'; +import '../widgets/conversation_drawer.dart'; import '../../../agent_call/presentation/pages/agent_call_page.dart'; // --------------------------------------------------------------------------- @@ -189,9 +190,16 @@ class _ChatPageState extends ConsumerState { ref.listen(chatProvider, (_, __) => _scrollToBottom()); return Scaffold( + drawer: const ConversationDrawer(), appBar: AppBar( title: const Text('iAgent'), actions: [ + // New chat button (always visible) + IconButton( + icon: const Icon(Icons.edit_outlined), + tooltip: '新对话', + onPressed: () => ref.read(chatProvider.notifier).startNewChat(), + ), // Stop button during streaming if (chatState.isStreaming) IconButton( @@ -199,12 +207,6 @@ class _ChatPageState extends ConsumerState { tooltip: '停止', onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(), ), - if (chatState.messages.isNotEmpty) - IconButton( - icon: const Icon(Icons.delete_outline), - tooltip: '清空对话', - onPressed: () => ref.read(chatProvider.notifier).clearChat(), - ), // Voice call button IconButton( icon: const Icon(Icons.call), diff --git a/it0_app/lib/features/chat/presentation/providers/chat_providers.dart b/it0_app/lib/features/chat/presentation/providers/chat_providers.dart index 7664eae..efa357c 100644 --- a/it0_app/lib/features/chat/presentation/providers/chat_providers.dart +++ b/it0_app/lib/features/chat/presentation/providers/chat_providers.dart @@ -15,6 +15,7 @@ import '../../domain/repositories/chat_repository.dart'; import '../../domain/usecases/cancel_task.dart'; import '../../domain/usecases/get_session_history.dart'; import '../../domain/usecases/send_message.dart'; +import '../widgets/conversation_drawer.dart'; // --------------------------------------------------------------------------- // Dependency providers @@ -42,7 +43,6 @@ final chatRepositoryProvider = Provider((ref) { final ws = ref.watch(webSocketClientProvider); final storage = ref.watch(secureStorageProvider); - // Use a no-op local datasource if SharedPreferences is not yet ready return ChatRepositoryImpl( remoteDatasource: remote, localDatasource: local ?? _NoOpLocalDatasource(), @@ -67,6 +67,15 @@ final cancelTaskUseCaseProvider = Provider((ref) { return CancelTask(ref.watch(chatRepositoryProvider)); }); +// --------------------------------------------------------------------------- +// Session list provider (for conversation drawer) +// --------------------------------------------------------------------------- + +final sessionListProvider = FutureProvider>((ref) async { + final datasource = ref.watch(chatRemoteDatasourceProvider); + return datasource.listSessions(); +}); + // --------------------------------------------------------------------------- // Chat state // --------------------------------------------------------------------------- @@ -134,10 +143,11 @@ class ChatNotifier extends StateNotifier { try { final useCase = _ref.read(sendMessageUseCaseProvider); - final sessionId = state.sessionId ?? 'new'; + // Pass current sessionId to reuse the session for multi-turn context + final sessionId = state.sessionId; final stream = useCase.execute( - sessionId: sessionId, + sessionId: sessionId ?? 'new', message: prompt, ); @@ -194,8 +204,6 @@ class ChatNotifier extends StateNotifier { ); case ToolResultEvent(:final toolName, :final output, :final isError): - // First, update the matching ToolUse message's status so its spinner - // transitions to a completed/error icon in the timeline. final updatedMessages = [...state.messages]; for (int i = updatedMessages.length - 1; i >= 0; i--) { final m = updatedMessages[i]; @@ -246,8 +254,6 @@ class ChatNotifier extends StateNotifier { ); case CompletedEvent(:final summary): - // Only show summary as a message if there were no assistant text messages - // (avoids duplicate bubble when the SDK already streamed the full response) final hasAssistantText = state.messages.any( (m) => m.role == MessageRole.assistant && m.type == MessageType.text && m.content.isNotEmpty, ); @@ -288,11 +294,13 @@ class ChatNotifier extends StateNotifier { messages: [...state.messages, msg], agentStatus: AgentStatus.idle, ); + + case SessionInfoEvent(:final sessionId): + // Capture the real sessionId from the backend for multi-turn reuse + state = state.copyWith(sessionId: sessionId); } } - /// Appends text content to the last assistant message if it is of the same - /// type, or creates a new message bubble. void _appendOrUpdateAssistantMessage(String content, MessageType type) { if (state.messages.isNotEmpty) { final last = state.messages.last; @@ -319,7 +327,43 @@ class ChatNotifier extends StateNotifier { state = state.copyWith(messages: [...state.messages, msg]); } - /// Approves a pending command. + /// Starts a new chat — clears messages and resets sessionId. + void startNewChat() { + _eventSubscription?.cancel(); + state = const ChatState(); + } + + /// Switches to an existing session — loads its messages from the backend. + Future switchSession(String sessionId) async { + _eventSubscription?.cancel(); + state = ChatState(sessionId: sessionId, agentStatus: AgentStatus.idle); + + try { + final datasource = _ref.read(chatRemoteDatasourceProvider); + final messages = await datasource.getSessionMessages(sessionId); + state = state.copyWith( + messages: messages.map((m) => m.toEntity()).toList(), + sessionId: sessionId, + ); + } catch (e) { + state = state.copyWith(error: '加载对话历史失败: $e'); + } + } + + /// Deletes a session from the backend. + Future deleteSession(String sessionId) async { + try { + final datasource = _ref.read(chatRemoteDatasourceProvider); + await datasource.deleteSession(sessionId); + // If the deleted session is the current one, reset state + if (state.sessionId == sessionId) { + state = const ChatState(); + } + } catch (e) { + state = state.copyWith(error: '删除对话失败: $e'); + } + } + Future approveCommand(String taskId) async { try { final repo = _ref.read(chatRepositoryProvider); @@ -330,7 +374,6 @@ class ChatNotifier extends StateNotifier { } } - /// Rejects a pending command. Future rejectCommand(String taskId, {String? reason}) async { try { final repo = _ref.read(chatRepositoryProvider); @@ -341,7 +384,6 @@ class ChatNotifier extends StateNotifier { } } - /// Confirms a standing order draft. Future confirmStandingOrder(Map draft) async { if (state.sessionId == null) return; try { @@ -353,7 +395,6 @@ class ChatNotifier extends StateNotifier { } } - /// Cancels the current agent task. Future cancelCurrentTask() async { if (state.sessionId == null) return; try { @@ -366,7 +407,6 @@ class ChatNotifier extends StateNotifier { } } - /// Loads session history from the backend. Future loadSessionHistory(String sessionId) async { try { final useCase = _ref.read(getSessionHistoryUseCaseProvider); @@ -380,7 +420,6 @@ class ChatNotifier extends StateNotifier { } } - /// Clears the chat state and cancels any active subscriptions. void clearChat() { _eventSubscription?.cancel(); state = const ChatState(); diff --git a/it0_app/lib/features/chat/presentation/widgets/conversation_drawer.dart b/it0_app/lib/features/chat/presentation/widgets/conversation_drawer.dart new file mode 100644 index 0000000..794104c --- /dev/null +++ b/it0_app/lib/features/chat/presentation/widgets/conversation_drawer.dart @@ -0,0 +1,302 @@ +import 'package:flutter/material.dart'; +import 'package:flutter_riverpod/flutter_riverpod.dart'; +import '../../../../core/theme/app_colors.dart'; +import '../providers/chat_providers.dart'; + +/// Session summary returned from the backend. +class SessionSummary { + final String id; + final String title; + final String status; + final DateTime createdAt; + final DateTime updatedAt; + + const SessionSummary({ + required this.id, + required this.title, + required this.status, + required this.createdAt, + required this.updatedAt, + }); + + factory SessionSummary.fromJson(Map json) { + return SessionSummary( + id: json['id'] as String, + title: json['systemPrompt'] as String? ?? _generateTitle(json), + status: json['status'] as String? ?? 'active', + createdAt: DateTime.tryParse(json['createdAt'] as String? ?? '') ?? DateTime.now(), + updatedAt: DateTime.tryParse(json['updatedAt'] as String? ?? '') ?? DateTime.now(), + ); + } + + static String _generateTitle(Map json) { + // Use metadata or fallback to date-based title + final meta = json['metadata'] as Map?; + if (meta != null && meta['title'] != null) { + return meta['title'] as String; + } + final createdAt = DateTime.tryParse(json['createdAt'] as String? ?? ''); + if (createdAt != null) { + return '对话 ${createdAt.month}/${createdAt.day} ${createdAt.hour}:${createdAt.minute.toString().padLeft(2, '0')}'; + } + return '新对话'; + } +} + +/// Groups sessions by date category. +enum DateGroup { today, yesterday, previous7Days, previous30Days, older } + +/// Left drawer showing conversation history list. +/// Pattern: ChatGPT / Claude / Gemini style side drawer. +class ConversationDrawer extends ConsumerWidget { + const ConversationDrawer({super.key}); + + @override + Widget build(BuildContext context, WidgetRef ref) { + final sessions = ref.watch(sessionListProvider); + final currentSessionId = ref.watch(currentSessionIdProvider); + + return Drawer( + backgroundColor: AppColors.background, + child: SafeArea( + child: Column( + children: [ + // Header: New Chat button + Padding( + padding: const EdgeInsets.fromLTRB(16, 12, 16, 8), + child: SizedBox( + width: double.infinity, + child: OutlinedButton.icon( + onPressed: () { + ref.read(chatProvider.notifier).startNewChat(); + Navigator.of(context).pop(); + }, + icon: const Icon(Icons.add, size: 18), + label: const Text('新对话'), + style: OutlinedButton.styleFrom( + foregroundColor: AppColors.textPrimary, + side: BorderSide(color: AppColors.surfaceLight.withOpacity(0.6)), + padding: const EdgeInsets.symmetric(vertical: 12), + shape: RoundedRectangleBorder( + borderRadius: BorderRadius.circular(10), + ), + ), + ), + ), + ), + + const Divider(color: AppColors.surfaceLight, height: 1), + + // Session list + Expanded( + child: sessions.when( + data: (list) { + if (list.isEmpty) { + return Center( + child: Column( + mainAxisSize: MainAxisSize.min, + children: [ + Icon(Icons.chat_bubble_outline, size: 48, color: AppColors.textMuted), + const SizedBox(height: 12), + Text('暂无对话历史', style: TextStyle(color: AppColors.textMuted, fontSize: 14)), + ], + ), + ); + } + + final grouped = _groupByDate(list); + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8), + children: grouped.entries.expand((entry) { + return [ + // Date group header + Padding( + padding: const EdgeInsets.fromLTRB(16, 12, 16, 4), + child: Text( + _dateGroupLabel(entry.key), + style: TextStyle( + color: AppColors.textMuted, + fontSize: 11, + fontWeight: FontWeight.w600, + letterSpacing: 0.5, + ), + ), + ), + // Sessions in this group + ...entry.value.map((session) => _SessionTile( + session: session, + isActive: session.id == currentSessionId, + onTap: () { + ref.read(chatProvider.notifier).switchSession(session.id); + Navigator.of(context).pop(); + }, + onDelete: () { + _confirmDelete(context, ref, session); + }, + )), + ]; + }).toList(), + ); + }, + loading: () => const Center(child: CircularProgressIndicator()), + error: (err, _) => Center( + child: Column( + mainAxisSize: MainAxisSize.min, + children: [ + Icon(Icons.error_outline, color: AppColors.error), + const SizedBox(height: 8), + Text('加载失败', style: TextStyle(color: AppColors.error)), + TextButton( + onPressed: () => ref.invalidate(sessionListProvider), + child: const Text('重试'), + ), + ], + ), + ), + ), + ), + ], + ), + ), + ); + } + + Map> _groupByDate(List sessions) { + final now = DateTime.now(); + final today = DateTime(now.year, now.month, now.day); + final yesterday = today.subtract(const Duration(days: 1)); + final week = today.subtract(const Duration(days: 7)); + final month = today.subtract(const Duration(days: 30)); + + final grouped = >{}; + + for (final session in sessions) { + final date = DateTime(session.updatedAt.year, session.updatedAt.month, session.updatedAt.day); + DateGroup group; + if (!date.isBefore(today)) { + group = DateGroup.today; + } else if (!date.isBefore(yesterday)) { + group = DateGroup.yesterday; + } else if (!date.isBefore(week)) { + group = DateGroup.previous7Days; + } else if (!date.isBefore(month)) { + group = DateGroup.previous30Days; + } else { + group = DateGroup.older; + } + grouped.putIfAbsent(group, () => []).add(session); + } + + return grouped; + } + + String _dateGroupLabel(DateGroup group) { + switch (group) { + case DateGroup.today: + return '今天'; + case DateGroup.yesterday: + return '昨天'; + case DateGroup.previous7Days: + return '最近7天'; + case DateGroup.previous30Days: + return '最近30天'; + case DateGroup.older: + return '更早'; + } + } + + void _confirmDelete(BuildContext context, WidgetRef ref, SessionSummary session) { + showDialog( + context: context, + builder: (ctx) => AlertDialog( + backgroundColor: AppColors.surface, + title: const Text('删除对话', style: TextStyle(color: AppColors.textPrimary)), + content: Text( + '确定要删除「${session.title}」吗?此操作不可恢复。', + style: const TextStyle(color: AppColors.textSecondary), + ), + actions: [ + TextButton( + onPressed: () => Navigator.of(ctx).pop(), + child: const Text('取消'), + ), + TextButton( + onPressed: () { + Navigator.of(ctx).pop(); + ref.read(chatProvider.notifier).deleteSession(session.id); + ref.invalidate(sessionListProvider); + }, + style: TextButton.styleFrom(foregroundColor: AppColors.error), + child: const Text('删除'), + ), + ], + ), + ); + } +} + +/// Individual session list tile with long-press menu. +class _SessionTile extends StatelessWidget { + final SessionSummary session; + final bool isActive; + final VoidCallback onTap; + final VoidCallback onDelete; + + const _SessionTile({ + required this.session, + required this.isActive, + required this.onTap, + required this.onDelete, + }); + + @override + Widget build(BuildContext context) { + return ListTile( + dense: true, + selected: isActive, + selectedTileColor: AppColors.surfaceLight.withOpacity(0.4), + shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(8)), + contentPadding: const EdgeInsets.symmetric(horizontal: 16, vertical: 2), + leading: Icon( + Icons.chat_bubble_outline, + size: 18, + color: isActive ? AppColors.primary : AppColors.textMuted, + ), + title: Text( + session.title, + maxLines: 1, + overflow: TextOverflow.ellipsis, + style: TextStyle( + color: isActive ? AppColors.textPrimary : AppColors.textSecondary, + fontSize: 13, + fontWeight: isActive ? FontWeight.w600 : FontWeight.normal, + ), + ), + onTap: onTap, + onLongPress: () { + showModalBottomSheet( + context: context, + backgroundColor: AppColors.surface, + shape: const RoundedRectangleBorder( + borderRadius: BorderRadius.vertical(top: Radius.circular(16)), + ), + builder: (ctx) => SafeArea( + child: Column( + mainAxisSize: MainAxisSize.min, + children: [ + ListTile( + leading: const Icon(Icons.delete_outline, color: AppColors.error), + title: const Text('删除对话', style: TextStyle(color: AppColors.error)), + onTap: () { + Navigator.of(ctx).pop(); + onDelete(); + }, + ), + ], + ), + ), + ); + }, + ); + } +} diff --git a/packages/services/agent-service/src/agent.module.ts b/packages/services/agent-service/src/agent.module.ts index af350e8..a626775 100644 --- a/packages/services/agent-service/src/agent.module.ts +++ b/packages/services/agent-service/src/agent.module.ts @@ -37,6 +37,9 @@ import { TenantAgentConfig } from './domain/entities/tenant-agent-config.entity' import { AgentConfig } from './domain/entities/agent-config.entity'; import { AgentSkill } from './domain/entities/agent-skill.entity'; import { HookScript } from './domain/entities/hook-script.entity'; +import { ConversationMessage } from './domain/entities/conversation-message.entity'; +import { MessageRepository } from './infrastructure/repositories/message.repository'; +import { ConversationContextService } from './domain/services/conversation-context.service'; @Module({ imports: [ @@ -45,6 +48,7 @@ import { HookScript } from './domain/entities/hook-script.entity'; TypeOrmModule.forFeature([ AgentSession, AgentTask, CommandRecord, StandingOrderRef, TenantAgentConfig, AgentConfig, AgentSkill, HookScript, + ConversationMessage, ]), ], controllers: [ @@ -62,8 +66,10 @@ import { HookScript } from './domain/entities/hook-script.entity'; SkillManagerService, StandingOrderExtractorService, AllowedToolsResolverService, + ConversationContextService, SessionRepository, TaskRepository, + MessageRepository, TenantAgentConfigRepository, AgentConfigRepository, AgentSkillRepository, diff --git a/packages/services/agent-service/src/domain/entities/conversation-message.entity.ts b/packages/services/agent-service/src/domain/entities/conversation-message.entity.ts new file mode 100644 index 0000000..f319ce1 --- /dev/null +++ b/packages/services/agent-service/src/domain/entities/conversation-message.entity.ts @@ -0,0 +1,34 @@ +import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn } from 'typeorm'; + +@Entity('conversation_messages') +export class ConversationMessage { + @PrimaryGeneratedColumn('uuid') + id!: string; + + @Column({ type: 'varchar', length: 20 }) + tenantId!: string; + + @Column({ type: 'uuid' }) + sessionId!: string; + + @Column({ type: 'varchar', length: 20 }) + role!: 'user' | 'assistant' | 'system'; + + @Column({ type: 'text' }) + content!: string; + + @Column({ type: 'jsonb', nullable: true }) + toolCalls?: any[]; + + @Column({ type: 'jsonb', nullable: true }) + toolResults?: any[]; + + @Column({ type: 'int', nullable: true }) + tokenCount?: number; + + @Column({ type: 'int' }) + sequenceNumber!: number; + + @CreateDateColumn({ type: 'timestamptz' }) + createdAt!: Date; +} diff --git a/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts b/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts index 2a348ba..9569145 100644 --- a/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts +++ b/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts @@ -4,7 +4,11 @@ export interface AgentEnginePort { readonly engineType: AgentEngineType; executeTask(params: EngineTaskParams): AsyncGenerator; cancelTask(sessionId: string): Promise; - continueSession(sessionId: string, message: string): AsyncGenerator; + continueSession( + sessionId: string, + message: string, + conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>, + ): AsyncGenerator; healthCheck(): Promise; } @@ -17,6 +21,11 @@ export interface EngineTaskParams { maxBudgetUsd?: number; context?: Record; skill?: { name: string; arguments: string }; + /** Conversation history messages to prepend for multi-turn context. */ + conversationHistory?: Array<{ + role: 'user' | 'assistant'; + content: string | any[]; + }>; } export type EngineStreamEvent = diff --git a/packages/services/agent-service/src/domain/services/conversation-context.service.ts b/packages/services/agent-service/src/domain/services/conversation-context.service.ts new file mode 100644 index 0000000..b0558c5 --- /dev/null +++ b/packages/services/agent-service/src/domain/services/conversation-context.service.ts @@ -0,0 +1,170 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { MessageRepository } from '../../infrastructure/repositories/message.repository'; +import { ConversationMessage } from '../entities/conversation-message.entity'; +import { TenantContextService } from '@it0/common'; +import * as crypto from 'crypto'; + +export interface AnthropicHistoryMessage { + role: 'user' | 'assistant'; + content: string | any[]; +} + +@Injectable() +export class ConversationContextService { + private readonly logger = new Logger(ConversationContextService.name); + + constructor(private readonly messageRepository: MessageRepository) {} + + /** + * Save a user message to the conversation history. + */ + async saveUserMessage(sessionId: string, content: string): Promise { + const tenantId = TenantContextService.getTenantId(); + const sequenceNumber = await this.messageRepository.getNextSequenceNumber(sessionId); + + const message = new ConversationMessage(); + message.id = crypto.randomUUID(); + message.tenantId = tenantId; + message.sessionId = sessionId; + message.role = 'user'; + message.content = content; + message.tokenCount = this.estimateTokens(content); + message.sequenceNumber = sequenceNumber; + message.createdAt = new Date(); + + return this.messageRepository.save(message); + } + + /** + * Save an assistant response to the conversation history. + */ + async saveAssistantMessage( + sessionId: string, + content: string, + toolCalls?: any[], + toolResults?: any[], + ): Promise { + const tenantId = TenantContextService.getTenantId(); + const sequenceNumber = await this.messageRepository.getNextSequenceNumber(sessionId); + + const message = new ConversationMessage(); + message.id = crypto.randomUUID(); + message.tenantId = tenantId; + message.sessionId = sessionId; + message.role = 'assistant'; + message.content = content; + message.toolCalls = toolCalls; + message.toolResults = toolResults; + message.tokenCount = this.estimateTokens(content); + message.sequenceNumber = sequenceNumber; + message.createdAt = new Date(); + + return this.messageRepository.save(message); + } + + /** + * Load conversation history for a session, formatted as Anthropic messages. + * Returns the most recent `maxMessages` messages in chronological order. + */ + async loadContext( + sessionId: string, + maxMessages = 20, + ): Promise { + const messages = await this.messageRepository.findRecentBySessionId( + sessionId, + maxMessages, + ); + + if (messages.length === 0) return []; + + const history: AnthropicHistoryMessage[] = []; + + for (const msg of messages) { + if (msg.role === 'user') { + history.push({ role: 'user', content: msg.content }); + } else if (msg.role === 'assistant') { + // If the assistant message has tool calls, build content blocks + if (msg.toolCalls && msg.toolCalls.length > 0) { + const contentBlocks: any[] = []; + if (msg.content) { + contentBlocks.push({ type: 'text', text: msg.content }); + } + for (const tc of msg.toolCalls) { + contentBlocks.push({ + type: 'tool_use', + id: tc.id, + name: tc.name, + input: tc.input, + }); + } + history.push({ role: 'assistant', content: contentBlocks }); + + // Add tool results as the next user message + if (msg.toolResults && msg.toolResults.length > 0) { + history.push({ + role: 'user', + content: msg.toolResults.map((tr: any) => ({ + type: 'tool_result', + tool_use_id: tr.tool_use_id, + content: tr.content, + })), + }); + } + } else { + history.push({ role: 'assistant', content: msg.content }); + } + } + } + + // Ensure messages alternate correctly — first message should be 'user' + if (history.length > 0 && history[0].role !== 'user') { + history.shift(); + } + + this.logger.log( + `Loaded ${history.length} context messages for session ${sessionId}`, + ); + + return history; + } + + /** + * Build a text summary of conversation history for SDK engine prompt prefix. + */ + async buildPromptPrefix( + sessionId: string, + maxMessages = 20, + ): Promise { + const messages = await this.messageRepository.findRecentBySessionId( + sessionId, + maxMessages, + ); + + if (messages.length === 0) return ''; + + const lines: string[] = ['[Previous conversation]']; + for (const msg of messages) { + const role = msg.role === 'user' ? 'User' : 'Assistant'; + // Truncate long messages in the prefix + const content = msg.content.length > 500 + ? msg.content.slice(0, 500) + '...' + : msg.content; + lines.push(`${role}: ${content}`); + } + lines.push('', '[Current request]'); + + return lines.join('\n'); + } + + /** + * Simple token estimation: ~4 characters per token for English, + * ~2 characters per token for CJK. + */ + private estimateTokens(content: string): number { + if (!content) return 0; + // Count CJK characters + const cjkCount = (content.match(/[\u4e00-\u9fff\u3040-\u30ff\uac00-\ud7af]/g) || []).length; + const nonCjkCount = content.length - cjkCount; + return Math.ceil(cjkCount / 2 + nonCjkCount / 4); + } +} diff --git a/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts b/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts index 6eb7f9a..7ee0016 100644 --- a/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts +++ b/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts @@ -100,8 +100,14 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { try { const { query } = await dynamicImport('@anthropic-ai/claude-agent-sdk'); + // Build prompt with conversation history prefix if available + const promptWithContext = this.buildPromptWithHistory( + params.prompt, + params.conversationHistory, + ); + const sdkQuery = query({ - prompt: params.prompt, + prompt: promptWithContext, options: { systemPrompt: params.systemPrompt || undefined, allowedTools: params.allowedTools?.length ? params.allowedTools : undefined, @@ -223,7 +229,11 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { } } - async *continueSession(sessionId: string, message: string): AsyncGenerator { + async *continueSession( + sessionId: string, + message: string, + conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>, + ): AsyncGenerator { const session = this.activeSessions.get(sessionId); // If there's a pending approval, resolve it @@ -340,6 +350,33 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { return this.activeSessions.get(sessionId)?.sdkSessionId; } + /** + * Build a prompt that includes conversation history as a prefix. + * Used when SDK session resume is not available (e.g., after process restart). + */ + private buildPromptWithHistory( + prompt: string, + history?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>, + ): string { + if (!history || history.length === 0) return prompt; + + const lines: string[] = ['[Previous conversation]']; + for (const msg of history) { + const role = msg.role === 'user' ? 'User' : 'Assistant'; + const content = typeof msg.content === 'string' + ? msg.content + : JSON.stringify(msg.content); + // Truncate very long messages in the prefix + const truncated = content.length > 500 + ? content.slice(0, 500) + '...' + : content; + lines.push(`${role}: ${truncated}`); + } + lines.push('', '[Current request]', prompt); + + return lines.join('\n'); + } + private classifyToolRisk(toolName: string, toolInput: any): CommandRiskLevel { // Only classify Bash commands for risk; other tools are auto-allowed if (toolName === 'Bash' && typeof toolInput?.command === 'string') { diff --git a/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts b/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts index f25d095..f1a5344 100644 --- a/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts +++ b/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts @@ -58,8 +58,9 @@ export class ClaudeApiEngine implements AgentEnginePort { // Build tools array from allowedTools const tools = this.buildToolDefinitions(params.allowedTools); - // Initialize conversation with user prompt + // Initialize conversation with history + user prompt const messages: AnthropicMessage[] = [ + ...(params.conversationHistory || []), { role: 'user', content: params.prompt }, ]; @@ -234,16 +235,18 @@ export class ClaudeApiEngine implements AgentEnginePort { this.abortControllers.delete(sessionId); } - async *continueSession(sessionId: string, message: string): AsyncGenerator { - // For the API engine, continuing a session requires the full message history - // which should be managed by the orchestrator layer. Here we treat it as a - // new single-turn interaction for simplicity. + async *continueSession( + sessionId: string, + message: string, + conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>, + ): AsyncGenerator { yield* this.executeTask({ sessionId, prompt: message, systemPrompt: '', allowedTools: [], - maxTurns: 1, + maxTurns: 10, + conversationHistory, }); } diff --git a/packages/services/agent-service/src/infrastructure/engines/claude-code-cli/claude-code-engine.ts b/packages/services/agent-service/src/infrastructure/engines/claude-code-cli/claude-code-engine.ts index c07fff8..410df86 100644 --- a/packages/services/agent-service/src/infrastructure/engines/claude-code-cli/claude-code-engine.ts +++ b/packages/services/agent-service/src/infrastructure/engines/claude-code-cli/claude-code-engine.ts @@ -74,7 +74,11 @@ export class ClaudeCodeCliEngine implements AgentEnginePort { this.processes.delete(sessionId); } - async *continueSession(sessionId: string, message: string): AsyncGenerator { + async *continueSession( + sessionId: string, + message: string, + _conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>, + ): AsyncGenerator { const args = [ '--session-id', sessionId, '--resume', diff --git a/packages/services/agent-service/src/infrastructure/engines/custom/custom-engine.ts b/packages/services/agent-service/src/infrastructure/engines/custom/custom-engine.ts index 6afc845..ae1ccde 100644 --- a/packages/services/agent-service/src/infrastructure/engines/custom/custom-engine.ts +++ b/packages/services/agent-service/src/infrastructure/engines/custom/custom-engine.ts @@ -42,6 +42,7 @@ export class CustomEngine implements AgentEnginePort { async *continueSession( sessionId: string, message: string, + _conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>, ): AsyncGenerator { this.logger.log( `Custom engine continueSession called for session ${sessionId}`, diff --git a/packages/services/agent-service/src/infrastructure/repositories/message.repository.ts b/packages/services/agent-service/src/infrastructure/repositories/message.repository.ts new file mode 100644 index 0000000..1021249 --- /dev/null +++ b/packages/services/agent-service/src/infrastructure/repositories/message.repository.ts @@ -0,0 +1,68 @@ +import { Injectable } from '@nestjs/common'; +import { DataSource } from 'typeorm'; +import { TenantAwareRepository } from '@it0/database'; +import { ConversationMessage } from '../../domain/entities/conversation-message.entity'; + +@Injectable() +export class MessageRepository extends TenantAwareRepository { + constructor(dataSource: DataSource) { + super(dataSource, ConversationMessage); + } + + async findBySessionId( + sessionId: string, + limit = 20, + offset = 0, + ): Promise { + return this.withRepository((repo) => + repo.find({ + where: { sessionId } as any, + order: { sequenceNumber: 'ASC' }, + skip: offset, + take: limit, + }), + ); + } + + /** + * Load the most recent N messages for a session, ordered by sequence number ASC. + */ + async findRecentBySessionId( + sessionId: string, + limit = 20, + ): Promise { + return this.withRepository(async (repo) => { + // Subquery approach: get the last N messages ordered DESC, then re-order ASC + const messages = await repo.find({ + where: { sessionId } as any, + order: { sequenceNumber: 'DESC' }, + take: limit, + }); + return messages.reverse(); + }); + } + + async countBySessionId(sessionId: string): Promise { + return this.withRepository((repo) => + repo.count({ where: { sessionId } as any }), + ); + } + + async getNextSequenceNumber(sessionId: string): Promise { + return this.withRepository(async (repo) => { + const result = await repo + .createQueryBuilder('msg') + .select('COALESCE(MAX(msg.sequence_number), 0)', 'maxSeq') + .where('msg.session_id = :sessionId', { sessionId }) + .getRawOne(); + return (parseInt(result?.maxSeq ?? '0', 10)) + 1; + }); + } + + async deleteBySessionId(sessionId: string): Promise { + await this.withRepository(async (repo) => { + await repo.delete({ sessionId } as any); + return undefined as any; + }); + } +} diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts index 9e78e20..98b36f1 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts @@ -4,6 +4,7 @@ import { EngineRegistry } from '../../../infrastructure/engines/engine-registry' import { AgentStreamGateway } from '../../ws/agent-stream.gateway'; import { SessionRepository } from '../../../infrastructure/repositories/session.repository'; import { TaskRepository } from '../../../infrastructure/repositories/task.repository'; +import { ConversationContextService } from '../../../domain/services/conversation-context.service'; import { AgentSession } from '../../../domain/entities/agent-session.entity'; import { AgentTask } from '../../../domain/entities/agent-task.entity'; import { TaskStatus } from '../../../domain/value-objects/task-status.vo'; @@ -19,27 +20,41 @@ export class AgentController { private readonly gateway: AgentStreamGateway, private readonly sessionRepository: SessionRepository, private readonly taskRepository: TaskRepository, + private readonly contextService: ConversationContextService, ) {} - // TODO 10: Execute task @Post('tasks') async executeTask( @TenantId() tenantId: string, - @Body() body: { prompt: string; systemPrompt?: string; maxTurns?: number; allowedTools?: string[]; engineType?: string }, + @Body() body: { + prompt: string; + sessionId?: string; + systemPrompt?: string; + maxTurns?: number; + allowedTools?: string[]; + engineType?: string; + maxContextMessages?: number; + }, ) { // Allow callers to override the engine (e.g. voice uses claude_api for streaming) const engine = body.engineType ? this.engineRegistry.switchEngine(body.engineType as AgentEngineType) : this.engineRegistry.getActiveEngine(); - const session = new AgentSession(); - session.id = crypto.randomUUID(); - session.tenantId = tenantId; - session.engineType = engine.engineType; + // Reuse existing session or create new one + let session: AgentSession; + if (body.sessionId) { + const existing = await this.sessionRepository.findById(body.sessionId); + if (existing && existing.status === 'active') { + session = existing; + } else { + session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt); + } + } else { + session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt); + } + // Keep session active for multi-turn session.status = 'active'; - session.systemPrompt = body.systemPrompt; - session.metadata = {}; - session.createdAt = new Date(); session.updatedAt = new Date(); await this.sessionRepository.save(session); @@ -53,25 +68,47 @@ export class AgentController { task.createdAt = new Date(); await this.taskRepository.save(task); + // Save user message to conversation history + await this.contextService.saveUserMessage(session.id, body.prompt); + + // Load conversation history for context + const maxCtx = body.maxContextMessages ?? 20; + const conversationHistory = await this.contextService.loadContext(session.id, maxCtx); + this.logger.log(`[Task ${task.id}] Loaded ${conversationHistory.length} history messages for session=${session.id}`); + // Fire-and-forget: iterate engine stream and emit events via gateway (async () => { try { this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${body.prompt.slice(0, 80)}"`); + + // Pass conversation history (excluding the current user message, which is the last one) + // loadContext returns all messages including the one we just saved, + // so we pass the history minus the last user message (it will be added by the engine as params.prompt) + const historyForEngine = conversationHistory.slice(0, -1); + const stream = engine.executeTask({ sessionId: session.id, prompt: body.prompt, systemPrompt: body.systemPrompt || '', allowedTools: body.allowedTools || [], maxTurns: body.maxTurns || 10, + conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined, }); let eventCount = 0; let finished = false; + const textParts: string[] = []; + for await (const event of stream) { eventCount++; this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`); this.gateway.emitStreamEvent(session.id, event); + // Collect text for assistant message + if (event.type === 'text') { + textParts.push(event.content); + } + if (event.type === 'completed' && !finished) { finished = true; task.status = TaskStatus.COMPLETED; @@ -80,7 +117,13 @@ export class AgentController { task.completedAt = new Date(); await this.taskRepository.save(task); - session.status = 'completed'; + // Save assistant response to conversation history + const assistantText = textParts.join('') || event.summary; + if (assistantText) { + await this.contextService.saveAssistantMessage(session.id, assistantText); + } + + // Keep session active (don't mark completed) so it can be reused session.updatedAt = new Date(); await this.sessionRepository.save(session); } @@ -125,7 +168,6 @@ export class AgentController { return { sessionId: session.id, taskId: task.id }; } - // TODO 11: Cancel task @Delete('tasks/:taskId') async cancelTask(@Param('taskId') taskId: string) { const task = await this.taskRepository.findById(taskId); @@ -152,7 +194,6 @@ export class AgentController { return { message: 'Task cancelled', taskId }; } - // TODO 12: Approve command @Post('tasks/:taskId/approve') async approveCommand(@Param('taskId') taskId: string, @Body() body: { approved: boolean }) { const task = await this.taskRepository.findById(taskId); @@ -189,7 +230,6 @@ export class AgentController { task.completedAt = new Date(); await this.taskRepository.save(task); - session.status = 'completed'; session.updatedAt = new Date(); await this.sessionRepository.save(session); } @@ -238,17 +278,28 @@ export class AgentController { } } - // TODO 13: List engines @Get('engines') async listEngines() { const engines = this.engineRegistry.listAvailable(); return { engines }; } - // TODO 14: Switch engine @Post('engines/switch') async switchEngine(@Body() body: { engineType: string }) { const engine = this.engineRegistry.switchEngine(body.engineType as AgentEngineType); return { message: 'Engine switched', engineType: engine.engineType }; } + + private createNewSession(tenantId: string, engineType: string, systemPrompt?: string): AgentSession { + const session = new AgentSession(); + session.id = crypto.randomUUID(); + session.tenantId = tenantId; + session.engineType = engineType; + session.status = 'active'; + session.systemPrompt = systemPrompt; + session.metadata = {}; + session.createdAt = new Date(); + session.updatedAt = new Date(); + return session; + } } diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/session.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/session.controller.ts index 306a6cf..c9388de 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/session.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/session.controller.ts @@ -1,22 +1,22 @@ -import { Controller, Get, Param, NotFoundException } from '@nestjs/common'; +import { Controller, Get, Delete, Param, Query, NotFoundException } from '@nestjs/common'; import { TenantId } from '@it0/common'; import { SessionRepository } from '../../../infrastructure/repositories/session.repository'; import { TaskRepository } from '../../../infrastructure/repositories/task.repository'; +import { MessageRepository } from '../../../infrastructure/repositories/message.repository'; @Controller('api/v1/agent/sessions') export class SessionController { constructor( private readonly sessionRepository: SessionRepository, private readonly taskRepository: TaskRepository, + private readonly messageRepository: MessageRepository, ) {} - // TODO 15: List sessions @Get() async listSessions(@TenantId() tenantId: string) { return this.sessionRepository.findByTenant(tenantId); } - // TODO 16: Get session details @Get(':sessionId') async getSession(@Param('sessionId') sessionId: string) { const session = await this.sessionRepository.findById(sessionId); @@ -26,9 +26,65 @@ export class SessionController { return session; } - // TODO 17: Get session history @Get(':sessionId/history') async getSessionHistory(@Param('sessionId') sessionId: string) { return this.taskRepository.findBySessionId(sessionId); } + + @Get(':sessionId/messages') + async getSessionMessages( + @Param('sessionId') sessionId: string, + @Query('limit') limit?: string, + @Query('offset') offset?: string, + ) { + const session = await this.sessionRepository.findById(sessionId); + if (!session) { + throw new NotFoundException(`Session ${sessionId} not found`); + } + + const messages = await this.messageRepository.findBySessionId( + sessionId, + limit ? parseInt(limit, 10) : 50, + offset ? parseInt(offset, 10) : 0, + ); + const total = await this.messageRepository.countBySessionId(sessionId); + + return { + messages, + total, + sessionId, + }; + } + + @Delete(':sessionId/messages') + async clearSessionMessages(@Param('sessionId') sessionId: string) { + const session = await this.sessionRepository.findById(sessionId); + if (!session) { + throw new NotFoundException(`Session ${sessionId} not found`); + } + + await this.messageRepository.deleteBySessionId(sessionId); + return { message: 'Conversation history cleared', sessionId }; + } + + @Delete(':sessionId') + async deleteSession(@Param('sessionId') sessionId: string) { + const session = await this.sessionRepository.findById(sessionId); + if (!session) { + throw new NotFoundException(`Session ${sessionId} not found`); + } + + // Delete messages first (cascade should handle this, but be explicit) + await this.messageRepository.deleteBySessionId(sessionId); + + // Delete tasks + const tasks = await this.taskRepository.findBySessionId(sessionId); + for (const task of tasks) { + await this.taskRepository.remove(task); + } + + // Delete session + await this.sessionRepository.remove(session); + return { message: 'Session deleted', sessionId }; + } } diff --git a/packages/shared/database/src/migrations/002-create-tenant-schema-template.sql b/packages/shared/database/src/migrations/002-create-tenant-schema-template.sql index 32c1eff..7348336 100644 --- a/packages/shared/database/src/migrations/002-create-tenant-schema-template.sql +++ b/packages/shared/database/src/migrations/002-create-tenant-schema-template.sql @@ -330,6 +330,25 @@ CREATE INDEX idx_api_keys_hash ON api_keys(key_hash); -- Agent Service - Additional Tables -- ========================================================================= +-- Conversation Messages (multi-turn dialogue history per session) +CREATE TABLE conversation_messages ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id VARCHAR(20) NOT NULL, + session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE, + role VARCHAR(20) NOT NULL, + content TEXT NOT NULL, + tool_calls JSONB, + tool_results JSONB, + token_count INTEGER, + sequence_number INTEGER NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_conversation_messages_session + ON conversation_messages(session_id, sequence_number ASC); +CREATE INDEX idx_conversation_messages_session_recent + ON conversation_messages(session_id, sequence_number DESC); + -- Agent Tasks CREATE TABLE agent_tasks ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/packages/shared/database/src/migrations/004-add-conversation-messages.sql b/packages/shared/database/src/migrations/004-add-conversation-messages.sql new file mode 100644 index 0000000..12d488f --- /dev/null +++ b/packages/shared/database/src/migrations/004-add-conversation-messages.sql @@ -0,0 +1,24 @@ +-- IT0 Migration 004: Add conversation_messages table for multi-turn context management +-- This table stores conversation history per session, shared across all engine types. +-- Usage: Replace {SCHEMA} with target schema name (e.g., it0_t_t001) + +SET search_path TO {SCHEMA}; + +-- Conversation Messages (multi-turn dialogue history per session) +CREATE TABLE IF NOT EXISTS conversation_messages ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id VARCHAR(20) NOT NULL, + session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE, + role VARCHAR(20) NOT NULL, + content TEXT NOT NULL, + tool_calls JSONB, + tool_results JSONB, + token_count INTEGER, + sequence_number INTEGER NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_conversation_messages_session + ON conversation_messages(session_id, sequence_number ASC); +CREATE INDEX IF NOT EXISTS idx_conversation_messages_session_recent + ON conversation_messages(session_id, sequence_number DESC); diff --git a/packages/shared/database/src/run-migrations.ts b/packages/shared/database/src/run-migrations.ts index c568344..c0c1a16 100644 --- a/packages/shared/database/src/run-migrations.ts +++ b/packages/shared/database/src/run-migrations.ts @@ -86,6 +86,13 @@ async function runTenantSchema(client: Client, tenantId: string) { { '{SCHEMA}': schemaName }, ); + // Run conversation messages table (004) + await runSqlFile( + client, + path.join(MIGRATIONS_DIR, '004-add-conversation-messages.sql'), + { '{SCHEMA}': schemaName }, + ); + log(`Tenant schema ${schemaName} ready.`); }