From bfbd062eb3fb056f6b4930e129cf4595212eb26d Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 27 Jan 2026 01:34:16 -0800 Subject: [PATCH] =?UTF-8?q?refactor(android):=20=E5=9B=9E=E5=BD=92?= =?UTF-8?q?=E7=AE=80=E5=8D=95=E5=8F=AF=E9=9D=A0=E7=9A=84=E6=B5=81=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题: - StreamManager 抽象层引入新问题 - RegisterParty 失败但代码继续执行 - 流程变复杂,日志缺失 修复: 1. 删除 StreamManager.kt,恢复简单的 jobManager.launch 模式 2. 在原有逻辑基础上添加 Flow.retryWhen 实现自动重连 3. 保留 gRPC Keep-Alive 和网络监听配置(官方推荐) 4. 分离消息收发为两个独立 Job(JOB_MESSAGE_SENDING, JOB_MESSAGE_COLLECTION) 改进: - 更少的抽象层,更清晰的逻辑 - 保持原有工作的事件处理代码不变 - 自动重连基于 Kotlin Flow.retryWhen(线性递增退避:每次重试延迟加 1 秒,上限 30 秒) 测试: - ✅ 编译成功 - ⏳ 待测试:RegisterParty, 事件订阅, 2-of-3 创建, 网络重连 Co-Authored-By: Claude Sonnet 4.5 --- .claude/settings.local.json | 4 +- .../REFACTORING_SUMMARY.md | 200 +++++++++++++ .../WORKING_CODE_ANALYSIS.md | 269 +++++++++++++++++ .../tssparty/data/remote/StreamManager.kt | 282 ------------------ .../tssparty/data/repository/TssRepository.kt | 107 +++---- 5 files changed, 514 insertions(+), 348 deletions(-) create mode 100644 backend/mpc-system/services/service-party-android/REFACTORING_SUMMARY.md create mode 100644 backend/mpc-system/services/service-party-android/WORKING_CODE_ANALYSIS.md delete mode 100644 backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 1bcb6627..8b95c52a 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -813,7 +813,9 @@ "Bash(.gradlew.bat compileDebugKotlin:*)", "WebFetch(domain:github.com)", "WebFetch(domain:oneuptime.com)", - "Bash(gradlew.bat assembleDebug:*)" + "Bash(gradlew.bat assembleDebug:*)", + "Bash(cmd /c \"gradlew.bat assembleDebug --no-daemon\")", + "Bash(./build-install-debug.bat)" ], "deny": [], "ask": [] diff --git 
a/backend/mpc-system/services/service-party-android/REFACTORING_SUMMARY.md b/backend/mpc-system/services/service-party-android/REFACTORING_SUMMARY.md new file mode 100644 index 00000000..f4bb2908 --- /dev/null +++ b/backend/mpc-system/services/service-party-android/REFACTORING_SUMMARY.md @@ -0,0 +1,200 @@ +# 重构总结 - 回归简单可靠的架构 + +## 修复的问题 + +**用户反馈**: "让处理异常,你个狗日的把逻辑,流程都改错了。" + +**根本原因**: 在添加异常处理和 gRPC 可靠性改进时,引入了 StreamManager 抽象层,导致: +1. RegisterParty 失败但代码继续执行 +2. StreamManager 日志完全缺失 +3. 流程变复杂,引入新问题 + +## 本次重构的原则 + +**保留的好东西**(来自 gRPC 官方推荐)✅: +1. gRPC Keep-Alive 配置(20s PING, 5s timeout, 永不 idle) +2. Android 网络状态监听(resetConnectBackoff) +3. registerParty 错误检查和重试 +4. markPartyReady 重试机制 + +**删除的坏东西**(过度设计)❌: +1. StreamManager.kt 整个文件 +2. 复杂的 init 块监听 reconnection 事件 +3. 回调式的流管理 + +**恢复的简单逻辑**(工作的代码)✅: +1. 直接用 jobManager.launch + grpcClient.subscribeSessionEvents().collect +2. 在 collect 外包一层 flow { }.retryWhen { } 实现自动重连 +3. 保持原有的事件处理逻辑不变 + +## 代码变更详情 + +### 1. TssRepository.kt + +#### 删除 StreamManager 相关代码: +```kotlin +// 删除了 +- import com.durian.tssparty.data.remote.StreamManager +- private val streamManager = StreamManager(grpcClient, repositoryScope) +- init { ... streamManager.restartAllStreams() } +``` + +#### 恢复简单的事件订阅: +```kotlin +// 之前(复杂) +streamManager.startEventStream( + partyId = effectivePartyId, + onEvent = { event -> /* callback */ } +) + +// 现在(简单) +jobManager.launch(JOB_SESSION_EVENT) { + flow { + grpcClient.subscribeSessionEvents(effectivePartyId).collect { emit(it) } + } + .retryWhen { cause, attempt -> + Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying...") + delay(min(attempt + 1, 30) * 1000L) + true // 永远重试 + } + .collect { event -> + // 直接处理事件(保持原有逻辑) + } +} +``` + +#### 恢复简单的消息路由: +```kotlin +// Part 1: 发送消息(重命名为 JOB_MESSAGE_SENDING) +jobManager.launch(JOB_MESSAGE_SENDING) { + tssNativeBridge.outgoingMessages.collect { message -> + grpcClient.routeMessage(...) 
+ } +} + +// Part 2: 接收消息(使用 JOB_MESSAGE_COLLECTION + retryWhen) +jobManager.launch(JOB_MESSAGE_COLLECTION) { + flow { + grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { emit(it) } + } + .retryWhen { cause, attempt -> + Log.w(TAG, "Message stream failed (attempt ${attempt + 1}), retrying...") + delay(min(attempt + 1, 30) * 1000L) + true // 永远重试 + } + .collect { message -> + // 处理消息 + } +} +``` + +#### 修复 ensureSessionEventSubscriptionActive: +```kotlin +// 之前 +val isActive = streamManager.isEventStreamActive() + +// 现在 +val isActive = jobManager.isActive(JOB_SESSION_EVENT) +``` + +### 2. 删除的文件 + +- `app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt` + +### 3. 保留的 gRPC 改进 + +#### GrpcClient.kt - Keep-Alive 配置(保留)✅: +```kotlin +val builder = ManagedChannelBuilder + .forAddress(host, port) + .usePlaintext() + .keepAliveTime(20, TimeUnit.SECONDS) // 每 20 秒 PING + .keepAliveTimeout(5, TimeUnit.SECONDS) // 5 秒超时 + .keepAliveWithoutCalls(true) // 没有活跃 RPC 也 PING + .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) // 永不超时 +``` + +#### GrpcClient.kt - 网络监听(保留)✅: +```kotlin +fun setupNetworkMonitoring(context: Context) { + val callback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) { + channel?.resetConnectBackoff() // 立即重连 + } + } +} +``` + +## 架构对比 + +### 旧设计(复杂但出错)❌: +``` +TssRepository + ├─ StreamManager (新增的抽象层) + │ ├─ startEventStream() + │ ├─ startMessageStream() + │ └─ restartAllStreams() + │ + ├─ init { listen reconnection → streamManager.restartAllStreams() } + └─ grpcClient +``` + +### 新设计(简单可靠)✅: +``` +TssRepository + ├─ JobManager (原有的任务管理) + │ ├─ JOB_SESSION_EVENT → flow { subscribeSessionEvents() }.retryWhen { } + │ ├─ JOB_MESSAGE_COLLECTION → flow { subscribeMessages() }.retryWhen { } + │ └─ JOB_MESSAGE_SENDING → outgoingMessages.collect { } + │ + └─ grpcClient (带 Keep-Alive + Network Monitoring) +``` + +## 为什么新设计更好 + +1. **更少的抽象层**: 直接用 jobManager.launch,不需要 StreamManager +2. 
**自动重连**: Flow.retryWhen 在流失败时自动重新发起 RPC +3. **保持原有逻辑**: 事件处理代码保持不变,只在外面包一层 retryWhen +4. **更好的日志**: 直接在 collect { } 里打日志,不会丢失 +5. **符合 Kotlin 风格**: Flow transformation 比 callback 更符合 Kotlin 惯用法 + +## 测试重点 + +1. ✅ 编译成功(已验证) +2. ⏳ RegisterParty 成功(需要测试) +3. ⏳ 事件订阅成功(看到 "Starting session event subscription" 日志) +4. ⏳ 创建 2-of-3 会话成功 +5. ⏳ 飞行模式测试自动重连 + +## 编译结果 + +``` +BUILD SUCCESSFUL in 1m 26s +46 actionable tasks: 17 executed, 29 up-to-date +``` + +只有一些参数未使用的警告,没有错误。 + +## 核心教训 + +**简单就是可靠**: +``` +工作的代码 + 官方推荐配置 + 最小改动 = 可靠的系统 + +不是: +工作的代码 → 完全重构 → 引入新抽象 → 新问题 +``` + +**gRPC 流的正确管理方式**: +1. 流断开时,用 Flow.retryWhen 自动重新发起 RPC(不是"恢复") +2. 不需要复杂的 StreamManager,Kotlin Flow 本身就是流管理器 +3. Keep-Alive 防止连接假死 +4. Network Monitoring 加速重连 + +## 下一步 + +准备测试!使用 build-install-debug.bat 安装到设备,验证: +1. RegisterParty 是否成功 +2. 事件流是否正常工作 +3. 2-of-3 创建是否成功 +4. 网络断开重连是否自动恢复 diff --git a/backend/mpc-system/services/service-party-android/WORKING_CODE_ANALYSIS.md b/backend/mpc-system/services/service-party-android/WORKING_CODE_ANALYSIS.md new file mode 100644 index 00000000..1e729c35 --- /dev/null +++ b/backend/mpc-system/services/service-party-android/WORKING_CODE_ANALYSIS.md @@ -0,0 +1,269 @@ +# 工作代码分析和正确修复方案 + +## 问题根源 + +**我犯的错误**: 在添加异常处理时,把整个流管理逻辑都改了,引入了 StreamManager 抽象层,导致流程变复杂并出现新问题。 + +**用户的要求**: +1. ✅ 添加异常处理(如 markPartyReady 重试) +2. ✅ 保留 gRPC 官方推荐(Keep-Alive, network monitoring) +3. ❌ **不要改变原有的工作流程** + +## 工作代码的逻辑(commit 41e7eed2) + +### 1. 连接初始化序列 + +```kotlin +// MainActivity.kt → MainViewModel.kt → TssRepository.kt + +1. GrpcClient.connectToServer(host, port) + ↓ +2. 创建 ManagedChannel + ↓ +3. TssRepository.registerParty() + ↓ +4. grpcClient.registerParty(partyId, "temporary", "1.0.0") // 没有错误检查 + ↓ +5. startSessionEventSubscription() // 立即订阅事件流 +``` + +### 2. 事件流订阅逻辑 + +```kotlin +// TssRepository.kt - 工作的代码 + +private fun startSessionEventSubscription(subscriptionPartyId: String? 
= null) { + val effectivePartyId = subscriptionPartyId ?: requirePartyId() + currentSessionEventPartyId = effectivePartyId + + // 关键:使用 JobManager 直接启动 + jobManager.launch(JOB_SESSION_EVENT) { + grpcClient.subscribeSessionEvents(effectivePartyId).collect { event -> + // 直接在这里处理事件 + when (event.eventType) { + "session_started" -> { /* ... */ } + "participant_joined" -> { /* ... */ } + // ... + } + } + } +} +``` + +**为什么这个简单的方式可以工作?** +- `jobManager.launch()` 自动取消同名旧 Job +- `grpcClient.subscribeSessionEvents()` 返回一个 Flow +- Flow 在网络断开时会自动关闭 +- 但没有自动重连机制(这是需要修复的) + +### 3. 消息路由逻辑 + +```kotlin +// TssRepository.kt - 消息路由 + +private fun startMessageRouting(sessionId: String, partyId: String, partyIndex: Int) { + // 1. 启动消息收集 Job + jobManager.launch(JOB_MESSAGE_COLLECTION) { + subscribeToTssMessages(sessionId, partyId).collect { message -> + tssNativeBridge.routeIncomingMessage(sessionId, message) + } + } + + // 2. 同时启动消息发送 Job(在同一个 JobManager 中) + jobManager.launch(JOB_MESSAGE_SENDING) { + tssNativeBridge.outgoingMessages.collect { message -> + grpcClient.sendMessage(sessionId, message) + } + } +} +``` + +## 我的错误修改 + +### 错误 1: 引入了 StreamManager 抽象层 + +```kotlin +// 新代码(错误) +streamManager.startEventStream( + partyId = effectivePartyId, + onEvent = { event -> /* callback */ } +) +``` + +**问题**: +- 增加了一层不必要的抽象 +- StreamManager 的实现可能有 bug +- 日志显示 StreamManager 根本没有启动 + +### 错误 2: 修改了连接重建后的流恢复逻辑 + +```kotlin +// 旧代码(工作的) +grpcConnectionEvents + .filter { it is GrpcConnectionEvent.Reconnected } + .collect { + onReconnectedCallback?.invoke() // 简单的 callback + } + +// 新代码(复杂但出错) +grpcConnectionEvents + .filter { it is GrpcConnectionEvent.Reconnected } + .collect { + streamManager.restartAllStreams() // StreamManager 可能有问题 + } +``` + +## 正确的修复方案 + +### 保留的部分(这些是好的)✅ + +1. 
**gRPC Keep-Alive 配置**(GrpcClient.kt line 143-150): +```kotlin +val builder = ManagedChannelBuilder + .forAddress(host, port) + .keepAliveTime(20, TimeUnit.SECONDS) + .keepAliveTimeout(5, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) +``` + +2. **Android 网络监听**(GrpcClient.kt line 151-183): +```kotlin +fun setupNetworkMonitoring(context: Context) { + val callback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) { + channel?.resetConnectBackoff() + } + } +} +``` + +3. **registerParty 错误检查**(TssRepository.kt line 489-494): +```kotlin +val registerResult = grpcClient.registerParty(partyId, "temporary", "1.0.0") +if (registerResult.isFailure) { + throw registerResult.exceptionOrNull() ?: Exception("Failed to register party") +} +``` + +4. **markPartyReady 重试机制**(TssRepository.kt line ~2140): +```kotlin +repeat(5) { attempt -> + if (markReadySuccess) return@repeat + val markReadyResult = grpcClient.markPartyReady(sessionId, partyId) + if (markReadyResult.isSuccess) { + markReadySuccess = true + return@repeat + } + delay((attempt + 1) * 500L) +} +``` + +### 需要回退的部分(这些破坏了原有逻辑)❌ + +1. **删除 StreamManager**: + - 删除 `StreamManager.kt` 文件 + - 删除 TssRepository.kt 中的 `streamManager` 实例 + +2. **恢复原有的事件订阅逻辑**: +```kotlin +// 恢复为这样(简单直接) +private fun startSessionEventSubscription(subscriptionPartyId: String? 
= null) { + val effectivePartyId = subscriptionPartyId ?: requirePartyId() + currentSessionEventPartyId = effectivePartyId + + jobManager.launch(JOB_SESSION_EVENT) { + // 添加 retryWhen 自动重连(新增的改进) + flow { + grpcClient.subscribeSessionEvents(effectivePartyId).collect { emit(it) } + } + .retryWhen { cause, attempt -> + Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying: ${cause.message}") + delay(min(attempt + 1, 30) * 1000L) + true // 永远重试 + } + .collect { event -> + // 直接处理事件(保持原有逻辑不变) + Log.d(TAG, "=== Session event received ===") + when (event.eventType) { + "session_started" -> { /* ... */ } + // ... + } + } + } +} +``` + +3. **删除 GrpcClient 中复杂的 reconnection callback**: + - 保持简单的连接状态 Flow + - 不需要复杂的 reSubscribeStreams() 逻辑 + +## 正确的架构 + +``` +简单而可靠的架构: + +GrpcClient (基础层) + ├─ Keep-Alive 配置 ✅ + ├─ Network Monitoring ✅ + ├─ subscribeSessionEvents() → Flow ✅ + └─ subscribeMessages() → Flow ✅ + +TssRepository (业务层) + ├─ JobManager 管理所有协程 ✅ + ├─ jobManager.launch(JOB_SESSION_EVENT) { + │ flow { grpcClient.subscribeSessionEvents().collect { emit(it) } } + │ .retryWhen { ... } ← 新增自动重连 + │ .collect { event -> /* 处理 */ } + │ } + └─ 同样的模式用于消息流 ✅ +``` + +## 实施步骤 + +### 步骤 1: 回退 TssRepository.kt 的事件订阅逻辑 + +```kotlin +// 删除 StreamManager 相关代码(line 217-242) +- private val streamManager = StreamManager(grpcClient, repositoryScope) +- init { repositoryScope.launch { grpcConnectionEvents... streamManager.restartAllStreams() } } + +// 恢复 startSessionEventSubscription 为原来的简单版本(line 511-612) +// 但在 collect 外包一层 flow { }.retryWhen { } +``` + +### 步骤 2: 删除 StreamManager.kt 文件 + +```bash +rm StreamManager.kt +``` + +### 步骤 3: 简化 GrpcClient.kt 的重连逻辑 + +```kotlin +// 删除复杂的 reSubscribeStreams() 方法 +// 保留简单的 GrpcConnectionEvent 发送 +``` + +### 步骤 4: 测试验证 + +1. 编译成功 +2. 启动时 RegisterParty 成功 +3. 事件订阅成功(看到 "Starting session event subscription" 日志) +4. 创建 2-of-3 会话成功 +5. 
飞行模式测试自动重连 + +## 总结 + +**核心教训**: +- ❌ 不要过度设计(StreamManager 是不必要的抽象) +- ✅ 在原有工作的代码基础上做最小改动 +- ✅ 保留 gRPC 官方推荐的配置(Keep-Alive, network monitoring) +- ✅ 只在必要的地方添加错误处理和重试逻辑 + +**修复原则**: +``` +旧代码 + 官方推荐 + 最小改动 = 可靠的解决方案 + +不是: 旧代码 → 完全重构 → StreamManager → 新问题 +``` diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt deleted file mode 100644 index b5e2386d..00000000 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt +++ /dev/null @@ -1,282 +0,0 @@ -package com.durian.tssparty.data.remote - -import android.util.Log -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlin.math.min - -/** - * Manages gRPC stream lifecycle with automatic reconnection - * - * Key principles: - * 1. Streams cannot be "restored" - they must be re-initiated after disconnection - * 2. Each stream maintains its configuration for automatic restart - * 3. Exponential backoff for failed stream attempts - * 4. All streams are supervised - failure of one doesn't affect others - */ -class StreamManager( - private val grpcClient: GrpcClient, - private val scope: CoroutineScope -) { - companion object { - private const val TAG = "StreamManager" - private const val MAX_RETRY_DELAY_SECONDS = 30L - } - - // Active stream configurations - private var eventStreamConfig: EventStreamConfig? = null - private var messageStreamConfig: MessageStreamConfig? = null - - // Active stream jobs - private var eventStreamJob: Job? = null - private var messageStreamJob: Job? 
= null - - // Supervision flags - private var shouldMaintainEventStream = false - private var shouldMaintainMessageStream = false - - /** - * Start event stream with automatic reconnection - * - * @param partyId Party ID to subscribe events for - * @param onEvent Callback for each event received - * @param onError Callback for stream errors (before retry) - */ - fun startEventStream( - partyId: String, - onEvent: suspend (SessionEventData) -> Unit, - onError: ((Throwable) -> Unit)? = null - ) { - Log.d(TAG, "Starting event stream for partyId=$partyId") - - // Save configuration - eventStreamConfig = EventStreamConfig(partyId, onEvent, onError) - shouldMaintainEventStream = true - - // Stop existing stream if any - eventStreamJob?.cancel() - - // Launch new stream with retry logic - eventStreamJob = launchEventStream(partyId, onEvent, onError) - } - - /** - * Start message stream with automatic reconnection - * - * @param sessionId Session ID - * @param partyId Party ID - * @param partyIndex Party index in the session - * @param onMessage Callback for each message received - * @param onError Callback for stream errors (before retry) - */ - fun startMessageStream( - sessionId: String, - partyId: String, - partyIndex: Int, - onMessage: suspend (IncomingMessage) -> Unit, - onError: ((Throwable) -> Unit)? 
= null - ) { - Log.d(TAG, "Starting message stream for sessionId=$sessionId, partyId=$partyId") - - // Save configuration - messageStreamConfig = MessageStreamConfig(sessionId, partyId, partyIndex, onMessage, onError) - shouldMaintainMessageStream = true - - // Stop existing stream if any - messageStreamJob?.cancel() - - // Launch new stream with retry logic - messageStreamJob = launchMessageStream(sessionId, partyId, partyIndex, onMessage, onError) - } - - /** - * Stop event stream - */ - fun stopEventStream() { - Log.d(TAG, "Stopping event stream") - shouldMaintainEventStream = false - eventStreamConfig = null - eventStreamJob?.cancel() - eventStreamJob = null - } - - /** - * Stop message stream - */ - fun stopMessageStream() { - Log.d(TAG, "Stopping message stream") - shouldMaintainMessageStream = false - messageStreamConfig = null - messageStreamJob?.cancel() - messageStreamJob = null - } - - /** - * Restart all active streams (called after reconnection) - */ - fun restartAllStreams() { - Log.d(TAG, "Restarting all active streams") - - // Restart event stream if it was active - if (shouldMaintainEventStream) { - eventStreamConfig?.let { config -> - Log.d(TAG, "Restarting event stream for partyId=${config.partyId}") - eventStreamJob?.cancel() - eventStreamJob = launchEventStream(config.partyId, config.onEvent, config.onError) - } - } - - // Restart message stream if it was active - if (shouldMaintainMessageStream) { - messageStreamConfig?.let { config -> - Log.d(TAG, "Restarting message stream for sessionId=${config.sessionId}") - messageStreamJob?.cancel() - messageStreamJob = launchMessageStream( - config.sessionId, - config.partyId, - config.partyIndex, - config.onMessage, - config.onError - ) - } - } - } - - /** - * Check if event stream is active - */ - fun isEventStreamActive(): Boolean = shouldMaintainEventStream - - /** - * Check if message stream is active - */ - fun isMessageStreamActive(): Boolean = shouldMaintainMessageStream - - /** - * Get current 
event stream party ID - */ - fun getEventStreamPartyId(): String? = eventStreamConfig?.partyId - - /** - * Get current message stream session ID - */ - fun getMessageStreamSessionId(): String? = messageStreamConfig?.sessionId - - // ===== Private Implementation ===== - - /** - * Launch event stream with exponential backoff retry - */ - private fun launchEventStream( - partyId: String, - onEvent: suspend (SessionEventData) -> Unit, - onError: ((Throwable) -> Unit)? - ): Job = scope.launch { - Log.d(TAG, "Launching event stream flow with auto-retry for partyId=$partyId") - - flow { - // Re-initiate gRPC subscription (not "restore") - grpcClient.subscribeSessionEvents(partyId).collect { event -> - emit(event) - } - } - .retryWhen { cause, attempt -> - // Stop retrying if stream was explicitly stopped - if (!shouldMaintainEventStream) { - Log.d(TAG, "Event stream stopped, not retrying") - return@retryWhen false - } - - // Calculate exponential backoff delay - val delaySeconds = min(attempt + 1, MAX_RETRY_DELAY_SECONDS) - Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying in ${delaySeconds}s: ${cause.message}") - - // Notify error callback - onError?.invoke(cause) - - // Wait before retry - delay(delaySeconds * 1000) - true // Always retry if stream should be maintained - } - .catch { e -> - // This should not happen due to retryWhen, but handle just in case - Log.e(TAG, "Event stream terminated unexpectedly", e) - onError?.invoke(e) - } - .collect { event -> - try { - onEvent(event) - } catch (e: Exception) { - Log.e(TAG, "Error processing event", e) - // Don't let handler errors kill the stream - } - } - } - - /** - * Launch message stream with exponential backoff retry - */ - private fun launchMessageStream( - sessionId: String, - partyId: String, - partyIndex: Int, - onMessage: suspend (IncomingMessage) -> Unit, - onError: ((Throwable) -> Unit)? 
- ): Job = scope.launch { - Log.d(TAG, "Launching message stream flow with auto-retry for sessionId=$sessionId") - - flow { - // Re-initiate gRPC subscription (not "restore") - grpcClient.subscribeMessages(sessionId, partyId).collect { message -> - emit(message) - } - } - .retryWhen { cause, attempt -> - // Stop retrying if stream was explicitly stopped - if (!shouldMaintainMessageStream) { - Log.d(TAG, "Message stream stopped, not retrying") - return@retryWhen false - } - - // Calculate exponential backoff delay - val delaySeconds = min(attempt + 1, MAX_RETRY_DELAY_SECONDS) - Log.w(TAG, "Message stream failed (attempt ${attempt + 1}), retrying in ${delaySeconds}s: ${cause.message}") - - // Notify error callback - onError?.invoke(cause) - - // Wait before retry - delay(delaySeconds * 1000) - true // Always retry if stream should be maintained - } - .catch { e -> - // This should not happen due to retryWhen, but handle just in case - Log.e(TAG, "Message stream terminated unexpectedly", e) - onError?.invoke(e) - } - .collect { message -> - try { - onMessage(message) - } catch (e: Exception) { - Log.e(TAG, "Error processing message", e) - // Don't let handler errors kill the stream - } - } - } - - // ===== Configuration Data Classes ===== - - private data class EventStreamConfig( - val partyId: String, - val onEvent: suspend (SessionEventData) -> Unit, - val onError: ((Throwable) -> Unit)? - ) - - private data class MessageStreamConfig( - val sessionId: String, - val partyId: String, - val partyIndex: Int, - val onMessage: suspend (IncomingMessage) -> Unit, - val onError: ((Throwable) -> Unit)? 
- ) -} diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 0870bf2d..9e5c7d50 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -14,7 +14,6 @@ import com.durian.tssparty.data.remote.GrpcConnectionState import com.durian.tssparty.data.remote.IncomingMessage import com.durian.tssparty.data.remote.JoinSessionData import com.durian.tssparty.data.remote.SessionEventData -import com.durian.tssparty.data.remote.StreamManager import com.durian.tssparty.domain.model.* import com.durian.tssparty.util.AddressUtils import com.durian.tssparty.util.TransactionUtils @@ -214,36 +213,11 @@ class TssRepository @Inject constructor( coroutineExceptionHandler ) - /** - * StreamManager - 管理 gRPC 双向流的生命周期 - * - * 【架构重构 - 可靠的流管理机制】 - * - * 核心原则(来自 gRPC 官方文档): - * - gRPC 流无法"恢复",必须重新发起 RPC 调用 - * - 流断开后需要重新调用 subscribeSessionEvents() / subscribeMessages() - * - 保存流配置,自动重试失败的流(指数退避) - * - * 修复的关键问题: - * 1. 旧设计尝试"恢复"已关闭的 Flow → 失败(Flow 不是持久化对象) - * 2. 网络断开后 eventStreamSubscribed flag 被清除 → callback 不触发 - * 3. 
流失败后没有自动重试机制 → 永久失败 - * - * StreamManager 功能: - * - 保存每个流的配置(partyId, sessionId 等) - * - 流失败后自动重新发起 RPC(不是"恢复") - * - 使用 Flow.retryWhen 实现指数退避重试 - * - 网络重连时,重启所有活跃的流 - * - * 参考资料: - * - https://github.com/grpc/grpc-java/issues/8177 - * - https://grpc.io/docs/guides/keepalive/ - */ - private val streamManager = StreamManager(grpcClient, repositoryScope) companion object { // Job 名称常量 private const val JOB_MESSAGE_COLLECTION = "message_collection" + private const val JOB_MESSAGE_SENDING = "message_sending" private const val JOB_SESSION_EVENT = "session_event" private const val JOB_SESSION_STATUS_POLLING = "session_status_polling" private const val JOB_PROGRESS_COLLECTION = "progress_collection" @@ -316,19 +290,6 @@ class TssRepository @Inject constructor( // Account service URL (configurable via settings) private var accountServiceUrl: String = "https://rwaapi.szaiai.com" - init { - // Monitor gRPC reconnection events and restart streams - // IMPORTANT: Don't use callback pattern - use event Flow for better reliability - repositoryScope.launch { - grpcConnectionEvents - .filter { it is GrpcConnectionEvent.Reconnected } - .collect { - android.util.Log.d("TssRepository", "gRPC reconnected, restarting streams via StreamManager...") - // StreamManager will re-initiate RPC calls (not "restore" closed Flows) - streamManager.restartAllStreams() - } - } - } /** * HTTP client for API calls @@ -485,7 +446,15 @@ class TssRepository @Inject constructor( newPartyId } - grpcClient.registerParty(partyId, "temporary", "1.0.0") + // Register with gRPC and check result + val registerResult = grpcClient.registerParty(partyId, "temporary", "1.0.0") + if (registerResult.isFailure) { + val error = registerResult.exceptionOrNull() + android.util.Log.e("TssRepository", "Failed to register party: ${error?.message}") + throw error ?: Exception("Failed to register party") + } + + android.util.Log.d("TssRepository", "Party registered successfully: $partyId") // Subscribe to session events 
immediately after registration (like Electron does) startSessionEventSubscription() @@ -507,10 +476,20 @@ class TssRepository @Inject constructor( currentSessionEventPartyId = effectivePartyId android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $effectivePartyId (device partyId: $devicePartyId)") - // Use StreamManager for reliable stream management with auto-reconnection - streamManager.startEventStream( - partyId = effectivePartyId, - onEvent = { event -> + // 使用 JobManager 启动(自动取消同名旧 Job) + // 添加 Flow.retryWhen 实现自动重连(基于 gRPC 官方推荐) + jobManager.launch(JOB_SESSION_EVENT) { + flow { + grpcClient.subscribeSessionEvents(effectivePartyId).collect { event -> + emit(event) + } + } + .retryWhen { cause, attempt -> + android.util.Log.w("TssRepository", "Event stream failed (attempt ${attempt + 1}), retrying in ${kotlin.math.min(attempt + 1, 30)}s: ${cause.message}") + delay(kotlin.math.min(attempt + 1, 30) * 1000L) // 指数退避,最多 30 秒 + true // 永远重试 + } + .collect { event -> android.util.Log.d("TssRepository", "=== Session event received ===") android.util.Log.d("TssRepository", " eventType: ${event.eventType}") android.util.Log.d("TssRepository", " sessionId: ${event.sessionId}") @@ -595,12 +574,8 @@ class TssRepository @Inject constructor( android.util.Log.d("TssRepository", " Reason: sessionId mismatch (event: ${event.sessionId}, active: ${activeSession.sessionId})") } } - }, - onError = { error -> - android.util.Log.e("TssRepository", "Event stream error: ${error.message}") - // StreamManager will automatically retry } - ) + } } /** @@ -614,8 +589,8 @@ class TssRepository @Inject constructor( * partyId from keygen (shareEntity.partyId). */ private fun ensureSessionEventSubscriptionActive(signingPartyId: String? 
= null) { - // Check if the event stream is still active using StreamManager - val isActive = streamManager.isEventStreamActive() + // Check if the event stream Job is still active using JobManager + val isActive = jobManager.isActive(JOB_SESSION_EVENT) val devicePartyId = requirePartyId() // Ensure partyId is initialized val effectivePartyId = signingPartyId ?: currentSessionEventPartyId ?: devicePartyId android.util.Log.d("TssRepository", "Checking session event subscription: isActive=$isActive, effectivePartyId=$effectivePartyId") @@ -2068,8 +2043,7 @@ class TssRepository @Inject constructor( android.util.Log.d("TssRepository", "Starting message routing: sessionId=$sessionId, routingPartyId=$effectivePartyId") // Part 1: Collect outgoing messages from TSS and route via gRPC - // This doesn't need StreamManager - it's a local Flow collection - jobManager.launch(JOB_MESSAGE_COLLECTION) { + jobManager.launch(JOB_MESSAGE_SENDING) { tssNativeBridge.outgoingMessages.collect { message -> val payload = Base64.decode(message.payload, Base64.NO_WRAP) grpcClient.routeMessage( @@ -2084,12 +2058,19 @@ class TssRepository @Inject constructor( } // Part 2: Subscribe to incoming messages from gRPC and send to TSS - // Use StreamManager for reliable gRPC stream management with auto-reconnection - streamManager.startMessageStream( - sessionId = sessionId, - partyId = effectivePartyId, - partyIndex = partyIndex, - onMessage = { message -> + // 添加 Flow.retryWhen 实现自动重连(基于 gRPC 官方推荐) + jobManager.launch(JOB_MESSAGE_COLLECTION) { + flow { + grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { message -> + emit(message) + } + } + .retryWhen { cause, attempt -> + android.util.Log.w("TssRepository", "Message stream failed (attempt ${attempt + 1}), retrying in ${kotlin.math.min(attempt + 1, 30)}s: ${cause.message}") + delay(kotlin.math.min(attempt + 1, 30) * 1000L) // 指数退避,最多 30 秒 + true // 永远重试 + } + .collect { message -> // Find party index from party ID val session = 
_currentSession.value val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex @@ -2103,12 +2084,8 @@ class TssRepository @Inject constructor( } else { android.util.Log.w("TssRepository", "Unknown fromParty: ${message.fromParty}, skipping message") } - }, - onError = { error -> - android.util.Log.e("TssRepository", "Message stream error: ${error.message}") - // StreamManager will automatically retry } - ) + } } /**