feat: inject-message support for mid-stream task interruption

Backend (agent-engine.port.ts):
- Add `cancelled` event type: emitted when a task is cancelled (user-initiated
  or injection), so Flutter can close the old stream cleanly
- Add `task_info` event type: emitted after inject to pass the new taskId to
  the client, enabling cancel/re-inject on the replacement task

Flutter (features/chat/):
- ChatState: track current `taskId` alongside `sessionId`; clear on completion
  or error
- Handle `TaskInfoEvent`: update taskId in state when server issues a new task
- Handle `CancelledEvent`: treat as stream termination (agentStatus → idle)
- MessageType.interrupted: new UI node (warning style) for mid-stream cancels
- _inject(): send text as an inject request while streaming; backend cancels
  the current task and starts a new one with the injected message
- Input area: during streaming, hint changes to "追加指令...", Enter key calls
  _inject() instead of _send(), and both inject-send + stop buttons are shown
- isAwaitingApproval kept separate from isStreaming so approval flow is not
  blocked by inject mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-27 21:33:50 -08:00
parent ce4e7840ec
commit d5f663f7af
10 changed files with 272 additions and 33 deletions

View File

@ -118,9 +118,21 @@ class ChatRemoteDatasource {
);
}
/// Cancels an active agent task in a session.
Future<void> cancelTask(String sessionId) async {
await _dio.post('${ApiEndpoints.sessions}/$sessionId/cancel');
/// Cancels an active agent task by taskId.
Future<void> cancelTask(String taskId) async {
await _dio.delete('${ApiEndpoints.tasks}/$taskId');
}
/// Injects a message into an active task, cancelling current work and starting new task.
Future<Map<String, dynamic>> injectMessage({
required String taskId,
required String message,
}) async {
final response = await _dio.post(
'${ApiEndpoints.tasks}/$taskId/inject',
data: {'message': message},
);
return response.data as Map<String, dynamic>;
}
/// Confirms a standing order draft proposed by the agent.

View File

@ -80,6 +80,14 @@ class StreamEventModel {
data['orderName'] as String? ?? data['order_name'] as String? ?? '',
);
case 'task_info':
return TaskInfoEvent(
data['taskId'] as String? ?? data['task_id'] as String? ?? '',
);
case 'cancelled':
return CancelledEvent();
default:
// Fall back to text event for unknown types
return TextEvent(

View File

@ -41,8 +41,9 @@ class ChatRepositoryImpl implements ChatRepository {
sessionId;
final taskId = response['taskId'] as String? ?? response['task_id'] as String?;
// Emit the real sessionId so the notifier can capture it
// Emit the real sessionId and taskId so the notifier can capture them
yield SessionInfoEvent(returnedSessionId);
if (taskId != null) yield TaskInfoEvent(taskId);
// Connect to the agent WebSocket and subscribe to the session
final token = await _getAccessToken();
@ -94,8 +95,9 @@ class ChatRepositoryImpl implements ChatRepository {
sessionId;
final taskId = response['taskId'] as String? ?? response['task_id'] as String?;
// Emit the real sessionId so the notifier can capture it
// Emit the real sessionId and taskId so the notifier can capture them
yield SessionInfoEvent(returnedSessionId);
if (taskId != null) yield TaskInfoEvent(taskId);
final voiceToken = await _getAccessToken();
await _webSocketClient.connect('/ws/agent', token: voiceToken);
@ -153,8 +155,52 @@ class ChatRepositoryImpl implements ChatRepository {
}
@override
Future<void> cancelTask(String sessionId) async {
await _remoteDatasource.cancelTask(sessionId);
Future<void> cancelTask(String taskId) async {
await _remoteDatasource.cancelTask(taskId);
}
@override
Stream<StreamEvent> injectMessage({
required String taskId,
required String message,
}) async* {
// Call inject API this cancels the current task and starts a new one
final response = await _remoteDatasource.injectMessage(
taskId: taskId,
message: message,
);
final newTaskId = response['taskId'] as String? ?? response['task_id'] as String?;
if (newTaskId != null) {
yield TaskInfoEvent(newTaskId);
}
// The WebSocket is already connected and subscribed to this session.
// New events from the injected task will flow through the existing connection.
yield* _webSocketClient.messages.transform(
StreamTransformer<Map<String, dynamic>, StreamEvent>.fromHandlers(
handleData: (msg, sink) {
final event = msg['event'] as String? ?? msg['type'] as String? ?? '';
if (event == 'stream_event' || event == 'message') {
final data = msg['data'] as Map<String, dynamic>? ?? msg;
final model = StreamEventModel.fromJson(data);
final entity = model.toEntity();
// Skip cancelled events from the old task during inject
if (entity is CancelledEvent) return;
sink.add(entity);
} else if (event == 'stream_end' || event == 'done' || event == 'complete') {
final summary = msg['summary'] as String? ?? '';
sink.add(CompletedEvent(summary));
sink.close();
} else if (event == 'error') {
final message = msg['message'] as String? ?? '流式传输错误';
sink.add(ErrorEvent(message));
sink.close();
}
},
),
);
}
@override

View File

@ -1,6 +1,6 @@
enum MessageRole { user, assistant, system }
enum MessageType { text, toolUse, toolResult, approval, thinking, standingOrderDraft }
enum MessageType { text, toolUse, toolResult, approval, thinking, standingOrderDraft, interrupted }
enum ToolStatus { executing, completed, error, blocked, awaitingApproval }

View File

@ -56,3 +56,14 @@ class SessionInfoEvent extends StreamEvent {
final String sessionId;
SessionInfoEvent(this.sessionId);
}
/// Carries the taskId assigned by the backend for cancel tracking.
class TaskInfoEvent extends StreamEvent {
final String taskId;
TaskInfoEvent(this.taskId);
}
/// Emitted when a task is cancelled (by user or by injection).
class CancelledEvent extends StreamEvent {
CancelledEvent();
}

View File

@ -24,8 +24,14 @@ abstract class ChatRepository {
/// Rejects a pending command approval request with an optional reason.
Future<void> rejectCommand(String taskId, {String? reason});
/// Cancels an active agent task within a session.
Future<void> cancelTask(String sessionId);
/// Cancels an active agent task by taskId.
Future<void> cancelTask(String taskId);
/// Injects a message while agent is working, returns a stream of events for the new task.
Stream<StreamEvent> injectMessage({
required String taskId,
required String message,
});
/// Confirms a standing order draft proposed by the agent.
Future<void> confirmStandingOrder(

View File

@ -5,7 +5,7 @@ class CancelTask {
CancelTask(this._repository);
Future<void> execute(String sessionId) {
return _repository.cancelTask(sessionId);
Future<void> execute(String taskId) {
return _repository.cancelTask(taskId);
}
}

View File

@ -34,6 +34,14 @@ class _ChatPageState extends ConsumerState<ChatPage> {
_scrollToBottom();
}
void _inject() {
final text = _messageController.text.trim();
if (text.isEmpty) return;
_messageController.clear();
ref.read(chatProvider.notifier).injectMessage(text);
_scrollToBottom();
}
void _scrollToBottom() {
WidgetsBinding.instance.addPostFrameCallback((_) {
if (_scrollController.hasClients) {
@ -160,6 +168,15 @@ class _ChatPageState extends ConsumerState<ChatPage> {
),
);
case MessageType.interrupted:
return TimelineEventNode(
status: NodeStatus.warning,
label: message.content,
isFirst: isFirst,
isLast: isLast,
icon: Icons.cancel_outlined,
);
case MessageType.text:
default:
return TimelineEventNode(
@ -321,8 +338,8 @@ class _ChatPageState extends ConsumerState<ChatPage> {
}
Widget _buildInputArea(ChatState chatState) {
final isDisabled = chatState.isStreaming ||
chatState.agentStatus == AgentStatus.awaitingApproval;
final isAwaitingApproval = chatState.agentStatus == AgentStatus.awaitingApproval;
final isStreaming = chatState.isStreaming && !isAwaitingApproval;
return Container(
padding: const EdgeInsets.all(8),
@ -335,29 +352,40 @@ class _ChatPageState extends ConsumerState<ChatPage> {
Expanded(
child: TextField(
controller: _messageController,
decoration: const InputDecoration(
hintText: '输入指令...',
border: OutlineInputBorder(
decoration: InputDecoration(
hintText: isStreaming ? '追加指令...' : '输入指令...',
border: const OutlineInputBorder(
borderRadius: BorderRadius.all(Radius.circular(24)),
),
contentPadding: EdgeInsets.symmetric(horizontal: 16, vertical: 10),
contentPadding: const EdgeInsets.symmetric(horizontal: 16, vertical: 10),
),
textInputAction: TextInputAction.send,
onSubmitted: (_) => _send(),
enabled: !isDisabled,
onSubmitted: (_) => isStreaming ? _inject() : _send(),
enabled: !isAwaitingApproval,
),
),
const SizedBox(width: 8),
if (chatState.isStreaming)
IconButton(
icon: const Icon(Icons.stop_circle_outlined, color: AppColors.error),
tooltip: '停止',
onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(),
if (isStreaming)
// During streaming: show both inject-send and stop buttons
Row(
mainAxisSize: MainAxisSize.min,
children: [
IconButton(
icon: const Icon(Icons.send, color: AppColors.info),
tooltip: '追加指令',
onPressed: _inject,
),
IconButton(
icon: const Icon(Icons.stop_circle_outlined, color: AppColors.error),
tooltip: '停止',
onPressed: () => ref.read(chatProvider.notifier).cancelCurrentTask(),
),
],
)
else
IconButton(
icon: const Icon(Icons.send),
onPressed: isDisabled ? null : _send,
onPressed: isAwaitingApproval ? null : _send,
),
],
),

View File

@ -86,12 +86,14 @@ class ChatState {
final List<ChatMessage> messages;
final AgentStatus agentStatus;
final String? sessionId;
final String? taskId;
final String? error;
const ChatState({
this.messages = const [],
this.agentStatus = AgentStatus.idle,
this.sessionId,
this.taskId,
this.error,
});
@ -101,12 +103,15 @@ class ChatState {
List<ChatMessage>? messages,
AgentStatus? agentStatus,
String? sessionId,
String? taskId,
String? error,
bool clearTaskId = false,
}) {
return ChatState(
messages: messages ?? this.messages,
agentStatus: agentStatus ?? this.agentStatus,
sessionId: sessionId ?? this.sessionId,
taskId: clearTaskId ? null : (taskId ?? this.taskId),
error: error,
);
}
@ -275,12 +280,14 @@ class ChatNotifier extends StateNotifier<ChatState> {
state = state.copyWith(
messages: finalMessages,
agentStatus: AgentStatus.idle,
clearTaskId: true,
);
case ErrorEvent(:final message):
state = state.copyWith(
agentStatus: AgentStatus.error,
error: message,
clearTaskId: true,
);
case StandingOrderDraftEvent(:final draft):
@ -313,6 +320,16 @@ class ChatNotifier extends StateNotifier<ChatState> {
case SessionInfoEvent(:final sessionId):
// Capture the real sessionId from the backend for multi-turn reuse
state = state.copyWith(sessionId: sessionId);
case TaskInfoEvent(:final taskId):
// Capture the taskId for cancel/inject tracking
state = state.copyWith(taskId: taskId);
case CancelledEvent():
// Backend confirmed cancellation if UI hasn't already gone idle, do it now
if (state.agentStatus != AgentStatus.idle) {
state = state.copyWith(agentStatus: AgentStatus.idle, clearTaskId: true);
}
}
}
@ -411,14 +428,123 @@ class ChatNotifier extends StateNotifier<ChatState> {
}
Future<void> cancelCurrentTask() async {
if (state.sessionId == null) return;
final taskId = state.taskId;
if (taskId == null && state.sessionId == null) return;
// 1. IMMEDIATELY update UI optimistic cancel
_eventSubscription?.cancel();
_eventSubscription = null;
// Mark any executing tools as completed
final updatedMessages = state.messages.map((m) {
if (m.type == MessageType.toolUse &&
m.toolExecution?.status == ToolStatus.executing) {
return m.copyWith(
toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed),
);
}
return m;
}).toList();
// Add interrupted marker to timeline
final interruptedMsg = ChatMessage(
id: '${DateTime.now().microsecondsSinceEpoch}_interrupted',
role: MessageRole.assistant,
content: '已中断',
timestamp: DateTime.now(),
type: MessageType.interrupted,
);
state = state.copyWith(
agentStatus: AgentStatus.idle,
messages: [...updatedMessages, interruptedMsg],
clearTaskId: true,
error: null,
);
// 2. Fire-and-forget backend cancel
if (taskId != null) {
try {
final useCase = _ref.read(cancelTaskUseCaseProvider);
await useCase.execute(taskId);
} catch (_) {
// Ignore UI is already updated
}
}
}
/// Injects a message while the agent is actively working.
/// Cancels the current task and starts a new one with the injected message.
Future<void> injectMessage(String message) async {
if (message.trim().isEmpty) return;
final taskId = state.taskId;
if (taskId == null) {
// Fallback: if no active task, treat as normal send
return sendMessage(message);
}
// 1. Cancel current event subscription
_eventSubscription?.cancel();
_eventSubscription = null;
// 2. Mark executing tools as completed
final updatedMessages = state.messages.map((m) {
if (m.type == MessageType.toolUse &&
m.toolExecution?.status == ToolStatus.executing) {
return m.copyWith(
toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed),
);
}
return m;
}).toList();
// 3. Add interrupted marker + user message to timeline
final interruptedMsg = ChatMessage(
id: '${DateTime.now().microsecondsSinceEpoch}_interrupted',
role: MessageRole.assistant,
content: '已中断 (用户追加指令)',
timestamp: DateTime.now(),
type: MessageType.interrupted,
);
final userMsg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.user,
content: message,
timestamp: DateTime.now(),
type: MessageType.text,
);
state = state.copyWith(
messages: [...updatedMessages, interruptedMsg, userMsg],
agentStatus: AgentStatus.thinking,
clearTaskId: true,
error: null,
);
// 4. Call the inject API and listen to new event stream
try {
final useCase = _ref.read(cancelTaskUseCaseProvider);
await useCase.execute(state.sessionId!);
_eventSubscription?.cancel();
state = state.copyWith(agentStatus: AgentStatus.idle);
final repo = _ref.read(chatRepositoryProvider);
final stream = repo.injectMessage(taskId: taskId, message: message);
_eventSubscription = stream.listen(
(event) => _handleStreamEvent(event),
onError: (error) {
state = state.copyWith(
agentStatus: AgentStatus.error,
error: ErrorHandler.friendlyMessage(error),
);
},
onDone: () {
if (state.agentStatus != AgentStatus.error) {
state = state.copyWith(agentStatus: AgentStatus.idle);
}
},
);
} catch (e) {
state = state.copyWith(error: '取消失败: $e');
state = state.copyWith(
agentStatus: AgentStatus.error,
error: ErrorHandler.friendlyMessage(e),
);
}
}

View File

@ -37,4 +37,6 @@ export type EngineStreamEvent =
| { type: 'tool_result'; toolName: string; output: string; isError: boolean }
| { type: 'approval_required'; command: string; riskLevel: number; taskId: string }
| { type: 'completed'; summary: string; tokensUsed?: number }
| { type: 'error'; message: string; code: string };
| { type: 'error'; message: string; code: string }
| { type: 'cancelled'; message: string; code: string }
| { type: 'task_info'; taskId: string };