it0/it0_app/lib/features/chat/presentation/providers/chat_providers.dart

777 lines
25 KiB
Dart

import 'dart:async';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:shared_preferences/shared_preferences.dart';
import '../../../../core/network/dio_client.dart';
import '../../../../core/network/websocket_client.dart';
import '../../../auth/data/providers/auth_provider.dart';
import '../../data/datasources/chat_local_datasource.dart';
import '../../data/datasources/chat_remote_datasource.dart';
import '../../data/models/chat_message_model.dart';
import '../../data/repositories/chat_repository_impl.dart';
import '../../../../core/errors/error_handler.dart';
import '../../domain/entities/chat_message.dart';
import '../../domain/entities/stream_event.dart';
import '../../domain/repositories/chat_repository.dart';
import '../../domain/usecases/cancel_task.dart';
import '../../domain/usecases/get_session_history.dart';
import '../../domain/usecases/send_message.dart';
import '../widgets/conversation_drawer.dart';
// ---------------------------------------------------------------------------
// Dependency providers
// ---------------------------------------------------------------------------
final sharedPreferencesProvider = FutureProvider<SharedPreferences>((ref) {
return SharedPreferences.getInstance();
});
final chatRemoteDatasourceProvider = Provider<ChatRemoteDatasource>((ref) {
final dio = ref.watch(dioClientProvider);
return ChatRemoteDatasource(dio);
});
final chatLocalDatasourceProvider = Provider<ChatLocalDatasource?>((ref) {
final prefsAsync = ref.watch(sharedPreferencesProvider);
return prefsAsync.whenOrNull(
data: (prefs) => ChatLocalDatasource(prefs),
);
});
final chatRepositoryProvider = Provider<ChatRepository>((ref) {
final remote = ref.watch(chatRemoteDatasourceProvider);
final local = ref.watch(chatLocalDatasourceProvider);
final ws = ref.watch(webSocketClientProvider);
final storage = ref.watch(secureStorageProvider);
return ChatRepositoryImpl(
remoteDatasource: remote,
localDatasource: local ?? _NoOpLocalDatasource(),
webSocketClient: ws,
getAccessToken: () => storage.read(key: 'access_token'),
);
});
// ---------------------------------------------------------------------------
// Use case providers
// ---------------------------------------------------------------------------
final sendMessageUseCaseProvider = Provider<SendMessage>((ref) {
return SendMessage(ref.watch(chatRepositoryProvider));
});
final getSessionHistoryUseCaseProvider = Provider<GetSessionHistory>((ref) {
return GetSessionHistory(ref.watch(chatRepositoryProvider));
});
final cancelTaskUseCaseProvider = Provider<CancelTask>((ref) {
return CancelTask(ref.watch(chatRepositoryProvider));
});
// ---------------------------------------------------------------------------
// Session list provider (for conversation drawer)
// ---------------------------------------------------------------------------
final sessionListProvider = FutureProvider<List<SessionSummary>>((ref) async {
final datasource = ref.watch(chatRemoteDatasourceProvider);
return datasource.listSessions();
});
// ---------------------------------------------------------------------------
// Chat state
// ---------------------------------------------------------------------------
enum AgentStatus { idle, thinking, executing, awaitingApproval, error }
class ChatState {
final List<ChatMessage> messages;
final AgentStatus agentStatus;
final String? sessionId;
final String? taskId;
final String? error;
const ChatState({
this.messages = const [],
this.agentStatus = AgentStatus.idle,
this.sessionId,
this.taskId,
this.error,
});
bool get isStreaming => agentStatus != AgentStatus.idle && agentStatus != AgentStatus.error;
ChatState copyWith({
List<ChatMessage>? messages,
AgentStatus? agentStatus,
String? sessionId,
String? taskId,
String? error,
bool clearTaskId = false,
}) {
return ChatState(
messages: messages ?? this.messages,
agentStatus: agentStatus ?? this.agentStatus,
sessionId: sessionId ?? this.sessionId,
taskId: clearTaskId ? null : (taskId ?? this.taskId),
error: error,
);
}
}
// ---------------------------------------------------------------------------
// Chat notifier
// ---------------------------------------------------------------------------
class ChatNotifier extends StateNotifier<ChatState> {
final Ref _ref;
StreamSubscription<StreamEvent>? _eventSubscription;
// Token throttle: buffer text tokens and flush every 80ms to reduce rebuilds
final StringBuffer _textBuffer = StringBuffer();
final StringBuffer _thinkingBuffer = StringBuffer();
Timer? _flushTimer;
static const _flushInterval = Duration(milliseconds: 80);
ChatNotifier(this._ref) : super(const ChatState());
/// Sends a user message to the agent and processes the streamed response.
Future<void> sendMessage(String prompt, {List<ChatAttachment>? attachments}) async {
if (prompt.trim().isEmpty && (attachments == null || attachments.isEmpty)) return;
// Add the user message locally
final userMsg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.user,
content: prompt.isEmpty ? '[图片]' : prompt,
timestamp: DateTime.now(),
type: MessageType.text,
attachments: attachments,
);
state = state.copyWith(
messages: [...state.messages, userMsg],
agentStatus: AgentStatus.thinking,
error: null,
);
try {
final useCase = _ref.read(sendMessageUseCaseProvider);
// Pass current sessionId to reuse the session for multi-turn context
final sessionId = state.sessionId;
final stream = useCase.execute(
sessionId: sessionId ?? 'new',
message: prompt.isEmpty ? '[图片]' : prompt,
attachments: attachments?.map((a) => a.toJson()).toList(),
);
_eventSubscription?.cancel();
_eventSubscription = stream.listen(
(event) => _handleStreamEvent(event),
onError: (error) {
state = state.copyWith(
agentStatus: AgentStatus.error,
error: ErrorHandler.friendlyMessage(error),
);
},
onDone: () {
if (state.agentStatus != AgentStatus.error) {
state = state.copyWith(agentStatus: AgentStatus.idle);
}
},
);
} catch (e) {
state = state.copyWith(
agentStatus: AgentStatus.error,
error: ErrorHandler.friendlyMessage(e),
);
}
}
void _handleStreamEvent(StreamEvent event) {
switch (event) {
case ThinkingEvent(:final content):
_appendOrUpdateAssistantMessage(content, MessageType.thinking);
if (state.agentStatus != AgentStatus.thinking) {
state = state.copyWith(agentStatus: AgentStatus.thinking);
}
case TextEvent(:final content):
_appendOrUpdateAssistantMessage(content, MessageType.text);
if (state.agentStatus != AgentStatus.executing) {
state = state.copyWith(agentStatus: AgentStatus.executing);
}
case ToolUseEvent(:final toolName, :final input):
final msg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.assistant,
content: '执行: $toolName',
timestamp: DateTime.now(),
type: MessageType.toolUse,
toolExecution: ToolExecution(
toolName: toolName,
input: input.toString(),
riskLevel: 0,
status: ToolStatus.executing,
),
);
state = state.copyWith(
messages: [...state.messages, msg],
agentStatus: AgentStatus.executing,
);
case ToolResultEvent(:final toolName, :final output, :final isError):
final updatedMessages = [...state.messages];
for (int i = updatedMessages.length - 1; i >= 0; i--) {
final m = updatedMessages[i];
if (m.type == MessageType.toolUse &&
m.toolExecution?.status == ToolStatus.executing) {
updatedMessages[i] = m.copyWith(
toolExecution: m.toolExecution!.copyWith(
status: isError ? ToolStatus.error : ToolStatus.completed,
),
);
break;
}
}
final msg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.assistant,
content: output,
timestamp: DateTime.now(),
type: MessageType.toolResult,
toolExecution: ToolExecution(
toolName: toolName,
input: '',
output: output,
riskLevel: 0,
status: isError ? ToolStatus.error : ToolStatus.completed,
),
);
state = state.copyWith(messages: [...updatedMessages, msg]);
case ApprovalRequiredEvent(:final taskId, :final command, :final riskLevel):
final msg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.assistant,
content: '需要审批: $command',
timestamp: DateTime.now(),
type: MessageType.approval,
approvalRequest: ApprovalRequest(
taskId: taskId,
command: command,
riskLevel: riskLevel,
expiresAt: DateTime.now().add(const Duration(minutes: 5)),
),
);
state = state.copyWith(
messages: [...state.messages, msg],
agentStatus: AgentStatus.awaitingApproval,
);
case CompletedEvent(:final summary):
_flushBuffersSync();
final hasAssistantText = state.messages.any(
(m) => m.role == MessageRole.assistant && m.type == MessageType.text && m.content.isNotEmpty,
);
if (summary.isNotEmpty && !hasAssistantText) {
// Write summary directly to state (not via buffer) since we're about
// to set idle status — buffering would cause a brief missing-text gap.
state = state.copyWith(
messages: _applyBuffer(state.messages, summary, MessageType.text),
);
}
// Mark any remaining executing tools as completed
final finalMessages = state.messages.map((m) {
if (m.type == MessageType.toolUse &&
m.toolExecution?.status == ToolStatus.executing) {
return m.copyWith(
toolExecution: m.toolExecution!.copyWith(
status: ToolStatus.completed,
),
);
}
return m;
}).toList();
state = state.copyWith(
messages: finalMessages,
agentStatus: AgentStatus.idle,
clearTaskId: true,
);
case ErrorEvent(:final message):
_flushBuffersSync();
state = state.copyWith(
agentStatus: AgentStatus.error,
error: message,
clearTaskId: true,
);
case StandingOrderDraftEvent(:final draft):
final msg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.assistant,
content: '常驻指令草案已生成',
timestamp: DateTime.now(),
type: MessageType.standingOrderDraft,
metadata: draft,
);
state = state.copyWith(
messages: [...state.messages, msg],
agentStatus: AgentStatus.awaitingApproval,
);
case StandingOrderConfirmedEvent(:final orderId, :final orderName):
final msg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.assistant,
content: '常驻指令「$orderName」已确认 (ID: $orderId)',
timestamp: DateTime.now(),
type: MessageType.text,
);
state = state.copyWith(
messages: [...state.messages, msg],
agentStatus: AgentStatus.idle,
);
case SessionInfoEvent(:final sessionId):
// Capture the real sessionId from the backend for multi-turn reuse
state = state.copyWith(sessionId: sessionId);
case TaskInfoEvent(:final taskId):
// Capture the taskId for cancel/inject tracking
state = state.copyWith(taskId: taskId);
case CancelledEvent():
// Backend confirmed cancellation — if UI hasn't already gone idle, do it now
if (state.agentStatus != AgentStatus.idle) {
state = state.copyWith(agentStatus: AgentStatus.idle, clearTaskId: true);
}
}
}
void _appendOrUpdateAssistantMessage(String content, MessageType type) {
// Buffer tokens and flush on a timer to reduce widget rebuilds
if (type == MessageType.thinking) {
_thinkingBuffer.write(content);
} else {
_textBuffer.write(content);
}
_flushTimer ??= Timer(_flushInterval, _flushBuffers);
}
/// Flush buffered tokens to state immediately (synchronous).
void _flushBuffersSync() {
_flushTimer?.cancel();
_flushTimer = null;
_flushBuffers();
}
/// Flush any buffered text/thinking tokens into state.messages.
void _flushBuffers() {
_flushTimer = null;
if (_textBuffer.isEmpty && _thinkingBuffer.isEmpty) return;
var messages = [...state.messages];
if (_textBuffer.isNotEmpty) {
messages = _applyBuffer(messages, _textBuffer.toString(), MessageType.text);
_textBuffer.clear();
}
if (_thinkingBuffer.isNotEmpty) {
messages = _applyBuffer(messages, _thinkingBuffer.toString(), MessageType.thinking);
_thinkingBuffer.clear();
}
state = state.copyWith(messages: messages);
}
List<ChatMessage> _applyBuffer(
List<ChatMessage> messages,
String content,
MessageType type,
) {
if (messages.isNotEmpty) {
final last = messages.last;
if (last.role == MessageRole.assistant && last.type == type) {
return [
...messages.sublist(0, messages.length - 1),
last.copyWith(content: last.content + content),
];
}
}
return [
...messages,
ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.assistant,
content: content,
timestamp: DateTime.now(),
type: type,
isStreaming: true,
),
];
}
/// Starts a new chat — clears messages and resets sessionId.
void startNewChat() {
_eventSubscription?.cancel();
_flushTimer?.cancel();
_flushTimer = null;
_textBuffer.clear();
_thinkingBuffer.clear();
state = const ChatState();
}
/// Switches to an existing session — loads its messages from the backend.
Future<void> switchSession(String sessionId) async {
_eventSubscription?.cancel();
_flushTimer?.cancel();
_flushTimer = null;
_textBuffer.clear();
_thinkingBuffer.clear();
state = ChatState(sessionId: sessionId, agentStatus: AgentStatus.idle);
try {
final datasource = _ref.read(chatRemoteDatasourceProvider);
final messages = await datasource.getSessionMessages(sessionId);
state = state.copyWith(
messages: messages.map((m) => m.toEntity()).toList(),
sessionId: sessionId,
);
} catch (e) {
state = state.copyWith(error: '加载对话历史失败: $e');
}
}
/// Deletes a session from the backend.
Future<void> deleteSession(String sessionId) async {
try {
final datasource = _ref.read(chatRemoteDatasourceProvider);
await datasource.deleteSession(sessionId);
// If the deleted session is the current one, reset state
if (state.sessionId == sessionId) {
state = const ChatState();
}
} catch (e) {
state = state.copyWith(error: '删除对话失败: $e');
}
}
Future<void> approveCommand(String taskId) async {
try {
final repo = _ref.read(chatRepositoryProvider);
await repo.approveCommand(taskId);
state = state.copyWith(agentStatus: AgentStatus.executing);
} catch (e) {
state = state.copyWith(error: '审批失败: $e');
}
}
Future<void> rejectCommand(String taskId, {String? reason}) async {
try {
final repo = _ref.read(chatRepositoryProvider);
await repo.rejectCommand(taskId, reason: reason);
state = state.copyWith(agentStatus: AgentStatus.idle);
} catch (e) {
state = state.copyWith(error: '拒绝失败: $e');
}
}
Future<void> confirmStandingOrder(Map<String, dynamic> draft) async {
if (state.sessionId == null) return;
try {
final repo = _ref.read(chatRepositoryProvider);
await repo.confirmStandingOrder(state.sessionId!, draft);
state = state.copyWith(agentStatus: AgentStatus.idle);
} catch (e) {
state = state.copyWith(error: '确认常驻指令失败: $e');
}
}
/// Sends a recorded audio file as a voice message.
///
/// Flow:
/// 1. Shows a temporary "识别中..." user message bubble.
/// 2. Uploads audio to the backend voice-message endpoint.
/// Backend runs Whisper STT, optionally interrupts any running task,
/// and starts a new agent task with the transcript.
/// 3. Replaces the placeholder with the real transcript.
/// 4. Subscribes to the WS stream for the new task.
Future<void> sendVoiceMessage(String audioPath) async {
final tempId = '${DateTime.now().microsecondsSinceEpoch}_voice';
final tempMsg = ChatMessage(
id: tempId,
role: MessageRole.user,
content: '🎤 识别中...',
timestamp: DateTime.now(),
type: MessageType.text,
);
// Cancel any ongoing subscription (voice message acts as interrupt)
_eventSubscription?.cancel();
_eventSubscription = null;
_flushBuffersSync();
state = state.copyWith(
messages: [...state.messages, tempMsg],
agentStatus: AgentStatus.thinking,
error: null,
);
try {
final datasource = _ref.read(chatRemoteDatasourceProvider);
final sessionId = state.sessionId ?? 'new';
final result = await datasource.sendVoiceMessage(
sessionId: sessionId,
audioPath: audioPath,
);
final returnedSessionId = result['sessionId'] as String? ?? sessionId;
final taskId = result['taskId'] as String?;
final transcript = result['transcript'] as String? ?? '🎤';
// Replace placeholder with real transcript
final updatedMessages = state.messages
.map((m) => m.id == tempId ? m.copyWith(content: transcript) : m)
.toList();
state = state.copyWith(
messages: updatedMessages,
sessionId: returnedSessionId,
taskId: taskId,
);
// Subscribe to the WS stream for the running task
final repo = _ref.read(chatRepositoryProvider);
final stream = repo.subscribeExistingTask(
sessionId: returnedSessionId,
taskId: taskId ?? '',
);
_eventSubscription = stream.listen(
(event) => _handleStreamEvent(event),
onError: (error) {
state = state.copyWith(
agentStatus: AgentStatus.error,
error: '语音消息处理失败: $error',
);
},
onDone: () {
if (state.agentStatus != AgentStatus.error) {
state = state.copyWith(agentStatus: AgentStatus.idle);
}
},
);
} catch (e) {
// Remove placeholder on failure
state = state.copyWith(
messages: state.messages.where((m) => m.id != tempId).toList(),
agentStatus: AgentStatus.error,
error: '语音识别失败: $e',
);
}
}
Future<String> transcribeAudio(String audioPath, {String language = 'zh'}) async {
final datasource = _ref.read(chatRemoteDatasourceProvider);
return datasource.transcribeAudio(audioPath: audioPath, language: language);
}
Future<void> cancelCurrentTask() async {
final taskId = state.taskId;
if (taskId == null && state.sessionId == null) return;
// Flush any buffered tokens before cancelling
_flushBuffersSync();
// 1. IMMEDIATELY update UI — optimistic cancel
_eventSubscription?.cancel();
_eventSubscription = null;
// Mark any executing tools as completed
final updatedMessages = state.messages.map((m) {
if (m.type == MessageType.toolUse &&
m.toolExecution?.status == ToolStatus.executing) {
return m.copyWith(
toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed),
);
}
return m;
}).toList();
// Add interrupted marker to timeline
final interruptedMsg = ChatMessage(
id: '${DateTime.now().microsecondsSinceEpoch}_interrupted',
role: MessageRole.assistant,
content: '已中断',
timestamp: DateTime.now(),
type: MessageType.interrupted,
);
state = state.copyWith(
agentStatus: AgentStatus.idle,
messages: [...updatedMessages, interruptedMsg],
clearTaskId: true,
error: null,
);
// 2. Fire-and-forget backend cancel
if (taskId != null) {
try {
final useCase = _ref.read(cancelTaskUseCaseProvider);
await useCase.execute(taskId);
} catch (_) {
// Ignore — UI is already updated
}
}
}
/// Injects a message while the agent is actively working.
/// Cancels the current task and starts a new one with the injected message.
Future<void> injectMessage(String message) async {
if (message.trim().isEmpty) return;
final taskId = state.taskId;
if (taskId == null) {
// Fallback: if no active task, treat as normal send
return sendMessage(message);
}
// 1. Cancel current event subscription and flush buffered tokens
_eventSubscription?.cancel();
_eventSubscription = null;
_flushBuffersSync();
// 2. Mark executing tools as completed
final updatedMessages = state.messages.map((m) {
if (m.type == MessageType.toolUse &&
m.toolExecution?.status == ToolStatus.executing) {
return m.copyWith(
toolExecution: m.toolExecution!.copyWith(status: ToolStatus.completed),
);
}
return m;
}).toList();
// 3. Add interrupted marker + user message to timeline
final interruptedMsg = ChatMessage(
id: '${DateTime.now().microsecondsSinceEpoch}_interrupted',
role: MessageRole.assistant,
content: '已中断 (用户追加指令)',
timestamp: DateTime.now(),
type: MessageType.interrupted,
);
final userMsg = ChatMessage(
id: DateTime.now().microsecondsSinceEpoch.toString(),
role: MessageRole.user,
content: message,
timestamp: DateTime.now(),
type: MessageType.text,
);
state = state.copyWith(
messages: [...updatedMessages, interruptedMsg, userMsg],
agentStatus: AgentStatus.thinking,
clearTaskId: true,
error: null,
);
// 4. Call the inject API and listen to new event stream
try {
final repo = _ref.read(chatRepositoryProvider);
final stream = repo.injectMessage(taskId: taskId, message: message);
_eventSubscription = stream.listen(
(event) => _handleStreamEvent(event),
onError: (error) {
state = state.copyWith(
agentStatus: AgentStatus.error,
error: ErrorHandler.friendlyMessage(error),
);
},
onDone: () {
if (state.agentStatus != AgentStatus.error) {
state = state.copyWith(agentStatus: AgentStatus.idle);
}
},
);
} catch (e) {
state = state.copyWith(
agentStatus: AgentStatus.error,
error: ErrorHandler.friendlyMessage(e),
);
}
}
Future<void> loadSessionHistory(String sessionId) async {
try {
final useCase = _ref.read(getSessionHistoryUseCaseProvider);
final messages = await useCase.execute(sessionId);
state = state.copyWith(
messages: messages,
sessionId: sessionId,
);
} catch (e) {
state = state.copyWith(error: '加载历史记录失败: $e');
}
}
void clearChat() {
_eventSubscription?.cancel();
_flushTimer?.cancel();
_flushTimer = null;
_textBuffer.clear();
_thinkingBuffer.clear();
state = const ChatState();
}
@override
void dispose() {
_eventSubscription?.cancel();
_flushTimer?.cancel();
_ref.read(webSocketClientProvider).disconnect();
super.dispose();
}
}
// ---------------------------------------------------------------------------
// Main providers
// ---------------------------------------------------------------------------
final chatProvider = StateNotifierProvider<ChatNotifier, ChatState>((ref) {
return ChatNotifier(ref);
});
final agentStatusProvider = Provider<AgentStatus>((ref) {
return ref.watch(chatProvider).agentStatus;
});
final currentSessionIdProvider = Provider<String?>((ref) {
return ref.watch(chatProvider).sessionId;
});
final chatMessagesListProvider = Provider<List<ChatMessage>>((ref) {
return ref.watch(chatProvider).messages;
});
// ---------------------------------------------------------------------------
// No-op local datasource fallback
// ---------------------------------------------------------------------------
class _NoOpLocalDatasource implements ChatLocalDatasource {
@override
Future<void> cacheMessages(
String sessionId,
List<ChatMessageModel> messages,
) async {}
@override
List<ChatMessageModel> getCachedMessages(String sessionId) => [];
@override
Future<void> clearCache(String sessionId) async {}
@override
Future<void> clearAllCaches() async {}
}