import 'dart:async'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:shared_preferences/shared_preferences.dart'; import '../../../../core/network/dio_client.dart'; import '../../../../core/network/websocket_client.dart'; import '../../../auth/data/providers/auth_provider.dart'; import '../../data/datasources/chat_local_datasource.dart'; import '../../data/datasources/chat_remote_datasource.dart'; import '../../data/models/chat_message_model.dart'; import '../../data/repositories/chat_repository_impl.dart'; import '../../../../core/errors/error_handler.dart'; import '../../domain/entities/chat_message.dart'; import '../../domain/entities/stream_event.dart'; import '../../domain/repositories/chat_repository.dart'; import '../../domain/usecases/cancel_task.dart'; import '../../domain/usecases/get_session_history.dart'; import '../../domain/usecases/send_message.dart'; import '../widgets/conversation_drawer.dart'; // --------------------------------------------------------------------------- // Dependency providers // --------------------------------------------------------------------------- final sharedPreferencesProvider = FutureProvider((ref) { return SharedPreferences.getInstance(); }); final chatRemoteDatasourceProvider = Provider((ref) { final dio = ref.watch(dioClientProvider); return ChatRemoteDatasource(dio); }); final chatLocalDatasourceProvider = Provider((ref) { final prefsAsync = ref.watch(sharedPreferencesProvider); return prefsAsync.whenOrNull( data: (prefs) => ChatLocalDatasource(prefs), ); }); final chatRepositoryProvider = Provider((ref) { final remote = ref.watch(chatRemoteDatasourceProvider); final local = ref.watch(chatLocalDatasourceProvider); final ws = ref.watch(webSocketClientProvider); final storage = ref.watch(secureStorageProvider); return ChatRepositoryImpl( remoteDatasource: remote, localDatasource: local ?? _NoOpLocalDatasource(), webSocketClient: ws, getAccessToken: () => storage.read(key: 'access_token'), ); }); // --------------------------------------------------------------------------- // Use case providers // --------------------------------------------------------------------------- final sendMessageUseCaseProvider = Provider((ref) { return SendMessage(ref.watch(chatRepositoryProvider)); }); final getSessionHistoryUseCaseProvider = Provider((ref) { return GetSessionHistory(ref.watch(chatRepositoryProvider)); }); final cancelTaskUseCaseProvider = Provider((ref) { return CancelTask(ref.watch(chatRepositoryProvider)); }); // --------------------------------------------------------------------------- // Session list provider (for conversation drawer) // --------------------------------------------------------------------------- final sessionListProvider = FutureProvider>((ref) async { final datasource = ref.watch(chatRemoteDatasourceProvider); return datasource.listSessions(); }); // --------------------------------------------------------------------------- // Chat state // --------------------------------------------------------------------------- enum AgentStatus { idle, thinking, executing, awaitingApproval, error } class ChatState { final List messages; final AgentStatus agentStatus; final String? sessionId; final String? taskId; final String? error; const ChatState({ this.messages = const [], this.agentStatus = AgentStatus.idle, this.sessionId, this.taskId, this.error, }); bool get isStreaming => agentStatus != AgentStatus.idle && agentStatus != AgentStatus.error; ChatState copyWith({ List? messages, AgentStatus? agentStatus, String? sessionId, String? taskId, String? error, bool clearTaskId = false, }) { return ChatState( messages: messages ?? this.messages, agentStatus: agentStatus ?? this.agentStatus, sessionId: sessionId ?? this.sessionId, taskId: clearTaskId ? null : (taskId ?? this.taskId), error: error, ); } } // --------------------------------------------------------------------------- // Chat notifier // --------------------------------------------------------------------------- class ChatNotifier extends StateNotifier { final Ref _ref; StreamSubscription? _eventSubscription; // Token throttle: buffer text tokens and flush every 80ms to reduce rebuilds final StringBuffer _textBuffer = StringBuffer(); final StringBuffer _thinkingBuffer = StringBuffer(); Timer? _flushTimer; static const _flushInterval = Duration(milliseconds: 80); ChatNotifier(this._ref) : super(const ChatState()); /// Sends a user message to the agent and processes the streamed response. Future sendMessage(String prompt, {List? attachments}) async { if (prompt.trim().isEmpty && (attachments == null || attachments.isEmpty)) return; // Add the user message locally final userMsg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.user, content: prompt.isEmpty ? '[图片]' : prompt, timestamp: DateTime.now(), type: MessageType.text, attachments: attachments, ); state = state.copyWith( messages: [...state.messages, userMsg], agentStatus: AgentStatus.thinking, error: null, ); try { final useCase = _ref.read(sendMessageUseCaseProvider); // Pass current sessionId to reuse the session for multi-turn context final sessionId = state.sessionId; final stream = useCase.execute( sessionId: sessionId ?? 'new', message: prompt.isEmpty ? '[图片]' : prompt, attachments: attachments?.map((a) => a.toJson()).toList(), ); _eventSubscription?.cancel(); _eventSubscription = stream.listen( (event) => _handleStreamEvent(event), onError: (error) { state = state.copyWith( agentStatus: AgentStatus.error, error: ErrorHandler.friendlyMessage(error), ); }, onDone: () { if (state.agentStatus != AgentStatus.error) { state = state.copyWith(agentStatus: AgentStatus.idle); } }, ); } catch (e) { state = state.copyWith( agentStatus: AgentStatus.error, error: ErrorHandler.friendlyMessage(e), ); } } void _handleStreamEvent(StreamEvent event) { switch (event) { case ThinkingEvent(:final content): _appendOrUpdateAssistantMessage(content, MessageType.thinking); if (state.agentStatus != AgentStatus.thinking) { state = state.copyWith(agentStatus: AgentStatus.thinking); } case TextEvent(:final content): _appendOrUpdateAssistantMessage(content, MessageType.text); if (state.agentStatus != AgentStatus.executing) { state = state.copyWith(agentStatus: AgentStatus.executing); } case ToolUseEvent(:final toolName, :final input): final msg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.assistant, content: '执行: $toolName', timestamp: DateTime.now(), type: MessageType.toolUse, toolExecution: ToolExecution( toolName: toolName, input: input.toString(), riskLevel: 0, status: ToolStatus.executing, ), ); state = state.copyWith( messages: [...state.messages, msg], agentStatus: AgentStatus.executing, ); case ToolResultEvent(:final toolName, :final output, :final isError): final updatedMessages = [...state.messages]; for (int i = updatedMessages.length - 1; i >= 0; i--) { final m = updatedMessages[i]; if (m.type == MessageType.toolUse && m.toolExecution?.status == ToolStatus.executing) { updatedMessages[i] = m.copyWith( toolExecution: m.toolExecution!.copyWith( status: isError ? ToolStatus.error : ToolStatus.completed, ), ); break; } } final msg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.assistant, content: output, timestamp: DateTime.now(), type: MessageType.toolResult, toolExecution: ToolExecution( toolName: toolName, input: '', output: output, riskLevel: 0, status: isError ? ToolStatus.error : ToolStatus.completed, ), ); state = state.copyWith(messages: [...updatedMessages, msg]); case ApprovalRequiredEvent(:final taskId, :final command, :final riskLevel): final msg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.assistant, content: '需要审批: $command', timestamp: DateTime.now(), type: MessageType.approval, approvalRequest: ApprovalRequest( taskId: taskId, command: command, riskLevel: riskLevel, expiresAt: DateTime.now().add(const Duration(minutes: 5)), ), ); state = state.copyWith( messages: [...state.messages, msg], agentStatus: AgentStatus.awaitingApproval, ); case CompletedEvent(:final summary): _flushBuffersSync(); final hasAssistantText = state.messages.any( (m) => m.role == MessageRole.assistant && m.type == MessageType.text && m.content.isNotEmpty, ); if (summary.isNotEmpty && !hasAssistantText) { // Write summary directly to state (not via buffer) since we're about // to set idle status — buffering would cause a brief missing-text gap. state = state.copyWith( messages: _applyBuffer(state.messages, summary, MessageType.text), ); } // Mark any remaining executing tools as completed final finalMessages = state.messages.map((m) { if (m.type == MessageType.toolUse && m.toolExecution?.status == ToolStatus.executing) { return m.copyWith( toolExecution: m.toolExecution!.copyWith( status: ToolStatus.completed, ), ); } return m; }).toList(); state = state.copyWith( messages: finalMessages, agentStatus: AgentStatus.idle, clearTaskId: true, ); case ErrorEvent(:final message): _flushBuffersSync(); state = state.copyWith( agentStatus: AgentStatus.error, error: message, clearTaskId: true, ); case StandingOrderDraftEvent(:final draft): final msg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.assistant, content: '常驻指令草案已生成', timestamp: DateTime.now(), type: MessageType.standingOrderDraft, metadata: draft, ); state = state.copyWith( messages: [...state.messages, msg], agentStatus: AgentStatus.awaitingApproval, ); case StandingOrderConfirmedEvent(:final orderId, :final orderName): final msg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.assistant, content: '常驻指令「$orderName」已确认 (ID: $orderId)', timestamp: DateTime.now(), type: MessageType.text, ); state = state.copyWith( messages: [...state.messages, msg], agentStatus: AgentStatus.idle, ); case SessionInfoEvent(:final sessionId): // Capture the real sessionId from the backend for multi-turn reuse state = state.copyWith(sessionId: sessionId); case TaskInfoEvent(:final taskId): // Capture the taskId for cancel/inject tracking state = state.copyWith(taskId: taskId); case CancelledEvent(): // Backend confirmed cancellation — if UI hasn't already gone idle, do it now if (state.agentStatus != AgentStatus.idle) { state = state.copyWith(agentStatus: AgentStatus.idle, clearTaskId: true); } } } void _appendOrUpdateAssistantMessage(String content, MessageType type) { // Buffer tokens and flush on a timer to reduce widget rebuilds if (type == MessageType.thinking) { _thinkingBuffer.write(content); } else { _textBuffer.write(content); } _flushTimer ??= Timer(_flushInterval, _flushBuffers); } /// Flush buffered tokens to state immediately (synchronous). void _flushBuffersSync() { _flushTimer?.cancel(); _flushTimer = null; _flushBuffers(); } /// Flush any buffered text/thinking tokens into state.messages. void _flushBuffers() { _flushTimer = null; if (_textBuffer.isEmpty && _thinkingBuffer.isEmpty) return; var messages = [...state.messages]; if (_textBuffer.isNotEmpty) { messages = _applyBuffer(messages, _textBuffer.toString(), MessageType.text); _textBuffer.clear(); } if (_thinkingBuffer.isNotEmpty) { messages = _applyBuffer(messages, _thinkingBuffer.toString(), MessageType.thinking); _thinkingBuffer.clear(); } state = state.copyWith(messages: messages); } List _applyBuffer( List messages, String content, MessageType type, ) { if (messages.isNotEmpty) { final last = messages.last; if (last.role == MessageRole.assistant && last.type == type) { return [ ...messages.sublist(0, messages.length - 1), last.copyWith(content: last.content + content), ]; } } return [ ...messages, ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.assistant, content: content, timestamp: DateTime.now(), type: type, isStreaming: true, ), ]; } /// Starts a new chat — clears messages and resets sessionId. void startNewChat() { _eventSubscription?.cancel(); _flushTimer?.cancel(); _flushTimer = null; _textBuffer.clear(); _thinkingBuffer.clear(); state = const ChatState(); } /// Switches to an existing session — loads its messages from the backend. Future switchSession(String sessionId) async { _eventSubscription?.cancel(); _flushTimer?.cancel(); _flushTimer = null; _textBuffer.clear(); _thinkingBuffer.clear(); state = ChatState(sessionId: sessionId, agentStatus: AgentStatus.idle); try { final datasource = _ref.read(chatRemoteDatasourceProvider); final messages = await datasource.getSessionMessages(sessionId); state = state.copyWith( messages: messages.map((m) => m.toEntity()).toList(), sessionId: sessionId, ); } catch (e) { state = state.copyWith(error: '加载对话历史失败: $e'); } } /// Deletes a session from the backend. Future deleteSession(String sessionId) async { try { final datasource = _ref.read(chatRemoteDatasourceProvider); await datasource.deleteSession(sessionId); // If the deleted session is the current one, reset state if (state.sessionId == sessionId) { state = const ChatState(); } } catch (e) { state = state.copyWith(error: '删除对话失败: $e'); } } Future approveCommand(String taskId) async { try { final repo = _ref.read(chatRepositoryProvider); await repo.approveCommand(taskId); state = state.copyWith(agentStatus: AgentStatus.executing); } catch (e) { state = state.copyWith(error: '审批失败: $e'); } } Future rejectCommand(String taskId, {String? reason}) async { try { final repo = _ref.read(chatRepositoryProvider); await repo.rejectCommand(taskId, reason: reason); state = state.copyWith(agentStatus: AgentStatus.idle); } catch (e) { state = state.copyWith(error: '拒绝失败: $e'); } } Future confirmStandingOrder(Map draft) async { if (state.sessionId == null) return; try { final repo = _ref.read(chatRepositoryProvider); await repo.confirmStandingOrder(state.sessionId!, draft); state = state.copyWith(agentStatus: AgentStatus.idle); } catch (e) { state = state.copyWith(error: '确认常驻指令失败: $e'); } } /// Sends a recorded audio file as a voice message. /// /// Flow: /// 1. Shows a temporary "识别中..." user message bubble. /// 2. Uploads audio to the backend voice-message endpoint. /// Backend runs Whisper STT, optionally interrupts any running task, /// and starts a new agent task with the transcript. /// 3. Replaces the placeholder with the real transcript. /// 4. Subscribes to the WS stream for the new task. Future sendVoiceMessage(String audioPath) async { final tempId = '${DateTime.now().microsecondsSinceEpoch}_voice'; final tempMsg = ChatMessage( id: tempId, role: MessageRole.user, content: '🎤 识别中...', timestamp: DateTime.now(), type: MessageType.text, ); // Cancel any ongoing subscription (voice message acts as interrupt) _eventSubscription?.cancel(); _eventSubscription = null; _flushBuffersSync(); state = state.copyWith( messages: [...state.messages, tempMsg], agentStatus: AgentStatus.thinking, error: null, ); try { final datasource = _ref.read(chatRemoteDatasourceProvider); final sessionId = state.sessionId ?? 'new'; final result = await datasource.sendVoiceMessage( sessionId: sessionId, audioPath: audioPath, ); final returnedSessionId = result['sessionId'] as String? ?? sessionId; final taskId = result['taskId'] as String?; final transcript = result['transcript'] as String? ?? '🎤'; // Replace placeholder with real transcript final updatedMessages = state.messages .map((m) => m.id == tempId ? m.copyWith(content: transcript) : m) .toList(); state = state.copyWith( messages: updatedMessages, sessionId: returnedSessionId, taskId: taskId, ); // Subscribe to the WS stream for the running task final repo = _ref.read(chatRepositoryProvider); final stream = repo.subscribeExistingTask( sessionId: returnedSessionId, taskId: taskId ?? '', ); _eventSubscription = stream.listen( (event) => _handleStreamEvent(event), onError: (error) { state = state.copyWith( agentStatus: AgentStatus.error, error: '语音消息处理失败: $error', ); }, onDone: () { if (state.agentStatus != AgentStatus.error) { state = state.copyWith(agentStatus: AgentStatus.idle); } }, ); } catch (e) { // Remove placeholder on failure state = state.copyWith( messages: state.messages.where((m) => m.id != tempId).toList(), agentStatus: AgentStatus.error, error: '语音识别失败: $e', ); } } Future transcribeAudio(String audioPath) async { final datasource = _ref.read(chatRemoteDatasourceProvider); return datasource.transcribeAudio(audioPath: audioPath); } Future cancelCurrentTask() async { final taskId = state.taskId; if (taskId == null && state.sessionId == null) return; // Flush any buffered tokens before cancelling _flushBuffersSync(); // 1. IMMEDIATELY update UI — optimistic cancel _eventSubscription?.cancel(); _eventSubscription = null; // Mark any executing tools as completed final updatedMessages = state.messages.map((m) { if (m.type == MessageType.toolUse && m.toolExecution?.status == ToolStatus.executing) { return m.copyWith( toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed), ); } return m; }).toList(); // Add interrupted marker to timeline final interruptedMsg = ChatMessage( id: '${DateTime.now().microsecondsSinceEpoch}_interrupted', role: MessageRole.assistant, content: '已中断', timestamp: DateTime.now(), type: MessageType.interrupted, ); state = state.copyWith( agentStatus: AgentStatus.idle, messages: [...updatedMessages, interruptedMsg], clearTaskId: true, error: null, ); // 2. Fire-and-forget backend cancel if (taskId != null) { try { final useCase = _ref.read(cancelTaskUseCaseProvider); await useCase.execute(taskId); } catch (_) { // Ignore — UI is already updated } } } /// Injects a message while the agent is actively working. /// Cancels the current task and starts a new one with the injected message. Future injectMessage(String message) async { if (message.trim().isEmpty) return; final taskId = state.taskId; if (taskId == null) { // Fallback: if no active task, treat as normal send return sendMessage(message); } // 1. Cancel current event subscription and flush buffered tokens _eventSubscription?.cancel(); _eventSubscription = null; _flushBuffersSync(); // 2. Mark executing tools as completed final updatedMessages = state.messages.map((m) { if (m.type == MessageType.toolUse && m.toolExecution?.status == ToolStatus.executing) { return m.copyWith( toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed), ); } return m; }).toList(); // 3. Add interrupted marker + user message to timeline final interruptedMsg = ChatMessage( id: '${DateTime.now().microsecondsSinceEpoch}_interrupted', role: MessageRole.assistant, content: '已中断 (用户追加指令)', timestamp: DateTime.now(), type: MessageType.interrupted, ); final userMsg = ChatMessage( id: DateTime.now().microsecondsSinceEpoch.toString(), role: MessageRole.user, content: message, timestamp: DateTime.now(), type: MessageType.text, ); state = state.copyWith( messages: [...updatedMessages, interruptedMsg, userMsg], agentStatus: AgentStatus.thinking, clearTaskId: true, error: null, ); // 4. Call the inject API and listen to new event stream try { final repo = _ref.read(chatRepositoryProvider); final stream = repo.injectMessage(taskId: taskId, message: message); _eventSubscription = stream.listen( (event) => _handleStreamEvent(event), onError: (error) { state = state.copyWith( agentStatus: AgentStatus.error, error: ErrorHandler.friendlyMessage(error), ); }, onDone: () { if (state.agentStatus != AgentStatus.error) { state = state.copyWith(agentStatus: AgentStatus.idle); } }, ); } catch (e) { state = state.copyWith( agentStatus: AgentStatus.error, error: ErrorHandler.friendlyMessage(e), ); } } Future loadSessionHistory(String sessionId) async { try { final useCase = _ref.read(getSessionHistoryUseCaseProvider); final messages = await useCase.execute(sessionId); state = state.copyWith( messages: messages, sessionId: sessionId, ); } catch (e) { state = state.copyWith(error: '加载历史记录失败: $e'); } } void clearChat() { _eventSubscription?.cancel(); _flushTimer?.cancel(); _flushTimer = null; _textBuffer.clear(); _thinkingBuffer.clear(); state = const ChatState(); } @override void dispose() { _eventSubscription?.cancel(); _flushTimer?.cancel(); _ref.read(webSocketClientProvider).disconnect(); super.dispose(); } } // --------------------------------------------------------------------------- // Main providers // --------------------------------------------------------------------------- final chatProvider = StateNotifierProvider((ref) { return ChatNotifier(ref); }); final agentStatusProvider = Provider((ref) { return ref.watch(chatProvider).agentStatus; }); final currentSessionIdProvider = Provider((ref) { return ref.watch(chatProvider).sessionId; }); final chatMessagesListProvider = Provider>((ref) { return ref.watch(chatProvider).messages; }); // --------------------------------------------------------------------------- // No-op local datasource fallback // --------------------------------------------------------------------------- class _NoOpLocalDatasource implements ChatLocalDatasource { @override Future cacheMessages( String sessionId, List messages, ) async {} @override List getCachedMessages(String sessionId) => []; @override Future clearCache(String sessionId) async {} @override Future clearAllCaches() async {} }