diff --git a/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart b/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart index 9fa198c..c879aed 100644 --- a/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart +++ b/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart @@ -118,9 +118,21 @@ class ChatRemoteDatasource { ); } - /// Cancels an active agent task in a session. - Future cancelTask(String sessionId) async { - await _dio.post('${ApiEndpoints.sessions}/$sessionId/cancel'); + /// Cancels an active agent task by taskId. + Future cancelTask(String taskId) async { + await _dio.delete('${ApiEndpoints.tasks}/$taskId'); + } + + /// Injects a message into an active task, cancelling current work and starting new task. + Future> injectMessage({ + required String taskId, + required String message, + }) async { + final response = await _dio.post( + '${ApiEndpoints.tasks}/$taskId/inject', + data: {'message': message}, + ); + return response.data as Map; } /// Confirms a standing order draft proposed by the agent. diff --git a/it0_app/lib/features/chat/data/models/stream_event_model.dart b/it0_app/lib/features/chat/data/models/stream_event_model.dart index c25e85b..247ff0a 100644 --- a/it0_app/lib/features/chat/data/models/stream_event_model.dart +++ b/it0_app/lib/features/chat/data/models/stream_event_model.dart @@ -80,6 +80,14 @@ class StreamEventModel { data['orderName'] as String? ?? data['order_name'] as String? ?? '', ); + case 'task_info': + return TaskInfoEvent( + data['taskId'] as String? ?? data['task_id'] as String? ?? '', + ); + + case 'cancelled': + return CancelledEvent(); + default: // Fall back to text event for unknown types return TextEvent( diff --git a/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart b/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart index aecdf0a..673ad0f 100644 --- a/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart +++ b/it0_app/lib/features/chat/data/repositories/chat_repository_impl.dart @@ -41,8 +41,9 @@ class ChatRepositoryImpl implements ChatRepository { sessionId; final taskId = response['taskId'] as String? ?? response['task_id'] as String?; - // Emit the real sessionId so the notifier can capture it + // Emit the real sessionId and taskId so the notifier can capture them yield SessionInfoEvent(returnedSessionId); + if (taskId != null) yield TaskInfoEvent(taskId); // Connect to the agent WebSocket and subscribe to the session final token = await _getAccessToken(); @@ -94,8 +95,9 @@ class ChatRepositoryImpl implements ChatRepository { sessionId; final taskId = response['taskId'] as String? ?? response['task_id'] as String?; - // Emit the real sessionId so the notifier can capture it + // Emit the real sessionId and taskId so the notifier can capture them yield SessionInfoEvent(returnedSessionId); + if (taskId != null) yield TaskInfoEvent(taskId); final voiceToken = await _getAccessToken(); await _webSocketClient.connect('/ws/agent', token: voiceToken); @@ -153,8 +155,52 @@ class ChatRepositoryImpl implements ChatRepository { } @override - Future cancelTask(String sessionId) async { - await _remoteDatasource.cancelTask(sessionId); + Future cancelTask(String taskId) async { + await _remoteDatasource.cancelTask(taskId); + } + + @override + Stream injectMessage({ + required String taskId, + required String message, + }) async* { + // Call inject API — this cancels the current task and starts a new one + final response = await _remoteDatasource.injectMessage( + taskId: taskId, + message: message, + ); + + final newTaskId = response['taskId'] as String? ?? response['task_id'] as String?; + if (newTaskId != null) { + yield TaskInfoEvent(newTaskId); + } + + // The WebSocket is already connected and subscribed to this session. + // New events from the injected task will flow through the existing connection. + yield* _webSocketClient.messages.transform( + StreamTransformer, StreamEvent>.fromHandlers( + handleData: (msg, sink) { + final event = msg['event'] as String? ?? msg['type'] as String? ?? ''; + + if (event == 'stream_event' || event == 'message') { + final data = msg['data'] as Map? ?? msg; + final model = StreamEventModel.fromJson(data); + final entity = model.toEntity(); + // Skip cancelled events from the old task during inject + if (entity is CancelledEvent) return; + sink.add(entity); + } else if (event == 'stream_end' || event == 'done' || event == 'complete') { + final summary = msg['summary'] as String? ?? ''; + sink.add(CompletedEvent(summary)); + sink.close(); + } else if (event == 'error') { + final message = msg['message'] as String? ?? '流式传输错误'; + sink.add(ErrorEvent(message)); + sink.close(); + } + }, + ), + ); } @override diff --git a/it0_app/lib/features/chat/domain/entities/chat_message.dart b/it0_app/lib/features/chat/domain/entities/chat_message.dart index 56e1534..9dd98d3 100644 --- a/it0_app/lib/features/chat/domain/entities/chat_message.dart +++ b/it0_app/lib/features/chat/domain/entities/chat_message.dart @@ -1,6 +1,6 @@ enum MessageRole { user, assistant, system } -enum MessageType { text, toolUse, toolResult, approval, thinking, standingOrderDraft } +enum MessageType { text, toolUse, toolResult, approval, thinking, standingOrderDraft, interrupted } enum ToolStatus { executing, completed, error, blocked, awaitingApproval } diff --git a/it0_app/lib/features/chat/domain/entities/stream_event.dart b/it0_app/lib/features/chat/domain/entities/stream_event.dart index 8d837a7..a744d91 100644 --- a/it0_app/lib/features/chat/domain/entities/stream_event.dart +++ b/it0_app/lib/features/chat/domain/entities/stream_event.dart @@ -56,3 +56,14 @@ class SessionInfoEvent extends StreamEvent { final String sessionId; SessionInfoEvent(this.sessionId); } + +/// Carries the taskId assigned by the backend for cancel tracking. +class TaskInfoEvent extends StreamEvent { + final String taskId; + TaskInfoEvent(this.taskId); +} + +/// Emitted when a task is cancelled (by user or by injection). +class CancelledEvent extends StreamEvent { + CancelledEvent(); +} diff --git a/it0_app/lib/features/chat/domain/repositories/chat_repository.dart b/it0_app/lib/features/chat/domain/repositories/chat_repository.dart index 040e31a..e989831 100644 --- a/it0_app/lib/features/chat/domain/repositories/chat_repository.dart +++ b/it0_app/lib/features/chat/domain/repositories/chat_repository.dart @@ -24,8 +24,14 @@ abstract class ChatRepository { /// Rejects a pending command approval request with an optional reason. Future rejectCommand(String taskId, {String? reason}); - /// Cancels an active agent task within a session. - Future cancelTask(String sessionId); + /// Cancels an active agent task by taskId. + Future cancelTask(String taskId); + + /// Injects a message while agent is working, returns a stream of events for the new task. + Stream injectMessage({ + required String taskId, + required String message, + }); /// Confirms a standing order draft proposed by the agent. Future confirmStandingOrder( diff --git a/it0_app/lib/features/chat/domain/usecases/cancel_task.dart b/it0_app/lib/features/chat/domain/usecases/cancel_task.dart index 287666f..a46abd9 100644 --- a/it0_app/lib/features/chat/domain/usecases/cancel_task.dart +++ b/it0_app/lib/features/chat/domain/usecases/cancel_task.dart @@ -5,7 +5,7 @@ class CancelTask { CancelTask(this._repository); - Future execute(String sessionId) { - return _repository.cancelTask(sessionId); + Future execute(String taskId) { + return _repository.cancelTask(taskId); } } diff --git a/it0_app/lib/features/chat/presentation/pages/chat_page.dart b/it0_app/lib/features/chat/presentation/pages/chat_page.dart index 384ea5e..ee12e17 100644 --- a/it0_app/lib/features/chat/presentation/pages/chat_page.dart +++ b/it0_app/lib/features/chat/presentation/pages/chat_page.dart @@ -34,6 +34,14 @@ class _ChatPageState extends ConsumerState { _scrollToBottom(); } + void _inject() { + final text = _messageController.text.trim(); + if (text.isEmpty) return; + _messageController.clear(); + ref.read(chatProvider.notifier).injectMessage(text); + _scrollToBottom(); + } + void _scrollToBottom() { WidgetsBinding.instance.addPostFrameCallback((_) { if (_scrollController.hasClients) { @@ -160,6 +168,15 @@ class _ChatPageState extends ConsumerState { ), ); + case MessageType.interrupted: + return TimelineEventNode( + status: NodeStatus.warning, + label: message.content, + isFirst: isFirst, + isLast: isLast, + icon: Icons.cancel_outlined, + ); + case MessageType.text: default: return TimelineEventNode( @@ -321,8 +338,8 @@ class _ChatPageState extends ConsumerState { } Widget _buildInputArea(ChatState chatState) { - final isDisabled = chatState.isStreaming || - chatState.agentStatus == AgentStatus.awaitingApproval; + final isAwaitingApproval = chatState.agentStatus == AgentStatus.awaitingApproval; + final isStreaming = chatState.isStreaming && !isAwaitingApproval; return Container( padding: const EdgeInsets.all(8), @@ -335,29 +352,40 @@ class _ChatPageState extends ConsumerState { Expanded( child: TextField( controller: _messageController, - decoration: const InputDecoration( - hintText: '输入指令...', - border: OutlineInputBorder( + decoration: InputDecoration( + hintText: isStreaming ? '追加指令...' : '输入指令...', + border: const OutlineInputBorder( borderRadius: BorderRadius.all(Radius.circular(24)), ), - contentPadding: EdgeInsets.symmetric(horizontal: 16, vertical: 10), + contentPadding: const EdgeInsets.symmetric(horizontal: 16, vertical: 10), ), textInputAction: TextInputAction.send, - onSubmitted: (_) => _send(), - enabled: !isDisabled, + onSubmitted: (_) => isStreaming ? _inject() : _send(), + enabled: !isAwaitingApproval, ), ), const SizedBox(width: 8), - if (chatState.isStreaming) - IconButton( - icon: const Icon(Icons.stop_circle_outlined, color: AppColors.error), - tooltip: '停止', - onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(), + if (isStreaming) + // During streaming: show both inject-send and stop buttons + Row( + mainAxisSize: MainAxisSize.min, + children: [ + IconButton( + icon: const Icon(Icons.send, color: AppColors.info), + tooltip: '追加指令', + onPressed: _inject, + ), + IconButton( + icon: const Icon(Icons.stop_circle_outlined, color: AppColors.error), + tooltip: '停止', + onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(), + ), + ], ) else IconButton( icon: const Icon(Icons.send), - onPressed: isDisabled ? null : _send, + onPressed: isAwaitingApproval ? null : _send, ), ], ), diff --git a/it0_app/lib/features/chat/presentation/providers/chat_providers.dart b/it0_app/lib/features/chat/presentation/providers/chat_providers.dart index ca8a8c0..de5d3d3 100644 --- a/it0_app/lib/features/chat/presentation/providers/chat_providers.dart +++ b/it0_app/lib/features/chat/presentation/providers/chat_providers.dart @@ -86,12 +86,14 @@ class ChatState { final List messages; final AgentStatus agentStatus; final String? sessionId; + final String? taskId; final String? error; const ChatState({ this.messages = const [], this.agentStatus = AgentStatus.idle, this.sessionId, + this.taskId, this.error, }); @@ -101,12 +103,15 @@ class ChatState { List? messages, AgentStatus? agentStatus, String? sessionId, + String? taskId, String? error, + bool clearTaskId = false, }) { return ChatState( messages: messages ?? this.messages, agentStatus: agentStatus ?? this.agentStatus, sessionId: sessionId ?? this.sessionId, + taskId: clearTaskId ? null : (taskId ?? this.taskId), error: error, ); } @@ -275,12 +280,14 @@ class ChatNotifier extends StateNotifier { state = state.copyWith( messages: finalMessages, agentStatus: AgentStatus.idle, + clearTaskId: true, ); case ErrorEvent(:final message): state = state.copyWith( agentStatus: AgentStatus.error, error: message, + clearTaskId: true, ); case StandingOrderDraftEvent(:final draft): @@ -313,6 +320,16 @@ class ChatNotifier extends StateNotifier { case SessionInfoEvent(:final sessionId): // Capture the real sessionId from the backend for multi-turn reuse state = state.copyWith(sessionId: sessionId); + + case TaskInfoEvent(:final taskId): + // Capture the taskId for cancel/inject tracking + state = state.copyWith(taskId: taskId); + + case CancelledEvent(): + // Backend confirmed cancellation — if UI hasn't already gone idle, do it now + if (state.agentStatus != AgentStatus.idle) { + state = state.copyWith(agentStatus: AgentStatus.idle, clearTaskId: true); + } } } @@ -411,14 +428,123 @@ class ChatNotifier extends StateNotifier { } Future cancelCurrentTask() async { - if (state.sessionId == null) return; + final taskId = state.taskId; + if (taskId == null && state.sessionId == null) return; + + // 1. IMMEDIATELY update UI — optimistic cancel + _eventSubscription?.cancel(); + _eventSubscription = null; + + // Mark any executing tools as completed + final updatedMessages = state.messages.map((m) { + if (m.type == MessageType.toolUse && + m.toolExecution?.status == ToolStatus.executing) { + return m.copyWith( + toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed), + ); + } + return m; + }).toList(); + + // Add interrupted marker to timeline + final interruptedMsg = ChatMessage( + id: '${DateTime.now().microsecondsSinceEpoch}_interrupted', + role: MessageRole.assistant, + content: '已中断', + timestamp: DateTime.now(), + type: MessageType.interrupted, + ); + + state = state.copyWith( + agentStatus: AgentStatus.idle, + messages: [...updatedMessages, interruptedMsg], + clearTaskId: true, + error: null, + ); + + // 2. Fire-and-forget backend cancel + if (taskId != null) { + try { + final useCase = _ref.read(cancelTaskUseCaseProvider); + await useCase.execute(taskId); + } catch (_) { + // Ignore — UI is already updated + } + } + } + + /// Injects a message while the agent is actively working. + /// Cancels the current task and starts a new one with the injected message. + Future injectMessage(String message) async { + if (message.trim().isEmpty) return; + final taskId = state.taskId; + if (taskId == null) { + // Fallback: if no active task, treat as normal send + return sendMessage(message); + } + + // 1. Cancel current event subscription + _eventSubscription?.cancel(); + _eventSubscription = null; + + // 2. Mark executing tools as completed + final updatedMessages = state.messages.map((m) { + if (m.type == MessageType.toolUse && + m.toolExecution?.status == ToolStatus.executing) { + return m.copyWith( + toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed), + ); + } + return m; + }).toList(); + + // 3. Add interrupted marker + user message to timeline + final interruptedMsg = ChatMessage( + id: '${DateTime.now().microsecondsSinceEpoch}_interrupted', + role: MessageRole.assistant, + content: '已中断 (用户追加指令)', + timestamp: DateTime.now(), + type: MessageType.interrupted, + ); + final userMsg = ChatMessage( + id: DateTime.now().microsecondsSinceEpoch.toString(), + role: MessageRole.user, + content: message, + timestamp: DateTime.now(), + type: MessageType.text, + ); + + state = state.copyWith( + messages: [...updatedMessages, interruptedMsg, userMsg], + agentStatus: AgentStatus.thinking, + clearTaskId: true, + error: null, + ); + + // 4. Call the inject API and listen to new event stream try { - final useCase = _ref.read(cancelTaskUseCaseProvider); - await useCase.execute(state.sessionId!); - _eventSubscription?.cancel(); - state = state.copyWith(agentStatus: AgentStatus.idle); + final repo = _ref.read(chatRepositoryProvider); + final stream = repo.injectMessage(taskId: taskId, message: message); + + _eventSubscription = stream.listen( + (event) => _handleStreamEvent(event), + onError: (error) { + state = state.copyWith( + agentStatus: AgentStatus.error, + error: ErrorHandler.friendlyMessage(error), + ); + }, + onDone: () { + if (state.agentStatus != AgentStatus.error) { + state = state.copyWith(agentStatus: AgentStatus.idle); + } + }, + ); } catch (e) { - state = state.copyWith(error: '取消失败: $e'); + state = state.copyWith( + agentStatus: AgentStatus.error, + error: ErrorHandler.friendlyMessage(e), + ); } } diff --git a/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts b/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts index 5df48c6..0bbc294 100644 --- a/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts +++ b/packages/services/agent-service/src/domain/ports/outbound/agent-engine.port.ts @@ -37,4 +37,6 @@ export type EngineStreamEvent = | { type: 'tool_result'; toolName: string; output: string; isError: boolean } | { type: 'approval_required'; command: string; riskLevel: number; taskId: string } | { type: 'completed'; summary: string; tokensUsed?: number } - | { type: 'error'; message: string; code: string }; + | { type: 'error'; message: string; code: string } + | { type: 'cancelled'; message: string; code: string } + | { type: 'task_info'; taskId: string };