feat: multi-turn conversation context management with session history UI

Implement DB-based conversation message storage (engine-agnostic) that
works across both Claude API and Agent SDK engines. Add ChatGPT/Claude-style
conversation history drawer in Flutter with date-grouped session list,
session switching, and new chat functionality.

Backend: entity, repository, context service, migration 004, session/message
API endpoints. Flutter: ConversationDrawer, sessionId flow from backend
response via SessionInfoEvent, session list/switch/delete support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-24 19:04:35 -08:00
parent 7cda482e49
commit 2403ce5636
20 changed files with 947 additions and 51 deletions

View File

@ -1,6 +1,7 @@
import 'package:dio/dio.dart'; import 'package:dio/dio.dart';
import '../../../../core/config/api_endpoints.dart'; import '../../../../core/config/api_endpoints.dart';
import '../models/chat_message_model.dart'; import '../models/chat_message_model.dart';
import '../../presentation/widgets/conversation_drawer.dart';
/// Remote datasource for chat operations against the IT0 backend API. /// Remote datasource for chat operations against the IT0 backend API.
class ChatRemoteDatasource { class ChatRemoteDatasource {
@ -19,7 +20,7 @@ class ChatRemoteDatasource {
ApiEndpoints.tasks, ApiEndpoints.tasks,
data: { data: {
'prompt': message, 'prompt': message,
'sessionId': sessionId, if (sessionId != 'new') 'sessionId': sessionId,
if (attachments != null && attachments.isNotEmpty) if (attachments != null && attachments.isNotEmpty)
'attachments': attachments, 'attachments': attachments,
}, },
@ -51,6 +52,57 @@ class ChatRemoteDatasource {
.toList(); .toList();
} }
/// Lists all sessions for the current tenant.
Future<List<SessionSummary>> listSessions() async {
final response = await _dio.get(ApiEndpoints.sessions);
final data = response.data;
List<dynamic> sessions;
if (data is List) {
sessions = data;
} else if (data is Map && data.containsKey('sessions')) {
sessions = data['sessions'] as List;
} else {
sessions = [];
}
final list = sessions
.map((s) => SessionSummary.fromJson(s as Map<String, dynamic>))
.toList();
// Sort by most recent first
list.sort((a, b) => b.updatedAt.compareTo(a.updatedAt));
return list;
}
/// Gets conversation messages for a session (from conversation_messages table).
Future<List<ChatMessageModel>> getSessionMessages(String sessionId) async {
final response = await _dio.get(
'${ApiEndpoints.sessions}/$sessionId/messages',
queryParameters: {'limit': '50'},
);
final data = response.data;
List<dynamic> messages;
if (data is Map && data.containsKey('messages')) {
messages = data['messages'] as List;
} else if (data is List) {
messages = data;
} else {
messages = [];
}
return messages
.map((m) => ChatMessageModel.fromJson(m as Map<String, dynamic>))
.toList();
}
/// Deletes a session and all its messages.
Future<void> deleteSession(String sessionId) async {
await _dio.delete('${ApiEndpoints.sessions}/$sessionId');
}
/// Approves a pending command for a given task. /// Approves a pending command for a given task.
Future<void> approveCommand(String taskId) async { Future<void> approveCommand(String taskId) async {
await _dio.post('${ApiEndpoints.approvals}/$taskId/approve'); await _dio.post('${ApiEndpoints.approvals}/$taskId/approve');

View File

@ -41,6 +41,9 @@ class ChatRepositoryImpl implements ChatRepository {
sessionId; sessionId;
final taskId = response['taskId'] as String? ?? response['task_id'] as String?; final taskId = response['taskId'] as String? ?? response['task_id'] as String?;
// Emit the real sessionId so the notifier can capture it
yield SessionInfoEvent(returnedSessionId);
// Connect to the agent WebSocket and subscribe to the session // Connect to the agent WebSocket and subscribe to the session
final token = await _getAccessToken(); final token = await _getAccessToken();
await _webSocketClient.connect('/ws/agent', token: token); await _webSocketClient.connect('/ws/agent', token: token);
@ -91,6 +94,9 @@ class ChatRepositoryImpl implements ChatRepository {
sessionId; sessionId;
final taskId = response['taskId'] as String? ?? response['task_id'] as String?; final taskId = response['taskId'] as String? ?? response['task_id'] as String?;
// Emit the real sessionId so the notifier can capture it
yield SessionInfoEvent(returnedSessionId);
final voiceToken = await _getAccessToken(); final voiceToken = await _getAccessToken();
await _webSocketClient.connect('/ws/agent', token: voiceToken); await _webSocketClient.connect('/ws/agent', token: voiceToken);
_webSocketClient.send({ _webSocketClient.send({

View File

@ -50,3 +50,9 @@ class StandingOrderConfirmedEvent extends StreamEvent {
final String orderName; final String orderName;
StandingOrderConfirmedEvent(this.orderId, this.orderName); StandingOrderConfirmedEvent(this.orderId, this.orderName);
} }
/// Carries the real sessionId assigned by the backend.
class SessionInfoEvent extends StreamEvent {
final String sessionId;
SessionInfoEvent(this.sessionId);
}

View File

@ -6,6 +6,7 @@ import '../providers/chat_providers.dart';
import '../widgets/timeline_event_node.dart'; import '../widgets/timeline_event_node.dart';
import '../widgets/stream_text_widget.dart'; import '../widgets/stream_text_widget.dart';
import '../widgets/approval_action_card.dart'; import '../widgets/approval_action_card.dart';
import '../widgets/conversation_drawer.dart';
import '../../../agent_call/presentation/pages/agent_call_page.dart'; import '../../../agent_call/presentation/pages/agent_call_page.dart';
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -189,9 +190,16 @@ class _ChatPageState extends ConsumerState<ChatPage> {
ref.listen(chatProvider, (_, __) => _scrollToBottom()); ref.listen(chatProvider, (_, __) => _scrollToBottom());
return Scaffold( return Scaffold(
drawer: const ConversationDrawer(),
appBar: AppBar( appBar: AppBar(
title: const Text('iAgent'), title: const Text('iAgent'),
actions: [ actions: [
// New chat button (always visible)
IconButton(
icon: const Icon(Icons.edit_outlined),
tooltip: '新对话',
onPressed: () => ref.read(chatProvider.notifier).startNewChat(),
),
// Stop button during streaming // Stop button during streaming
if (chatState.isStreaming) if (chatState.isStreaming)
IconButton( IconButton(
@ -199,12 +207,6 @@ class _ChatPageState extends ConsumerState<ChatPage> {
tooltip: '停止', tooltip: '停止',
onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(), onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(),
), ),
if (chatState.messages.isNotEmpty)
IconButton(
icon: const Icon(Icons.delete_outline),
tooltip: '清空对话',
onPressed: () => ref.read(chatProvider.notifier).clearChat(),
),
// Voice call button // Voice call button
IconButton( IconButton(
icon: const Icon(Icons.call), icon: const Icon(Icons.call),

View File

@ -15,6 +15,7 @@ import '../../domain/repositories/chat_repository.dart';
import '../../domain/usecases/cancel_task.dart'; import '../../domain/usecases/cancel_task.dart';
import '../../domain/usecases/get_session_history.dart'; import '../../domain/usecases/get_session_history.dart';
import '../../domain/usecases/send_message.dart'; import '../../domain/usecases/send_message.dart';
import '../widgets/conversation_drawer.dart';
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Dependency providers // Dependency providers
@ -42,7 +43,6 @@ final chatRepositoryProvider = Provider<ChatRepository>((ref) {
final ws = ref.watch(webSocketClientProvider); final ws = ref.watch(webSocketClientProvider);
final storage = ref.watch(secureStorageProvider); final storage = ref.watch(secureStorageProvider);
// Use a no-op local datasource if SharedPreferences is not yet ready
return ChatRepositoryImpl( return ChatRepositoryImpl(
remoteDatasource: remote, remoteDatasource: remote,
localDatasource: local ?? _NoOpLocalDatasource(), localDatasource: local ?? _NoOpLocalDatasource(),
@ -67,6 +67,15 @@ final cancelTaskUseCaseProvider = Provider<CancelTask>((ref) {
return CancelTask(ref.watch(chatRepositoryProvider)); return CancelTask(ref.watch(chatRepositoryProvider));
}); });
// ---------------------------------------------------------------------------
// Session list provider (for conversation drawer)
// ---------------------------------------------------------------------------
final sessionListProvider = FutureProvider<List<SessionSummary>>((ref) async {
final datasource = ref.watch(chatRemoteDatasourceProvider);
return datasource.listSessions();
});
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Chat state // Chat state
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -134,10 +143,11 @@ class ChatNotifier extends StateNotifier<ChatState> {
try { try {
final useCase = _ref.read(sendMessageUseCaseProvider); final useCase = _ref.read(sendMessageUseCaseProvider);
final sessionId = state.sessionId ?? 'new'; // Pass current sessionId to reuse the session for multi-turn context
final sessionId = state.sessionId;
final stream = useCase.execute( final stream = useCase.execute(
sessionId: sessionId, sessionId: sessionId ?? 'new',
message: prompt, message: prompt,
); );
@ -194,8 +204,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
); );
case ToolResultEvent(:final toolName, :final output, :final isError): case ToolResultEvent(:final toolName, :final output, :final isError):
// First, update the matching ToolUse message's status so its spinner
// transitions to a completed/error icon in the timeline.
final updatedMessages = [...state.messages]; final updatedMessages = [...state.messages];
for (int i = updatedMessages.length - 1; i >= 0; i--) { for (int i = updatedMessages.length - 1; i >= 0; i--) {
final m = updatedMessages[i]; final m = updatedMessages[i];
@ -246,8 +254,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
); );
case CompletedEvent(:final summary): case CompletedEvent(:final summary):
// Only show summary as a message if there were no assistant text messages
// (avoids duplicate bubble when the SDK already streamed the full response)
final hasAssistantText = state.messages.any( final hasAssistantText = state.messages.any(
(m) => m.role == MessageRole.assistant && m.type == MessageType.text && m.content.isNotEmpty, (m) => m.role == MessageRole.assistant && m.type == MessageType.text && m.content.isNotEmpty,
); );
@ -288,11 +294,13 @@ class ChatNotifier extends StateNotifier<ChatState> {
messages: [...state.messages, msg], messages: [...state.messages, msg],
agentStatus: AgentStatus.idle, agentStatus: AgentStatus.idle,
); );
case SessionInfoEvent(:final sessionId):
// Capture the real sessionId from the backend for multi-turn reuse
state = state.copyWith(sessionId: sessionId);
} }
} }
/// Appends text content to the last assistant message if it is of the same
/// type, or creates a new message bubble.
void _appendOrUpdateAssistantMessage(String content, MessageType type) { void _appendOrUpdateAssistantMessage(String content, MessageType type) {
if (state.messages.isNotEmpty) { if (state.messages.isNotEmpty) {
final last = state.messages.last; final last = state.messages.last;
@ -319,7 +327,43 @@ class ChatNotifier extends StateNotifier<ChatState> {
state = state.copyWith(messages: [...state.messages, msg]); state = state.copyWith(messages: [...state.messages, msg]);
} }
/// Approves a pending command. /// Starts a new chat clears messages and resets sessionId.
void startNewChat() {
_eventSubscription?.cancel();
state = const ChatState();
}
/// Switches to an existing session loads its messages from the backend.
Future<void> switchSession(String sessionId) async {
_eventSubscription?.cancel();
state = ChatState(sessionId: sessionId, agentStatus: AgentStatus.idle);
try {
final datasource = _ref.read(chatRemoteDatasourceProvider);
final messages = await datasource.getSessionMessages(sessionId);
state = state.copyWith(
messages: messages.map((m) => m.toEntity()).toList(),
sessionId: sessionId,
);
} catch (e) {
state = state.copyWith(error: '加载对话历史失败: $e');
}
}
/// Deletes a session from the backend.
Future<void> deleteSession(String sessionId) async {
try {
final datasource = _ref.read(chatRemoteDatasourceProvider);
await datasource.deleteSession(sessionId);
// If the deleted session is the current one, reset state
if (state.sessionId == sessionId) {
state = const ChatState();
}
} catch (e) {
state = state.copyWith(error: '删除对话失败: $e');
}
}
Future<void> approveCommand(String taskId) async { Future<void> approveCommand(String taskId) async {
try { try {
final repo = _ref.read(chatRepositoryProvider); final repo = _ref.read(chatRepositoryProvider);
@ -330,7 +374,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
} }
} }
/// Rejects a pending command.
Future<void> rejectCommand(String taskId, {String? reason}) async { Future<void> rejectCommand(String taskId, {String? reason}) async {
try { try {
final repo = _ref.read(chatRepositoryProvider); final repo = _ref.read(chatRepositoryProvider);
@ -341,7 +384,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
} }
} }
/// Confirms a standing order draft.
Future<void> confirmStandingOrder(Map<String, dynamic> draft) async { Future<void> confirmStandingOrder(Map<String, dynamic> draft) async {
if (state.sessionId == null) return; if (state.sessionId == null) return;
try { try {
@ -353,7 +395,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
} }
} }
/// Cancels the current agent task.
Future<void> cancelCurrentTask() async { Future<void> cancelCurrentTask() async {
if (state.sessionId == null) return; if (state.sessionId == null) return;
try { try {
@ -366,7 +407,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
} }
} }
/// Loads session history from the backend.
Future<void> loadSessionHistory(String sessionId) async { Future<void> loadSessionHistory(String sessionId) async {
try { try {
final useCase = _ref.read(getSessionHistoryUseCaseProvider); final useCase = _ref.read(getSessionHistoryUseCaseProvider);
@ -380,7 +420,6 @@ class ChatNotifier extends StateNotifier<ChatState> {
} }
} }
/// Clears the chat state and cancels any active subscriptions.
void clearChat() { void clearChat() {
_eventSubscription?.cancel(); _eventSubscription?.cancel();
state = const ChatState(); state = const ChatState();

View File

@ -0,0 +1,302 @@
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../../../../core/theme/app_colors.dart';
import '../providers/chat_providers.dart';
/// Session summary returned from the backend.
class SessionSummary {
final String id;
final String title;
final String status;
final DateTime createdAt;
final DateTime updatedAt;
const SessionSummary({
required this.id,
required this.title,
required this.status,
required this.createdAt,
required this.updatedAt,
});
factory SessionSummary.fromJson(Map<String, dynamic> json) {
return SessionSummary(
id: json['id'] as String,
title: json['systemPrompt'] as String? ?? _generateTitle(json),
status: json['status'] as String? ?? 'active',
createdAt: DateTime.tryParse(json['createdAt'] as String? ?? '') ?? DateTime.now(),
updatedAt: DateTime.tryParse(json['updatedAt'] as String? ?? '') ?? DateTime.now(),
);
}
static String _generateTitle(Map<String, dynamic> json) {
// Use metadata or fallback to date-based title
final meta = json['metadata'] as Map<String, dynamic>?;
if (meta != null && meta['title'] != null) {
return meta['title'] as String;
}
final createdAt = DateTime.tryParse(json['createdAt'] as String? ?? '');
if (createdAt != null) {
return '对话 ${createdAt.month}/${createdAt.day} ${createdAt.hour}:${createdAt.minute.toString().padLeft(2, '0')}';
}
return '新对话';
}
}
/// Groups sessions by date category.
enum DateGroup { today, yesterday, previous7Days, previous30Days, older }
/// Left drawer showing conversation history list.
/// Pattern: ChatGPT / Claude / Gemini style side drawer.
class ConversationDrawer extends ConsumerWidget {
const ConversationDrawer({super.key});
@override
Widget build(BuildContext context, WidgetRef ref) {
final sessions = ref.watch(sessionListProvider);
final currentSessionId = ref.watch(currentSessionIdProvider);
return Drawer(
backgroundColor: AppColors.background,
child: SafeArea(
child: Column(
children: [
// Header: New Chat button
Padding(
padding: const EdgeInsets.fromLTRB(16, 12, 16, 8),
child: SizedBox(
width: double.infinity,
child: OutlinedButton.icon(
onPressed: () {
ref.read(chatProvider.notifier).startNewChat();
Navigator.of(context).pop();
},
icon: const Icon(Icons.add, size: 18),
label: const Text('新对话'),
style: OutlinedButton.styleFrom(
foregroundColor: AppColors.textPrimary,
side: BorderSide(color: AppColors.surfaceLight.withOpacity(0.6)),
padding: const EdgeInsets.symmetric(vertical: 12),
shape: RoundedRectangleBorder(
borderRadius: BorderRadius.circular(10),
),
),
),
),
),
const Divider(color: AppColors.surfaceLight, height: 1),
// Session list
Expanded(
child: sessions.when(
data: (list) {
if (list.isEmpty) {
return Center(
child: Column(
mainAxisSize: MainAxisSize.min,
children: [
Icon(Icons.chat_bubble_outline, size: 48, color: AppColors.textMuted),
const SizedBox(height: 12),
Text('暂无对话历史', style: TextStyle(color: AppColors.textMuted, fontSize: 14)),
],
),
);
}
final grouped = _groupByDate(list);
return ListView(
padding: const EdgeInsets.symmetric(vertical: 8),
children: grouped.entries.expand((entry) {
return [
// Date group header
Padding(
padding: const EdgeInsets.fromLTRB(16, 12, 16, 4),
child: Text(
_dateGroupLabel(entry.key),
style: TextStyle(
color: AppColors.textMuted,
fontSize: 11,
fontWeight: FontWeight.w600,
letterSpacing: 0.5,
),
),
),
// Sessions in this group
...entry.value.map((session) => _SessionTile(
session: session,
isActive: session.id == currentSessionId,
onTap: () {
ref.read(chatProvider.notifier).switchSession(session.id);
Navigator.of(context).pop();
},
onDelete: () {
_confirmDelete(context, ref, session);
},
)),
];
}).toList(),
);
},
loading: () => const Center(child: CircularProgressIndicator()),
error: (err, _) => Center(
child: Column(
mainAxisSize: MainAxisSize.min,
children: [
Icon(Icons.error_outline, color: AppColors.error),
const SizedBox(height: 8),
Text('加载失败', style: TextStyle(color: AppColors.error)),
TextButton(
onPressed: () => ref.invalidate(sessionListProvider),
child: const Text('重试'),
),
],
),
),
),
),
],
),
),
);
}
Map<DateGroup, List<SessionSummary>> _groupByDate(List<SessionSummary> sessions) {
final now = DateTime.now();
final today = DateTime(now.year, now.month, now.day);
final yesterday = today.subtract(const Duration(days: 1));
final week = today.subtract(const Duration(days: 7));
final month = today.subtract(const Duration(days: 30));
final grouped = <DateGroup, List<SessionSummary>>{};
for (final session in sessions) {
final date = DateTime(session.updatedAt.year, session.updatedAt.month, session.updatedAt.day);
DateGroup group;
if (!date.isBefore(today)) {
group = DateGroup.today;
} else if (!date.isBefore(yesterday)) {
group = DateGroup.yesterday;
} else if (!date.isBefore(week)) {
group = DateGroup.previous7Days;
} else if (!date.isBefore(month)) {
group = DateGroup.previous30Days;
} else {
group = DateGroup.older;
}
grouped.putIfAbsent(group, () => []).add(session);
}
return grouped;
}
String _dateGroupLabel(DateGroup group) {
switch (group) {
case DateGroup.today:
return '今天';
case DateGroup.yesterday:
return '昨天';
case DateGroup.previous7Days:
return '最近7天';
case DateGroup.previous30Days:
return '最近30天';
case DateGroup.older:
return '更早';
}
}
void _confirmDelete(BuildContext context, WidgetRef ref, SessionSummary session) {
showDialog(
context: context,
builder: (ctx) => AlertDialog(
backgroundColor: AppColors.surface,
title: const Text('删除对话', style: TextStyle(color: AppColors.textPrimary)),
content: Text(
'确定要删除「${session.title}」吗?此操作不可恢复。',
style: const TextStyle(color: AppColors.textSecondary),
),
actions: [
TextButton(
onPressed: () => Navigator.of(ctx).pop(),
child: const Text('取消'),
),
TextButton(
onPressed: () {
Navigator.of(ctx).pop();
ref.read(chatProvider.notifier).deleteSession(session.id);
ref.invalidate(sessionListProvider);
},
style: TextButton.styleFrom(foregroundColor: AppColors.error),
child: const Text('删除'),
),
],
),
);
}
}
/// Individual session list tile with long-press menu.
class _SessionTile extends StatelessWidget {
final SessionSummary session;
final bool isActive;
final VoidCallback onTap;
final VoidCallback onDelete;
const _SessionTile({
required this.session,
required this.isActive,
required this.onTap,
required this.onDelete,
});
@override
Widget build(BuildContext context) {
return ListTile(
dense: true,
selected: isActive,
selectedTileColor: AppColors.surfaceLight.withOpacity(0.4),
shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(8)),
contentPadding: const EdgeInsets.symmetric(horizontal: 16, vertical: 2),
leading: Icon(
Icons.chat_bubble_outline,
size: 18,
color: isActive ? AppColors.primary : AppColors.textMuted,
),
title: Text(
session.title,
maxLines: 1,
overflow: TextOverflow.ellipsis,
style: TextStyle(
color: isActive ? AppColors.textPrimary : AppColors.textSecondary,
fontSize: 13,
fontWeight: isActive ? FontWeight.w600 : FontWeight.normal,
),
),
onTap: onTap,
onLongPress: () {
showModalBottomSheet(
context: context,
backgroundColor: AppColors.surface,
shape: const RoundedRectangleBorder(
borderRadius: BorderRadius.vertical(top: Radius.circular(16)),
),
builder: (ctx) => SafeArea(
child: Column(
mainAxisSize: MainAxisSize.min,
children: [
ListTile(
leading: const Icon(Icons.delete_outline, color: AppColors.error),
title: const Text('删除对话', style: TextStyle(color: AppColors.error)),
onTap: () {
Navigator.of(ctx).pop();
onDelete();
},
),
],
),
),
);
},
);
}
}

View File

@ -37,6 +37,9 @@ import { TenantAgentConfig } from './domain/entities/tenant-agent-config.entity'
import { AgentConfig } from './domain/entities/agent-config.entity'; import { AgentConfig } from './domain/entities/agent-config.entity';
import { AgentSkill } from './domain/entities/agent-skill.entity'; import { AgentSkill } from './domain/entities/agent-skill.entity';
import { HookScript } from './domain/entities/hook-script.entity'; import { HookScript } from './domain/entities/hook-script.entity';
import { ConversationMessage } from './domain/entities/conversation-message.entity';
import { MessageRepository } from './infrastructure/repositories/message.repository';
import { ConversationContextService } from './domain/services/conversation-context.service';
@Module({ @Module({
imports: [ imports: [
@ -45,6 +48,7 @@ import { HookScript } from './domain/entities/hook-script.entity';
TypeOrmModule.forFeature([ TypeOrmModule.forFeature([
AgentSession, AgentTask, CommandRecord, StandingOrderRef, AgentSession, AgentTask, CommandRecord, StandingOrderRef,
TenantAgentConfig, AgentConfig, AgentSkill, HookScript, TenantAgentConfig, AgentConfig, AgentSkill, HookScript,
ConversationMessage,
]), ]),
], ],
controllers: [ controllers: [
@ -62,8 +66,10 @@ import { HookScript } from './domain/entities/hook-script.entity';
SkillManagerService, SkillManagerService,
StandingOrderExtractorService, StandingOrderExtractorService,
AllowedToolsResolverService, AllowedToolsResolverService,
ConversationContextService,
SessionRepository, SessionRepository,
TaskRepository, TaskRepository,
MessageRepository,
TenantAgentConfigRepository, TenantAgentConfigRepository,
AgentConfigRepository, AgentConfigRepository,
AgentSkillRepository, AgentSkillRepository,

View File

@ -0,0 +1,34 @@
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn } from 'typeorm';
@Entity('conversation_messages')
export class ConversationMessage {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Column({ type: 'varchar', length: 20 })
tenantId!: string;
@Column({ type: 'uuid' })
sessionId!: string;
@Column({ type: 'varchar', length: 20 })
role!: 'user' | 'assistant' | 'system';
@Column({ type: 'text' })
content!: string;
@Column({ type: 'jsonb', nullable: true })
toolCalls?: any[];
@Column({ type: 'jsonb', nullable: true })
toolResults?: any[];
@Column({ type: 'int', nullable: true })
tokenCount?: number;
@Column({ type: 'int' })
sequenceNumber!: number;
@CreateDateColumn({ type: 'timestamptz' })
createdAt!: Date;
}

View File

@ -4,7 +4,11 @@ export interface AgentEnginePort {
readonly engineType: AgentEngineType; readonly engineType: AgentEngineType;
executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent>; executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent>;
cancelTask(sessionId: string): Promise<void>; cancelTask(sessionId: string): Promise<void>;
continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent>; continueSession(
sessionId: string,
message: string,
conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
): AsyncGenerator<EngineStreamEvent>;
healthCheck(): Promise<boolean>; healthCheck(): Promise<boolean>;
} }
@ -17,6 +21,11 @@ export interface EngineTaskParams {
maxBudgetUsd?: number; maxBudgetUsd?: number;
context?: Record<string, unknown>; context?: Record<string, unknown>;
skill?: { name: string; arguments: string }; skill?: { name: string; arguments: string };
/** Conversation history messages to prepend for multi-turn context. */
conversationHistory?: Array<{
role: 'user' | 'assistant';
content: string | any[];
}>;
} }
export type EngineStreamEvent = export type EngineStreamEvent =

View File

@ -0,0 +1,170 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageRepository } from '../../infrastructure/repositories/message.repository';
import { ConversationMessage } from '../entities/conversation-message.entity';
import { TenantContextService } from '@it0/common';
import * as crypto from 'crypto';
export interface AnthropicHistoryMessage {
role: 'user' | 'assistant';
content: string | any[];
}
@Injectable()
export class ConversationContextService {
private readonly logger = new Logger(ConversationContextService.name);
constructor(private readonly messageRepository: MessageRepository) {}
/**
* Save a user message to the conversation history.
*/
async saveUserMessage(sessionId: string, content: string): Promise<ConversationMessage> {
const tenantId = TenantContextService.getTenantId();
const sequenceNumber = await this.messageRepository.getNextSequenceNumber(sessionId);
const message = new ConversationMessage();
message.id = crypto.randomUUID();
message.tenantId = tenantId;
message.sessionId = sessionId;
message.role = 'user';
message.content = content;
message.tokenCount = this.estimateTokens(content);
message.sequenceNumber = sequenceNumber;
message.createdAt = new Date();
return this.messageRepository.save(message);
}
/**
* Save an assistant response to the conversation history.
*/
async saveAssistantMessage(
sessionId: string,
content: string,
toolCalls?: any[],
toolResults?: any[],
): Promise<ConversationMessage> {
const tenantId = TenantContextService.getTenantId();
const sequenceNumber = await this.messageRepository.getNextSequenceNumber(sessionId);
const message = new ConversationMessage();
message.id = crypto.randomUUID();
message.tenantId = tenantId;
message.sessionId = sessionId;
message.role = 'assistant';
message.content = content;
message.toolCalls = toolCalls;
message.toolResults = toolResults;
message.tokenCount = this.estimateTokens(content);
message.sequenceNumber = sequenceNumber;
message.createdAt = new Date();
return this.messageRepository.save(message);
}
/**
* Load conversation history for a session, formatted as Anthropic messages.
* Returns the most recent `maxMessages` messages in chronological order.
*/
async loadContext(
sessionId: string,
maxMessages = 20,
): Promise<AnthropicHistoryMessage[]> {
const messages = await this.messageRepository.findRecentBySessionId(
sessionId,
maxMessages,
);
if (messages.length === 0) return [];
const history: AnthropicHistoryMessage[] = [];
for (const msg of messages) {
if (msg.role === 'user') {
history.push({ role: 'user', content: msg.content });
} else if (msg.role === 'assistant') {
// If the assistant message has tool calls, build content blocks
if (msg.toolCalls && msg.toolCalls.length > 0) {
const contentBlocks: any[] = [];
if (msg.content) {
contentBlocks.push({ type: 'text', text: msg.content });
}
for (const tc of msg.toolCalls) {
contentBlocks.push({
type: 'tool_use',
id: tc.id,
name: tc.name,
input: tc.input,
});
}
history.push({ role: 'assistant', content: contentBlocks });
// Add tool results as the next user message
if (msg.toolResults && msg.toolResults.length > 0) {
history.push({
role: 'user',
content: msg.toolResults.map((tr: any) => ({
type: 'tool_result',
tool_use_id: tr.tool_use_id,
content: tr.content,
})),
});
}
} else {
history.push({ role: 'assistant', content: msg.content });
}
}
}
// Ensure messages alternate correctly — first message should be 'user'
if (history.length > 0 && history[0].role !== 'user') {
history.shift();
}
this.logger.log(
`Loaded ${history.length} context messages for session ${sessionId}`,
);
return history;
}
/**
* Build a text summary of conversation history for SDK engine prompt prefix.
*/
async buildPromptPrefix(
sessionId: string,
maxMessages = 20,
): Promise<string> {
const messages = await this.messageRepository.findRecentBySessionId(
sessionId,
maxMessages,
);
if (messages.length === 0) return '';
const lines: string[] = ['[Previous conversation]'];
for (const msg of messages) {
const role = msg.role === 'user' ? 'User' : 'Assistant';
// Truncate long messages in the prefix
const content = msg.content.length > 500
? msg.content.slice(0, 500) + '...'
: msg.content;
lines.push(`${role}: ${content}`);
}
lines.push('', '[Current request]');
return lines.join('\n');
}
/**
* Simple token estimation: ~4 characters per token for English,
* ~2 characters per token for CJK.
*/
private estimateTokens(content: string): number {
if (!content) return 0;
// Count CJK characters
const cjkCount = (content.match(/[\u4e00-\u9fff\u3040-\u30ff\uac00-\ud7af]/g) || []).length;
const nonCjkCount = content.length - cjkCount;
return Math.ceil(cjkCount / 2 + nonCjkCount / 4);
}
}

View File

@ -100,8 +100,14 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
try { try {
const { query } = await dynamicImport('@anthropic-ai/claude-agent-sdk'); const { query } = await dynamicImport('@anthropic-ai/claude-agent-sdk');
// Build prompt with conversation history prefix if available
const promptWithContext = this.buildPromptWithHistory(
params.prompt,
params.conversationHistory,
);
const sdkQuery = query({ const sdkQuery = query({
prompt: params.prompt, prompt: promptWithContext,
options: { options: {
systemPrompt: params.systemPrompt || undefined, systemPrompt: params.systemPrompt || undefined,
allowedTools: params.allowedTools?.length ? params.allowedTools : undefined, allowedTools: params.allowedTools?.length ? params.allowedTools : undefined,
@ -223,7 +229,11 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
} }
} }
async *continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent> { async *continueSession(
sessionId: string,
message: string,
conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
): AsyncGenerator<EngineStreamEvent> {
const session = this.activeSessions.get(sessionId); const session = this.activeSessions.get(sessionId);
// If there's a pending approval, resolve it // If there's a pending approval, resolve it
@ -340,6 +350,33 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
return this.activeSessions.get(sessionId)?.sdkSessionId; return this.activeSessions.get(sessionId)?.sdkSessionId;
} }
/**
* Build a prompt that includes conversation history as a prefix.
* Used when SDK session resume is not available (e.g., after process restart).
*/
private buildPromptWithHistory(
prompt: string,
history?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
): string {
if (!history || history.length === 0) return prompt;
const lines: string[] = ['[Previous conversation]'];
for (const msg of history) {
const role = msg.role === 'user' ? 'User' : 'Assistant';
const content = typeof msg.content === 'string'
? msg.content
: JSON.stringify(msg.content);
// Truncate very long messages in the prefix
const truncated = content.length > 500
? content.slice(0, 500) + '...'
: content;
lines.push(`${role}: ${truncated}`);
}
lines.push('', '[Current request]', prompt);
return lines.join('\n');
}
private classifyToolRisk(toolName: string, toolInput: any): CommandRiskLevel { private classifyToolRisk(toolName: string, toolInput: any): CommandRiskLevel {
// Only classify Bash commands for risk; other tools are auto-allowed // Only classify Bash commands for risk; other tools are auto-allowed
if (toolName === 'Bash' && typeof toolInput?.command === 'string') { if (toolName === 'Bash' && typeof toolInput?.command === 'string') {

View File

@ -58,8 +58,9 @@ export class ClaudeApiEngine implements AgentEnginePort {
// Build tools array from allowedTools // Build tools array from allowedTools
const tools = this.buildToolDefinitions(params.allowedTools); const tools = this.buildToolDefinitions(params.allowedTools);
// Initialize conversation with user prompt // Initialize conversation with history + user prompt
const messages: AnthropicMessage[] = [ const messages: AnthropicMessage[] = [
...(params.conversationHistory || []),
{ role: 'user', content: params.prompt }, { role: 'user', content: params.prompt },
]; ];
@ -234,16 +235,18 @@ export class ClaudeApiEngine implements AgentEnginePort {
this.abortControllers.delete(sessionId); this.abortControllers.delete(sessionId);
} }
async *continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent> { async *continueSession(
// For the API engine, continuing a session requires the full message history sessionId: string,
// which should be managed by the orchestrator layer. Here we treat it as a message: string,
// new single-turn interaction for simplicity. conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
): AsyncGenerator<EngineStreamEvent> {
yield* this.executeTask({ yield* this.executeTask({
sessionId, sessionId,
prompt: message, prompt: message,
systemPrompt: '', systemPrompt: '',
allowedTools: [], allowedTools: [],
maxTurns: 1, maxTurns: 10,
conversationHistory,
}); });
} }

View File

@ -74,7 +74,11 @@ export class ClaudeCodeCliEngine implements AgentEnginePort {
this.processes.delete(sessionId); this.processes.delete(sessionId);
} }
async *continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent> { async *continueSession(
sessionId: string,
message: string,
_conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
): AsyncGenerator<EngineStreamEvent> {
const args = [ const args = [
'--session-id', sessionId, '--session-id', sessionId,
'--resume', '--resume',

View File

@ -42,6 +42,7 @@ export class CustomEngine implements AgentEnginePort {
async *continueSession( async *continueSession(
sessionId: string, sessionId: string,
message: string, message: string,
_conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
): AsyncGenerator<EngineStreamEvent> { ): AsyncGenerator<EngineStreamEvent> {
this.logger.log( this.logger.log(
`Custom engine continueSession called for session ${sessionId}`, `Custom engine continueSession called for session ${sessionId}`,

View File

@ -0,0 +1,68 @@
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { TenantAwareRepository } from '@it0/database';
import { ConversationMessage } from '../../domain/entities/conversation-message.entity';
@Injectable()
export class MessageRepository extends TenantAwareRepository<ConversationMessage> {
constructor(dataSource: DataSource) {
super(dataSource, ConversationMessage);
}
async findBySessionId(
sessionId: string,
limit = 20,
offset = 0,
): Promise<ConversationMessage[]> {
return this.withRepository((repo) =>
repo.find({
where: { sessionId } as any,
order: { sequenceNumber: 'ASC' },
skip: offset,
take: limit,
}),
);
}
/**
* Load the most recent N messages for a session, ordered by sequence number ASC.
*/
async findRecentBySessionId(
sessionId: string,
limit = 20,
): Promise<ConversationMessage[]> {
return this.withRepository(async (repo) => {
// Subquery approach: get the last N messages ordered DESC, then re-order ASC
const messages = await repo.find({
where: { sessionId } as any,
order: { sequenceNumber: 'DESC' },
take: limit,
});
return messages.reverse();
});
}
async countBySessionId(sessionId: string): Promise<number> {
return this.withRepository((repo) =>
repo.count({ where: { sessionId } as any }),
);
}
async getNextSequenceNumber(sessionId: string): Promise<number> {
return this.withRepository(async (repo) => {
const result = await repo
.createQueryBuilder('msg')
.select('COALESCE(MAX(msg.sequence_number), 0)', 'maxSeq')
.where('msg.session_id = :sessionId', { sessionId })
.getRawOne();
return (parseInt(result?.maxSeq ?? '0', 10)) + 1;
});
}
async deleteBySessionId(sessionId: string): Promise<void> {
await this.withRepository(async (repo) => {
await repo.delete({ sessionId } as any);
return undefined as any;
});
}
}

View File

@ -4,6 +4,7 @@ import { EngineRegistry } from '../../../infrastructure/engines/engine-registry'
import { AgentStreamGateway } from '../../ws/agent-stream.gateway'; import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
import { SessionRepository } from '../../../infrastructure/repositories/session.repository'; import { SessionRepository } from '../../../infrastructure/repositories/session.repository';
import { TaskRepository } from '../../../infrastructure/repositories/task.repository'; import { TaskRepository } from '../../../infrastructure/repositories/task.repository';
import { ConversationContextService } from '../../../domain/services/conversation-context.service';
import { AgentSession } from '../../../domain/entities/agent-session.entity'; import { AgentSession } from '../../../domain/entities/agent-session.entity';
import { AgentTask } from '../../../domain/entities/agent-task.entity'; import { AgentTask } from '../../../domain/entities/agent-task.entity';
import { TaskStatus } from '../../../domain/value-objects/task-status.vo'; import { TaskStatus } from '../../../domain/value-objects/task-status.vo';
@ -19,27 +20,41 @@ export class AgentController {
private readonly gateway: AgentStreamGateway, private readonly gateway: AgentStreamGateway,
private readonly sessionRepository: SessionRepository, private readonly sessionRepository: SessionRepository,
private readonly taskRepository: TaskRepository, private readonly taskRepository: TaskRepository,
private readonly contextService: ConversationContextService,
) {} ) {}
// TODO 10: Execute task
@Post('tasks') @Post('tasks')
async executeTask( async executeTask(
@TenantId() tenantId: string, @TenantId() tenantId: string,
@Body() body: { prompt: string; systemPrompt?: string; maxTurns?: number; allowedTools?: string[]; engineType?: string }, @Body() body: {
prompt: string;
sessionId?: string;
systemPrompt?: string;
maxTurns?: number;
allowedTools?: string[];
engineType?: string;
maxContextMessages?: number;
},
) { ) {
// Allow callers to override the engine (e.g. voice uses claude_api for streaming) // Allow callers to override the engine (e.g. voice uses claude_api for streaming)
const engine = body.engineType const engine = body.engineType
? this.engineRegistry.switchEngine(body.engineType as AgentEngineType) ? this.engineRegistry.switchEngine(body.engineType as AgentEngineType)
: this.engineRegistry.getActiveEngine(); : this.engineRegistry.getActiveEngine();
const session = new AgentSession(); // Reuse existing session or create new one
session.id = crypto.randomUUID(); let session: AgentSession;
session.tenantId = tenantId; if (body.sessionId) {
session.engineType = engine.engineType; const existing = await this.sessionRepository.findById(body.sessionId);
if (existing && existing.status === 'active') {
session = existing;
} else {
session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt);
}
} else {
session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt);
}
// Keep session active for multi-turn
session.status = 'active'; session.status = 'active';
session.systemPrompt = body.systemPrompt;
session.metadata = {};
session.createdAt = new Date();
session.updatedAt = new Date(); session.updatedAt = new Date();
await this.sessionRepository.save(session); await this.sessionRepository.save(session);
@ -53,25 +68,47 @@ export class AgentController {
task.createdAt = new Date(); task.createdAt = new Date();
await this.taskRepository.save(task); await this.taskRepository.save(task);
// Save user message to conversation history
await this.contextService.saveUserMessage(session.id, body.prompt);
// Load conversation history for context
const maxCtx = body.maxContextMessages ?? 20;
const conversationHistory = await this.contextService.loadContext(session.id, maxCtx);
this.logger.log(`[Task ${task.id}] Loaded ${conversationHistory.length} history messages for session=${session.id}`);
// Fire-and-forget: iterate engine stream and emit events via gateway // Fire-and-forget: iterate engine stream and emit events via gateway
(async () => { (async () => {
try { try {
this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${body.prompt.slice(0, 80)}"`); this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${body.prompt.slice(0, 80)}"`);
// Pass conversation history (excluding the current user message, which is the last one)
// loadContext returns all messages including the one we just saved,
// so we pass the history minus the last user message (it will be added by the engine as params.prompt)
const historyForEngine = conversationHistory.slice(0, -1);
const stream = engine.executeTask({ const stream = engine.executeTask({
sessionId: session.id, sessionId: session.id,
prompt: body.prompt, prompt: body.prompt,
systemPrompt: body.systemPrompt || '', systemPrompt: body.systemPrompt || '',
allowedTools: body.allowedTools || [], allowedTools: body.allowedTools || [],
maxTurns: body.maxTurns || 10, maxTurns: body.maxTurns || 10,
conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
}); });
let eventCount = 0; let eventCount = 0;
let finished = false; let finished = false;
const textParts: string[] = [];
for await (const event of stream) { for await (const event of stream) {
eventCount++; eventCount++;
this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`); this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`);
this.gateway.emitStreamEvent(session.id, event); this.gateway.emitStreamEvent(session.id, event);
// Collect text for assistant message
if (event.type === 'text') {
textParts.push(event.content);
}
if (event.type === 'completed' && !finished) { if (event.type === 'completed' && !finished) {
finished = true; finished = true;
task.status = TaskStatus.COMPLETED; task.status = TaskStatus.COMPLETED;
@ -80,7 +117,13 @@ export class AgentController {
task.completedAt = new Date(); task.completedAt = new Date();
await this.taskRepository.save(task); await this.taskRepository.save(task);
session.status = 'completed'; // Save assistant response to conversation history
const assistantText = textParts.join('') || event.summary;
if (assistantText) {
await this.contextService.saveAssistantMessage(session.id, assistantText);
}
// Keep session active (don't mark completed) so it can be reused
session.updatedAt = new Date(); session.updatedAt = new Date();
await this.sessionRepository.save(session); await this.sessionRepository.save(session);
} }
@ -125,7 +168,6 @@ export class AgentController {
return { sessionId: session.id, taskId: task.id }; return { sessionId: session.id, taskId: task.id };
} }
// TODO 11: Cancel task
@Delete('tasks/:taskId') @Delete('tasks/:taskId')
async cancelTask(@Param('taskId') taskId: string) { async cancelTask(@Param('taskId') taskId: string) {
const task = await this.taskRepository.findById(taskId); const task = await this.taskRepository.findById(taskId);
@ -152,7 +194,6 @@ export class AgentController {
return { message: 'Task cancelled', taskId }; return { message: 'Task cancelled', taskId };
} }
// TODO 12: Approve command
@Post('tasks/:taskId/approve') @Post('tasks/:taskId/approve')
async approveCommand(@Param('taskId') taskId: string, @Body() body: { approved: boolean }) { async approveCommand(@Param('taskId') taskId: string, @Body() body: { approved: boolean }) {
const task = await this.taskRepository.findById(taskId); const task = await this.taskRepository.findById(taskId);
@ -189,7 +230,6 @@ export class AgentController {
task.completedAt = new Date(); task.completedAt = new Date();
await this.taskRepository.save(task); await this.taskRepository.save(task);
session.status = 'completed';
session.updatedAt = new Date(); session.updatedAt = new Date();
await this.sessionRepository.save(session); await this.sessionRepository.save(session);
} }
@ -238,17 +278,28 @@ export class AgentController {
} }
} }
// TODO 13: List engines
@Get('engines') @Get('engines')
async listEngines() { async listEngines() {
const engines = this.engineRegistry.listAvailable(); const engines = this.engineRegistry.listAvailable();
return { engines }; return { engines };
} }
// TODO 14: Switch engine
@Post('engines/switch') @Post('engines/switch')
async switchEngine(@Body() body: { engineType: string }) { async switchEngine(@Body() body: { engineType: string }) {
const engine = this.engineRegistry.switchEngine(body.engineType as AgentEngineType); const engine = this.engineRegistry.switchEngine(body.engineType as AgentEngineType);
return { message: 'Engine switched', engineType: engine.engineType }; return { message: 'Engine switched', engineType: engine.engineType };
} }
private createNewSession(tenantId: string, engineType: string, systemPrompt?: string): AgentSession {
const session = new AgentSession();
session.id = crypto.randomUUID();
session.tenantId = tenantId;
session.engineType = engineType;
session.status = 'active';
session.systemPrompt = systemPrompt;
session.metadata = {};
session.createdAt = new Date();
session.updatedAt = new Date();
return session;
}
} }

View File

@ -1,22 +1,22 @@
import { Controller, Get, Param, NotFoundException } from '@nestjs/common'; import { Controller, Get, Delete, Param, Query, NotFoundException } from '@nestjs/common';
import { TenantId } from '@it0/common'; import { TenantId } from '@it0/common';
import { SessionRepository } from '../../../infrastructure/repositories/session.repository'; import { SessionRepository } from '../../../infrastructure/repositories/session.repository';
import { TaskRepository } from '../../../infrastructure/repositories/task.repository'; import { TaskRepository } from '../../../infrastructure/repositories/task.repository';
import { MessageRepository } from '../../../infrastructure/repositories/message.repository';
@Controller('api/v1/agent/sessions') @Controller('api/v1/agent/sessions')
export class SessionController { export class SessionController {
constructor( constructor(
private readonly sessionRepository: SessionRepository, private readonly sessionRepository: SessionRepository,
private readonly taskRepository: TaskRepository, private readonly taskRepository: TaskRepository,
private readonly messageRepository: MessageRepository,
) {} ) {}
// TODO 15: List sessions
@Get() @Get()
async listSessions(@TenantId() tenantId: string) { async listSessions(@TenantId() tenantId: string) {
return this.sessionRepository.findByTenant(tenantId); return this.sessionRepository.findByTenant(tenantId);
} }
// TODO 16: Get session details
@Get(':sessionId') @Get(':sessionId')
async getSession(@Param('sessionId') sessionId: string) { async getSession(@Param('sessionId') sessionId: string) {
const session = await this.sessionRepository.findById(sessionId); const session = await this.sessionRepository.findById(sessionId);
@ -26,9 +26,65 @@ export class SessionController {
return session; return session;
} }
// TODO 17: Get session history
@Get(':sessionId/history') @Get(':sessionId/history')
async getSessionHistory(@Param('sessionId') sessionId: string) { async getSessionHistory(@Param('sessionId') sessionId: string) {
return this.taskRepository.findBySessionId(sessionId); return this.taskRepository.findBySessionId(sessionId);
} }
@Get(':sessionId/messages')
async getSessionMessages(
@Param('sessionId') sessionId: string,
@Query('limit') limit?: string,
@Query('offset') offset?: string,
) {
const session = await this.sessionRepository.findById(sessionId);
if (!session) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
const messages = await this.messageRepository.findBySessionId(
sessionId,
limit ? parseInt(limit, 10) : 50,
offset ? parseInt(offset, 10) : 0,
);
const total = await this.messageRepository.countBySessionId(sessionId);
return {
messages,
total,
sessionId,
};
}
@Delete(':sessionId/messages')
async clearSessionMessages(@Param('sessionId') sessionId: string) {
const session = await this.sessionRepository.findById(sessionId);
if (!session) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
await this.messageRepository.deleteBySessionId(sessionId);
return { message: 'Conversation history cleared', sessionId };
}
@Delete(':sessionId')
async deleteSession(@Param('sessionId') sessionId: string) {
const session = await this.sessionRepository.findById(sessionId);
if (!session) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
// Delete messages first (cascade should handle this, but be explicit)
await this.messageRepository.deleteBySessionId(sessionId);
// Delete tasks
const tasks = await this.taskRepository.findBySessionId(sessionId);
for (const task of tasks) {
await this.taskRepository.remove(task);
}
// Delete session
await this.sessionRepository.remove(session);
return { message: 'Session deleted', sessionId };
}
} }

View File

@ -330,6 +330,25 @@ CREATE INDEX idx_api_keys_hash ON api_keys(key_hash);
-- Agent Service - Additional Tables -- Agent Service - Additional Tables
-- ========================================================================= -- =========================================================================
-- Conversation Messages (multi-turn dialogue history per session)
CREATE TABLE conversation_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
role VARCHAR(20) NOT NULL,
content TEXT NOT NULL,
tool_calls JSONB,
tool_results JSONB,
token_count INTEGER,
sequence_number INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_conversation_messages_session
ON conversation_messages(session_id, sequence_number ASC);
CREATE INDEX idx_conversation_messages_session_recent
ON conversation_messages(session_id, sequence_number DESC);
-- Agent Tasks -- Agent Tasks
CREATE TABLE agent_tasks ( CREATE TABLE agent_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

View File

@ -0,0 +1,24 @@
-- IT0 Migration 004: Add conversation_messages table for multi-turn context management
-- This table stores conversation history per session, shared across all engine types.
-- Usage: Replace {SCHEMA} with target schema name (e.g., it0_t_t001)
SET search_path TO {SCHEMA};
-- Conversation Messages (multi-turn dialogue history per session)
CREATE TABLE IF NOT EXISTS conversation_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
role VARCHAR(20) NOT NULL,
content TEXT NOT NULL,
tool_calls JSONB,
tool_results JSONB,
token_count INTEGER,
sequence_number INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_conversation_messages_session
ON conversation_messages(session_id, sequence_number ASC);
CREATE INDEX IF NOT EXISTS idx_conversation_messages_session_recent
ON conversation_messages(session_id, sequence_number DESC);

View File

@ -86,6 +86,13 @@ async function runTenantSchema(client: Client, tenantId: string) {
{ '{SCHEMA}': schemaName }, { '{SCHEMA}': schemaName },
); );
// Run conversation messages table (004)
await runSqlFile(
client,
path.join(MIGRATIONS_DIR, '004-add-conversation-messages.sql'),
{ '{SCHEMA}': schemaName },
);
log(`Tenant schema ${schemaName} ready.`); log(`Tenant schema ${schemaName} ready.`);
} }