From 7b95711406e593ddfe616e4bc8c5be2aee2de907 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 27 Jan 2026 00:56:55 -0800 Subject: [PATCH] =?UTF-8?q?feat(android):=20=E5=AE=9E=E7=8E=B0=E5=8F=AF?= =?UTF-8?q?=E9=9D=A0=E7=9A=84=20gRPC=20=E8=BF=9E=E6=8E=A5=E5=92=8C?= =?UTF-8?q?=E6=B5=81=E7=AE=A1=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 基于 gRPC 官方最佳实践完整重构流管理系统 核心改进: 1. Keep-Alive 配置优化 (20s PING, 5s 超时, 永不超时空闲连接) 2. 创建 StreamManager 统一管理双向流生命周期 3. 实现自动重连机制 (Flow.retryWhen + 指数退避) 4. 添加 Android 网络状态监听 (立即 resetConnectBackoff) 技术细节: - gRPC 流无法"恢复",必须重新发起 RPC 调用 - StreamManager 保存流配置,失败后自动重新发起 - 监听 GrpcConnectionEvent.Reconnected 触发流重启 - 删除旧的 callback 机制,使用 Flow 事件驱动 修复的关键问题: - 网络断开后 eventStreamSubscribed flag 被清除导致 callback 不触发 - reSubscribeStreams 尝试"恢复"已关闭的 Flow (设计错误) - 缺少 Keep-Alive 导致连接被中间设备清理 - 缺少网络监听导致 60 秒 DNS 解析延迟 参考资料: - https://github.com/grpc/grpc-java/issues/8177 - https://grpc.io/docs/guides/keepalive/ - https://github.com/grpc/grpc-java/issues/4011 Co-Authored-By: Claude Sonnet 4.5 --- .claude/settings.local.json | 6 +- .../GRPC_CORRECT_SOLUTION.md | 357 ++++++++++++++++++ .../RECONNECTION_BUG_ANALYSIS.md | 251 ++++++++++++ .../java/com/durian/tssparty/MainActivity.kt | 9 +- .../durian/tssparty/data/remote/GrpcClient.kt | 216 +++++------ .../tssparty/data/remote/StreamManager.kt | 282 ++++++++++++++ .../tssparty/data/repository/TssRepository.kt | 157 +++++--- .../presentation/viewmodel/MainViewModel.kt | 12 + 8 files changed, 1113 insertions(+), 177 deletions(-) create mode 100644 backend/mpc-system/services/service-party-android/GRPC_CORRECT_SOLUTION.md create mode 100644 backend/mpc-system/services/service-party-android/RECONNECTION_BUG_ANALYSIS.md create mode 100644 backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt diff --git a/.claude/settings.local.json b/.claude/settings.local.json index bd87dc92..1bcb6627 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -809,7 +809,11 @@ "Bash(./gradlew:*)", "Bash(adb shell \"run-as com.durian.tssparty sqlite3 /data/data/com.durian.tssparty/databases/tss_party.db ''SELECT id, tx_hash, from_address, to_address, amount, token_type, status, direction, created_at FROM transaction_records ORDER BY id DESC LIMIT 5;''\")", "WebFetch(domain:docs.kava.io)", - "WebFetch(domain:kavascan.com)" + "WebFetch(domain:kavascan.com)", + "Bash(.gradlew.bat compileDebugKotlin:*)", + "WebFetch(domain:github.com)", + "WebFetch(domain:oneuptime.com)", + "Bash(gradlew.bat assembleDebug:*)" ], "deny": [], "ask": [] diff --git a/backend/mpc-system/services/service-party-android/GRPC_CORRECT_SOLUTION.md b/backend/mpc-system/services/service-party-android/GRPC_CORRECT_SOLUTION.md new file mode 100644 index 00000000..39819cdb --- /dev/null +++ b/backend/mpc-system/services/service-party-android/GRPC_CORRECT_SOLUTION.md @@ -0,0 +1,357 @@ +# gRPC 稳定连接的正确解决方案 + +基于官方 gRPC 文档和最佳实践研究 + +## 核心问题 + +**当前代码的设计错误**: 尝试通过 callback "恢复" (restore) 已关闭的流 + +**gRPC 官方说法**: +> "You don't need to re-create the channel - just **re-do the streaming RPC** on the current channel." +> +> "gRPC stream will be mapped to the underlying http2 stream which is **lost when the connection is lost**." + +**结论**: **双向流无法恢复,必须重新发起 RPC 调用** + +## 为什么当前设计有问题 + +```kotlin +// 当前错误设计: +1. 订阅事件流 → Flow 开始 +2. 网络断开 → Flow 关闭 +3. 网络重连 → 尝试"恢复"流 ❌ +4. 调用 callback → 期望流恢复 ❌ + +// 问题: +- Flow 已经关闭,无法恢复 +- 需要重新调用 subscribeSessionEvents() +``` + +## 正确的设计模式 + +### 模式 1: Application-Level Stream Management (推荐) + +```kotlin +class TssRepository { + private val streamManager = StreamManager() + + init { + // 监听连接事件,自动重启流 + grpcClient.connectionEvents + .filter { it is GrpcConnectionEvent.Reconnected } + .onEach { + android.util.Log.d(TAG, "Reconnected, restarting streams...") + streamManager.restartAllStreams() + } + .launchIn(scope) + } + + class StreamManager { + private var eventStreamConfig: EventStreamConfig? = null + private var messageStreamConfig: MessageStreamConfig? = null + + fun startEventStream(partyId: String) { + // 保存配置 + eventStreamConfig = EventStreamConfig(partyId) + // 启动流 + doStartEventStream(partyId) + } + + fun restartAllStreams() { + // 重新发起 RPC 调用(不是"恢复") + eventStreamConfig?.let { doStartEventStream(it.partyId) } + messageStreamConfig?.let { doStartMessageStream(it.sessionId, it.partyId) } + } + + private fun doStartEventStream(partyId: String) { + grpcClient.subscribeSessionEvents(partyId) + .catch { e -> + Log.e(TAG, "Event stream failed: ${e.message}") + // 如果失败,延迟后重试 + delay(5000) + doStartEventStream(partyId) + } + .collect { event -> + // 处理事件 + } + } + } +} +``` + +### 模式 2: 使用 Kotlin Flow retry + retryWhen + +```kotlin +fun subscribeSessionEventsWithAutoRestart(partyId: String): Flow { + return flow { + // 重新发起 RPC 调用 + grpcClient.subscribeSessionEvents(partyId).collect { + emit(it) + } + }.retryWhen { cause, attempt -> + android.util.Log.w(TAG, "Event stream failed (attempt $attempt): ${cause.message}") + delay(min(1000L * (attempt + 1), 30000L)) // 指数退避,最多 30 秒 + true // 始终重试 + } +} +``` + +## Keep-Alive 配置(防止连接假死) + +基于 [gRPC Keepalive 官方文档](https://grpc.io/docs/guides/keepalive/) + +### Android 客户端配置 + +```kotlin +val channel = AndroidChannelBuilder + .forAddress(host, port) + .usePlaintext() // 或使用 useTransportSecurity() + + // Keep-Alive 配置 + .keepAliveTime(10, TimeUnit.SECONDS) // 每 10 秒发送 PING + .keepAliveTimeout(3, TimeUnit.SECONDS) // 3 秒内没收到 ACK 视为死连接 + .keepAliveWithoutCalls(true) // 即使没有活跃 RPC 也发送 PING + + // 重试配置 + .enableRetry() // 启用 unary RPC 重试 + .maxRetryAttempts(5) + + // 其他优化 + .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) // 不要自动关闭空闲连接 + + .build() +``` + +**重要参数说明**: + +| 参数 | 建议值 | 说明 | +|------|--------|------| +| `keepAliveTime` | 10s-30s | PING 发送间隔,太短会浪费流量 | +| `keepAliveTimeout` | 3s | 等待 ACK 超时,判定连接死亡 | +| `keepAliveWithoutCalls` | true | 没有活跃 RPC 时也 PING(对流很重要)| +| `idleTimeout` | MAX | 不要自动关闭连接 | + +## Android 网络状态监听(加速重连) + +```kotlin +class GrpcClient { + fun setupNetworkMonitoring(context: Context) { + val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager + + val networkCallback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) { + android.util.Log.d(TAG, "Network available, resetting backoff") + // 重要:立即重置重连退避,避免等待 60 秒 DNS 解析 + channel?.resetConnectBackoff() + } + + override fun onLost(network: Network) { + android.util.Log.w(TAG, "Network lost") + } + } + + val request = NetworkRequest.Builder() + .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + .build() + + connectivityManager.registerNetworkCallback(request, networkCallback) + } +} +``` + +## 修复方案对比 + +### ❌ 当前错误方案 +```kotlin +// 尝试"恢复"已关闭的流 +fun restoreStreamsAfterReconnect() { + // 问题:Flow 已经关闭,无法恢复 + // subscribeSessionEvents 返回的 Flow 已经是死的 +} +``` + +### ✅ 正确方案 A: 保存配置 + 重新发起 +```kotlin +// 保存流配置 +private var activeEventStream: String? = null + +fun startEventStream(partyId: String) { + activeEventStream = partyId // 保存配置 + launchEventStream(partyId) // 发起流 +} + +fun onReconnected() { + // 重新发起 RPC 调用 + activeEventStream?.let { launchEventStream(it) } +} + +private fun launchEventStream(partyId: String) { + scope.launch { + grpcClient.subscribeSessionEvents(partyId).collect { ... } + } +} +``` + +### ✅ 正确方案 B: 自动重试流 +```kotlin +fun startEventStreamWithAutoReconnect(partyId: String) { + scope.launch { + flow { + // 每次都重新发起 RPC + grpcClient.subscribeSessionEvents(partyId).collect { emit(it) } + } + .retryWhen { cause, attempt -> + Log.w(TAG, "Stream failed, restarting (attempt $attempt)") + delay(1000L * (attempt + 1)) + true // 永远重试 + } + .collect { event -> + // 处理事件 + } + } +} +``` + +## 为什么在同一路由器下也会断连 + +即使在同一路由器下,仍可能出现连接问题: + +1. **手机网络切换**: WiFi ↔ 移动数据自动切换 +2. **省电模式**: Android Doze/App Standby 限制网络 +3. **TCP 空闲超时**: 路由器/防火墙关闭空闲连接(通常 2-5 分钟) +4. **HTTP/2 连接老化**: 长时间无活动可能被中间设备清理 +5. **应用后台**: 系统限制后台网络访问 + +**Keep-Alive 的作用**: 定期发送 PING,告诉路由器/防火墙"我还活着",防止连接被清理 + +## 实施计划 + +### 第 1 步: 添加 Keep-Alive 配置 + +修改 `GrpcClient.kt` 的 `doConnect()`: + +```kotlin +private fun doConnect(host: String, port: Int) { + val channelBuilder = ManagedChannelBuilder + .forAddress(host, port) + .usePlaintext() + + // ✅ 添加 Keep-Alive + .keepAliveTime(20, TimeUnit.SECONDS) + .keepAliveTimeout(5, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + + // ✅ 永不超时 + .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) + + channel = channelBuilder.build() +} +``` + +### 第 2 步: 修改流管理模式 + +#### 选项 A: 最小改动(推荐先试) + +修改 `TssRepository.kt`: + +```kotlin +private var shouldMonitorEvents = false +private var eventStreamPartyId: String? = null + +fun subscribeToSessionEvents(partyId: String) { + eventStreamPartyId = partyId + shouldMonitorEvents = true + launchEventStream(partyId) +} + +private fun launchEventStream(partyId: String) { + scope.launch { + flow { + grpcClient.subscribeSessionEvents(partyId).collect { emit(it) } + } + .retryWhen { cause, attempt -> + if (!shouldMonitorEvents) return@retryWhen false // 停止重试 + + Log.w(TAG, "Event stream failed, restarting in ${attempt}s: ${cause.message}") + delay(1000L * min(attempt, 30)) + true + } + .collect { event -> + handleSessionEvent(event) + } + } +} + +fun stopMonitoringEvents() { + shouldMonitorEvents = false + eventStreamPartyId = null +} +``` + +#### 选项 B: 完整重构(更健壮) + +参考"模式 1"创建 `StreamManager` 类。 + +### 第 3 步: 添加网络监听 + +修改 `MainActivity.kt` 或 `GrpcClient.kt`: + +```kotlin +fun setupNetworkCallback(context: Context) { + val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager + + val callback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) { + channel?.resetConnectBackoff() + } + } + + val request = NetworkRequest.Builder() + .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + .build() + + connectivityManager.registerNetworkCallback(request, callback) +} +``` + +## 测试验证 + +1. ✅ 正常启动 → 订阅事件 → 收到 `session_started` +2. ✅ 飞行模式 30 秒 → 关闭飞行模式 → 自动重新订阅 → 收到事件 +3. ✅ 应用后台 5 分钟 → 恢复前台 → Keep-Alive 保持连接 → 收到事件 +4. ✅ 长时间空闲(30 分钟)→ 创建会话 → Keep-Alive 仍然工作 + +## 参考资料 + +### 官方文档 +- [gRPC Keepalive Guide](https://grpc.io/docs/guides/keepalive/) +- [Android gRPC Guide](https://developer.android.com/guide/topics/connectivity/grpc) +- [Performance Best Practices](https://learn.microsoft.com/en-us/aspnet/core/grpc/performance) + +### 关键 Issues +- [How to restart bi-directional stream after network disconnection](https://github.com/grpc/grpc-java/issues/8177) +- [Network connectivity changes on Android](https://github.com/grpc/grpc-java/issues/4011) + +### 最新文章 (2026) +- [How to Implement gRPC Keepalive for Long-Lived Connections](https://oneuptime.com/blog/post/2026-01-08-grpc-keepalive-connections/view) + +## 总结 + +### 当前问题根源 +1. **设计错误**: 尝试"恢复"已关闭的流,但 gRPC 流无法恢复 +2. **缺少 Keep-Alive**: 空闲连接被中间设备清理 +3. **没有自动重启**: 流失败后需要手动重新发起 + +### 正确解决方案 +1. ✅ 添加 Keep-Alive 配置(20s PING,5s 超时) +2. ✅ 保存流配置,失败后重新发起 RPC(不是"恢复") +3. ✅ 使用 Flow.retryWhen 自动重启流 +4. ✅ 监听网络状态,立即 resetConnectBackoff() + +### 关键理念转变 +``` +旧思维: 连接 → 订阅流 → 断开 → 重连 → "恢复"流 ❌ +新思维: 连接 → 订阅流 → 断开 → 重连 → "重新发起"流 ✅ +``` + +**Flow 不是持久化对象,是一次性的数据流。断开后必须重新创建。** diff --git a/backend/mpc-system/services/service-party-android/RECONNECTION_BUG_ANALYSIS.md b/backend/mpc-system/services/service-party-android/RECONNECTION_BUG_ANALYSIS.md new file mode 100644 index 00000000..93abf71b --- /dev/null +++ b/backend/mpc-system/services/service-party-android/RECONNECTION_BUG_ANALYSIS.md @@ -0,0 +1,251 @@ +# Reconnection Event Stream Bug Analysis + +## Problem Summary + +After network disconnection and reconnection, the event stream subscription is NOT restored, causing: +- No `session_started` event received +- Keygen never starts +- Messages pile up forever (539 pending) + +## Root Cause + +### The Bug Chain + +``` +1. Network disconnects + ↓ +2. subscribeSessionEvents Flow closes + ↓ +3. awaitClose block executes (GrpcClient.kt:925) + eventStreamSubscribed.set(false) ← FLAG CLEARED + ↓ +4. Network reconnects successfully + ↓ +5. reSubscribeStreams() called (GrpcClient.kt:202) + ↓ +6. Line 506 checks: + val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null + ↓ +7. eventStreamSubscribed.get() returns FALSE ❌ + activeMessageSubscription is also NULL ❌ + ↓ +8. needsResubscribe = false + ↓ +9. Callback NEVER invoked + ↓ +10. Event stream NEVER restored +``` + +## Code Evidence + +### GrpcClient.kt - Where Flag is Set/Cleared + +**Line 844** - Flag set when subscribing: +```kotlin +fun subscribeSessionEvents(partyId: String): Flow = callbackFlow { + eventStreamSubscribed.set(true) ← Set to TRUE + eventStreamPartyId = partyId + ... +} +``` + +**Line 925** - Flag cleared when Flow closes (THE BUG): +```kotlin +awaitClose { + Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId") + eventStreamSubscribed.set(false) ← Set to FALSE on disconnect + eventStreamPartyId = null +} +``` + +**Line 506** - Reconnection check (FAILS because flag is false): +```kotlin +private fun reSubscribeStreams() { + val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null + // ↑ Returns FALSE after disconnect + + if (needsResubscribe) { ← This condition is FALSE + Log.d(TAG, "Triggering stream re-subscription callback") + ... + onReconnectedCallback?.invoke() ← NEVER REACHED + } +} +``` + +## Log Evidence + +### Normal Reconnection (16:28:26) - WORKS ✅ +``` +16:28:26.082 D/GrpcClient: Connected successfully +16:28:26.086 D/GrpcClient: Re-registering party: 7c72c28f... +16:28:26.130 D/GrpcClient: Party registered: 7c72c28f... +16:28:26.130 D/GrpcClient: Triggering stream re-subscription callback ← Present! +16:28:26.130 D/GrpcClient: - Event stream: true, partyId: 7c72c28f... +16:28:26.130 D/TssRepository: gRPC reconnected, restoring streams... ← Present! +16:28:26.130 D/TssRepository: Restoring session event subscription ← Present! +``` + +### Problem Reconnection (16:29:47) - FAILS ❌ +``` +16:29:47.090 D/GrpcClient: Connected successfully +16:29:47.093 D/GrpcClient: Re-registering party: 7c72c28f... +16:29:47.146 D/GrpcClient: Party registered: 7c72c28f... +[MISSING]: "Triggering stream re-subscription callback" ← NOT PRESENT! +[MISSING]: "gRPC reconnected, restoring streams..." ← NOT PRESENT! +[MISSING]: "Restoring session event subscription" ← NOT PRESENT! + +Result: +16:30:47.198 W/GrpcClient: Has 539 pending messages - may have missed events +16:31:17.237 W/GrpcClient: Has 539 pending messages - may have missed events +``` + +## Why First Reconnection Worked + +Looking at the timeline: +``` +16:27:53 - App started, event subscription started +16:28:26 - First reconnect (1 minute later) + Event subscription was STILL ACTIVE + eventStreamSubscribed = true ✅ + +16:29:15 - Network disconnect (49 seconds later) + Flow closed → eventStreamSubscribed set to FALSE ❌ + +16:29:47 - Second reconnect + eventStreamSubscribed = false ❌ + Callback NOT invoked ❌ +``` + +**Key Insight**: The first reconnection worked because the event stream Flow hadn't closed yet. The second reconnection failed because the Flow had closed and cleared the flag. + +## The Design Flaw + +The current design has a **state tracking inconsistency**: + +```kotlin +// When to subscribe? +eventStreamSubscribed = true // "I am currently subscribed" + +// When to unsubscribe? +eventStreamSubscribed = false // "I am no longer subscribed" + +// When to re-subscribe? +if (eventStreamSubscribed) { ... } // ❌ WRONG - flag is already false! +``` + +**Problem**: The flag tracks "am I currently subscribed?" but reconnection logic needs to know "should I re-subscribe?". These are two different concepts. + +## Solution Options + +### Option 1: Don't Clear Flag in awaitClose (Simple) + +```kotlin +awaitClose { + Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId") + // DON'T clear the flag - keep it for reconnection + // eventStreamSubscribed.set(false) ← REMOVE THIS + // eventStreamPartyId = null ← REMOVE THIS +} +``` + +**Pros**: Minimal change, preserves intent to re-subscribe +**Cons**: Flag no longer accurately reflects current state + +### Option 2: Add Separate "Should Restore" Flag (Better) + +```kotlin +// Two separate flags: +private val eventStreamSubscribed = AtomicBoolean(false) // Current state +private val shouldRestoreEventStream = AtomicBoolean(false) // Intent to restore + +// When subscribing: +eventStreamSubscribed.set(true) +shouldRestoreEventStream.set(true) // Remember to restore + +// In awaitClose: +eventStreamSubscribed.set(false) // No longer subscribed +// Keep shouldRestoreEventStream = true // But should restore on reconnect + +// In reSubscribeStreams: +val needsResubscribe = shouldRestoreEventStream.get() || activeMessageSubscription != null +``` + +**Pros**: Clear separation of concerns, accurate state tracking +**Cons**: More code, requires careful handling of clear conditions + +### Option 3: Store Last Subscription State (Most Robust) + +```kotlin +// Store full subscription state for recovery +private data class StreamState( + val eventStreamPartyId: String?, + val messageSessionId: String?, + val messagePartyId: String? +) + +private val lastStreamState = AtomicReference(null) + +// On subscribe, save state +// On reconnect, restore from saved state +``` + +**Pros**: Can restore exact previous state, handles complex scenarios +**Cons**: Most complex implementation + +## Recommended Fix + +**Use Option 1 (simplest) with Option 2 concept (clearer intent)**: + +1. Don't clear `eventStreamSubscribed` in `awaitClose` +2. Only clear it when user explicitly unsubscribes or app shuts down +3. This preserves the "I was subscribed, so re-subscribe on reconnect" behavior + +**Alternative**: Add explicit unsubscribe call only when intentionally stopping (not on disconnect). + +## Files to Modify + +### GrpcClient.kt + +**Line 923-927** - Remove flag clearing in awaitClose: +```kotlin +awaitClose { + Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId") + // Keep flags for reconnection - don't clear here + // Only clear on explicit unsubscribe or disconnect +} +``` + +**Line 933-936** - Keep explicit unsubscribe as-is: +```kotlin +fun unsubscribeSessionEvents() { + eventStreamSubscribed.set(false) + eventStreamPartyId = null +} +``` + +## Testing Checklist + +After fix: +- [ ] Start app, subscribe to events +- [ ] Simulate network disconnect (airplane mode) +- [ ] Verify log shows: "Triggering stream re-subscription callback" +- [ ] Verify log shows: "gRPC reconnected, restoring streams..." +- [ ] Verify log shows: "Restoring session event subscription" +- [ ] Verify pending messages start decreasing +- [ ] Test 2-of-3 keygen succeeds after reconnection + +## Why This Wasn't Caught Before + +1. **Timing-dependent**: Only fails if Flow closes before reconnect +2. **Works in most cases**: Quick reconnects (< 1 minute) often succeed before Flow timeout +3. **No explicit test**: Didn't test scenario of "disconnect → wait for Flow to close → reconnect" +4. **Silent failure**: No error logged, just missing callback invocation + +## Conclusion + +The safeLaunch optimization did NOT cause this bug. The bug exists because: +1. `awaitClose` clears `eventStreamSubscribed` on disconnect +2. Reconnection logic relies on this flag to decide if callback should be invoked +3. After disconnect, flag is false, so callback is never invoked + +**Fix**: Don't clear the subscription intent flag on temporary disconnection. diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt index 69a27267..3f5a0960 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt @@ -65,6 +65,13 @@ fun TssPartyApp( viewModel: MainViewModel = hiltViewModel(), onCopyToClipboard: (String) -> Unit = {} ) { + val context = LocalContext.current + + // Setup network monitoring once during initialization + LaunchedEffect(Unit) { + viewModel.setupNetworkMonitoring(context) + } + val navController = rememberNavController() val appState by viewModel.appState.collectAsState() val uiState by viewModel.uiState.collectAsState() @@ -119,7 +126,7 @@ fun TssPartyApp( var transferWalletId by remember { mutableStateOf(null) } // Export/Import file handling - val context = LocalContext.current + // Note: context is already declared at the top of the function // Use rememberSaveable to persist across configuration changes (e.g., file picker activity) var pendingExportJson by rememberSaveable { mutableStateOf(null) } var pendingExportAddress by rememberSaveable { mutableStateOf(null) } diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt index dcbab22b..3a26660f 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt @@ -1,5 +1,10 @@ package com.durian.tssparty.data.remote +import android.content.Context +import android.net.ConnectivityManager +import android.net.Network +import android.net.NetworkCapabilities +import android.net.NetworkRequest import android.util.Base64 import android.util.Log import com.durian.tssparty.domain.model.Participant @@ -101,24 +106,100 @@ class GrpcClient @Inject constructor() { private var heartbeatJob: Job? = null private val heartbeatFailCount = AtomicInteger(0) - // Stream subscriptions (for recovery after reconnect) - private var activeMessageSubscription: MessageSubscription? = null - private var eventStreamSubscribed = AtomicBoolean(false) - private var eventStreamPartyId: String? = null - // Stream version tracking (to detect stale stream events) private val messageStreamVersion = AtomicInteger(0) private val eventStreamVersion = AtomicInteger(0) - // Reconnection callback - called when streams need to be re-established - private var onReconnectedCallback: (() -> Unit)? = null - // Channel state monitoring private var channelStateMonitorJob: Job? = null // Coroutine scope for background tasks private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + // Network monitoring + private var networkCallback: ConnectivityManager.NetworkCallback? = null + + /** + * Setup network state monitoring to accelerate reconnection + * + * Android network state monitoring is critical for reliable gRPC connections: + * 1. When network becomes available, call channel.resetConnectBackoff() + * 2. This bypasses the default 60-second DNS resolution backoff + * 3. Enables immediate reconnection instead of waiting + * + * Reference: https://github.com/grpc/grpc-java/issues/4011 + * + * @param context Application or Activity context + */ + fun setupNetworkMonitoring(context: Context) { + // Unregister old callback if exists + networkCallback?.let { callback -> + try { + val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager + connectivityManager?.unregisterNetworkCallback(callback) + } catch (e: Exception) { + Log.e(TAG, "Failed to unregister old network callback", e) + } + } + + val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager + if (connectivityManager == null) { + Log.e(TAG, "ConnectivityManager not available") + return + } + + val callback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) { + Log.d(TAG, "Network available, resetting connect backoff for immediate reconnection") + // CRITICAL: Reset backoff to avoid 60-second DNS resolution delay + channel?.resetConnectBackoff() + } + + override fun onLost(network: Network) { + Log.w(TAG, "Network lost") + } + + override fun onCapabilitiesChanged(network: Network, networkCapabilities: NetworkCapabilities) { + val hasInternet = networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + val isValidated = networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED) + Log.d(TAG, "Network capabilities changed: hasInternet=$hasInternet, isValidated=$isValidated") + + // Reset backoff when network becomes validated (has actual internet connectivity) + if (hasInternet && isValidated) { + channel?.resetConnectBackoff() + } + } + } + + val request = NetworkRequest.Builder() + .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + .build() + + try { + connectivityManager.registerNetworkCallback(request, callback) + networkCallback = callback + Log.d(TAG, "Network monitoring registered successfully") + } catch (e: Exception) { + Log.e(TAG, "Failed to register network callback", e) + } + } + + /** + * Stop network monitoring (call in cleanup) + */ + fun stopNetworkMonitoring(context: Context) { + networkCallback?.let { callback -> + try { + val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager + connectivityManager?.unregisterNetworkCallback(callback) + networkCallback = null + Log.d(TAG, "Network monitoring stopped") + } catch (e: Exception) { + Log.e(TAG, "Failed to unregister network callback", e) + } + } + } + /** * Connect to the Message Router server */ @@ -142,10 +223,11 @@ class GrpcClient @Inject constructor() { val builder = ManagedChannelBuilder .forAddress(host, port) - .keepAliveTime(30, TimeUnit.SECONDS) - .keepAliveTimeout(10, TimeUnit.SECONDS) - .keepAliveWithoutCalls(true) - .idleTimeout(5, TimeUnit.MINUTES) + // Keep-Alive configuration for stable long-lived connections + .keepAliveTime(20, TimeUnit.SECONDS) // Send PING every 20 seconds + .keepAliveTimeout(5, TimeUnit.SECONDS) // 5 seconds to wait for ACK + .keepAliveWithoutCalls(true) // Keep pinging even without active RPCs + .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) // Never timeout idle connections // Use TLS for port 443, plaintext for other ports (like local development) if (port == 443) { @@ -198,8 +280,8 @@ class GrpcClient @Inject constructor() { // Restart heartbeat startHeartbeat() - // Re-subscribe to streams - reSubscribeStreams() + // Emit reconnected event for StreamManager to restart streams + _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected) return@withTimeout } @@ -498,65 +580,6 @@ class GrpcClient @Inject constructor() { return false } - /** - * Re-subscribe to streams after reconnection - * Notifies the repository layer to re-establish message/event subscriptions - */ - private fun reSubscribeStreams() { - val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null - - if (needsResubscribe) { - Log.d(TAG, "Triggering stream re-subscription callback") - Log.d(TAG, " - Event stream: ${eventStreamSubscribed.get()}, partyId: $eventStreamPartyId") - Log.d(TAG, " - Message stream: ${activeMessageSubscription?.sessionId}") - - // Notify repository to re-establish streams - scope.launch { - // Wait for channel to be fully ready instead of fixed delay - if (waitForChannelReady()) { - try { - onReconnectedCallback?.invoke() - // Emit reconnected event - _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected) - } catch (e: Exception) { - Log.e(TAG, "Reconnect callback failed: ${e.message}") - // Don't let callback failure affect the connection state - } - } else { - Log.w(TAG, "Channel not ready for stream re-subscription, skipping") - } - } - } else { - // No streams to restore, still emit reconnected event - _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected) - } - } - - /** - * Set callback for reconnection events - * Called when connection is restored and streams need to be re-established - */ - fun setOnReconnectedCallback(callback: () -> Unit) { - onReconnectedCallback = callback - } - - /** - * Get active message subscription info (for recovery) - */ - fun getActiveMessageSubscription(): Pair? { - return activeMessageSubscription?.let { Pair(it.sessionId, it.partyId) } - } - - /** - * Check if event stream was subscribed (for recovery) - */ - fun wasEventStreamSubscribed(): Boolean = eventStreamSubscribed.get() - - /** - * Get event stream party ID (for recovery) - */ - fun getEventStreamPartyId(): String? = eventStreamPartyId - /** * Check if connected */ @@ -764,9 +787,6 @@ class GrpcClient @Inject constructor() { * Subscribe to messages for a party (with auto-recovery) */ fun subscribeMessages(sessionId: String, partyId: String): Flow = callbackFlow { - // Save subscription for recovery - activeMessageSubscription = MessageSubscription(sessionId, partyId) - // Capture current stream version to detect stale callbacks val streamVersion = messageStreamVersion.incrementAndGet() @@ -820,11 +840,8 @@ class GrpcClient @Inject constructor() { return } - // Stream ended unexpectedly - trigger reconnect if we should still be subscribed - if (activeMessageSubscription != null && shouldReconnect.get()) { - Log.w(TAG, "Message stream ended unexpectedly, triggering reconnect") - triggerReconnect("Message stream ended") - } + // Stream ended - StreamManager will handle reconnection + Log.d(TAG, "Message stream ended") close() } } @@ -832,7 +849,7 @@ class GrpcClient @Inject constructor() { asyncStub?.subscribeMessages(request, observer) awaitClose { - activeMessageSubscription = null + Log.d(TAG, "subscribeMessages: Flow closed for sessionId=$sessionId") } } @@ -840,10 +857,6 @@ class GrpcClient @Inject constructor() { * Subscribe to session events (with auto-recovery) */ fun subscribeSessionEvents(partyId: String): Flow = callbackFlow { - // Save subscription for recovery - eventStreamSubscribed.set(true) - eventStreamPartyId = partyId - // Capture current stream version to detect stale callbacks val streamVersion = eventStreamVersion.incrementAndGet() @@ -900,11 +913,8 @@ class GrpcClient @Inject constructor() { return } - // Stream ended unexpectedly - trigger reconnect if we should still be subscribed - if (eventStreamSubscribed.get() && shouldReconnect.get()) { - Log.w(TAG, "Event stream ended unexpectedly, triggering reconnect") - triggerReconnect("Event stream ended") - } + // Stream ended - StreamManager will handle reconnection + Log.d(TAG, "Event stream ended") close() } } @@ -922,26 +932,9 @@ class GrpcClient @Inject constructor() { awaitClose { Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId") - eventStreamSubscribed.set(false) - eventStreamPartyId = null } } - /** - * Unsubscribe from session events - */ - fun unsubscribeSessionEvents() { - eventStreamSubscribed.set(false) - eventStreamPartyId = null - } - - /** - * Unsubscribe from messages - */ - fun unsubscribeMessages() { - activeMessageSubscription = null - } - /** * Report completion */ @@ -1009,13 +1002,6 @@ class GrpcClient @Inject constructor() { } } -/** - * Message subscription info - */ -private data class MessageSubscription( - val sessionId: String, - val partyId: String -) /** * Data class for join session response diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt new file mode 100644 index 00000000..b5e2386d --- /dev/null +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt @@ -0,0 +1,282 @@ +package com.durian.tssparty.data.remote + +import android.util.Log +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.math.min + +/** + * Manages gRPC stream lifecycle with automatic reconnection + * + * Key principles: + * 1. Streams cannot be "restored" - they must be re-initiated after disconnection + * 2. Each stream maintains its configuration for automatic restart + * 3. Exponential backoff for failed stream attempts + * 4. All streams are supervised - failure of one doesn't affect others + */ +class StreamManager( + private val grpcClient: GrpcClient, + private val scope: CoroutineScope +) { + companion object { + private const val TAG = "StreamManager" + private const val MAX_RETRY_DELAY_SECONDS = 30L + } + + // Active stream configurations + private var eventStreamConfig: EventStreamConfig? = null + private var messageStreamConfig: MessageStreamConfig? = null + + // Active stream jobs + private var eventStreamJob: Job? = null + private var messageStreamJob: Job? = null + + // Supervision flags + private var shouldMaintainEventStream = false + private var shouldMaintainMessageStream = false + + /** + * Start event stream with automatic reconnection + * + * @param partyId Party ID to subscribe events for + * @param onEvent Callback for each event received + * @param onError Callback for stream errors (before retry) + */ + fun startEventStream( + partyId: String, + onEvent: suspend (SessionEventData) -> Unit, + onError: ((Throwable) -> Unit)? = null + ) { + Log.d(TAG, "Starting event stream for partyId=$partyId") + + // Save configuration + eventStreamConfig = EventStreamConfig(partyId, onEvent, onError) + shouldMaintainEventStream = true + + // Stop existing stream if any + eventStreamJob?.cancel() + + // Launch new stream with retry logic + eventStreamJob = launchEventStream(partyId, onEvent, onError) + } + + /** + * Start message stream with automatic reconnection + * + * @param sessionId Session ID + * @param partyId Party ID + * @param partyIndex Party index in the session + * @param onMessage Callback for each message received + * @param onError Callback for stream errors (before retry) + */ + fun startMessageStream( + sessionId: String, + partyId: String, + partyIndex: Int, + onMessage: suspend (IncomingMessage) -> Unit, + onError: ((Throwable) -> Unit)? = null + ) { + Log.d(TAG, "Starting message stream for sessionId=$sessionId, partyId=$partyId") + + // Save configuration + messageStreamConfig = MessageStreamConfig(sessionId, partyId, partyIndex, onMessage, onError) + shouldMaintainMessageStream = true + + // Stop existing stream if any + messageStreamJob?.cancel() + + // Launch new stream with retry logic + messageStreamJob = launchMessageStream(sessionId, partyId, partyIndex, onMessage, onError) + } + + /** + * Stop event stream + */ + fun stopEventStream() { + Log.d(TAG, "Stopping event stream") + shouldMaintainEventStream = false + eventStreamConfig = null + eventStreamJob?.cancel() + eventStreamJob = null + } + + /** + * Stop message stream + */ + fun stopMessageStream() { + Log.d(TAG, "Stopping message stream") + shouldMaintainMessageStream = false + messageStreamConfig = null + messageStreamJob?.cancel() + messageStreamJob = null + } + + /** + * Restart all active streams (called after reconnection) + */ + fun restartAllStreams() { + Log.d(TAG, "Restarting all active streams") + + // Restart event stream if it was active + if (shouldMaintainEventStream) { + eventStreamConfig?.let { config -> + Log.d(TAG, "Restarting event stream for partyId=${config.partyId}") + eventStreamJob?.cancel() + eventStreamJob = launchEventStream(config.partyId, config.onEvent, config.onError) + } + } + + // Restart message stream if it was active + if (shouldMaintainMessageStream) { + messageStreamConfig?.let { config -> + Log.d(TAG, "Restarting message stream for sessionId=${config.sessionId}") + messageStreamJob?.cancel() + messageStreamJob = launchMessageStream( + config.sessionId, + config.partyId, + config.partyIndex, + config.onMessage, + config.onError + ) + } + } + } + + /** + * Check if event stream is active + */ + fun isEventStreamActive(): Boolean = shouldMaintainEventStream + + /** + * Check if message stream is active + */ + fun isMessageStreamActive(): Boolean = shouldMaintainMessageStream + + /** + * Get current event stream party ID + */ + fun getEventStreamPartyId(): String? = eventStreamConfig?.partyId + + /** + * Get current message stream session ID + */ + fun getMessageStreamSessionId(): String? = messageStreamConfig?.sessionId + + // ===== Private Implementation ===== + + /** + * Launch event stream with exponential backoff retry + */ + private fun launchEventStream( + partyId: String, + onEvent: suspend (SessionEventData) -> Unit, + onError: ((Throwable) -> Unit)? + ): Job = scope.launch { + Log.d(TAG, "Launching event stream flow with auto-retry for partyId=$partyId") + + flow { + // Re-initiate gRPC subscription (not "restore") + grpcClient.subscribeSessionEvents(partyId).collect { event -> + emit(event) + } + } + .retryWhen { cause, attempt -> + // Stop retrying if stream was explicitly stopped + if (!shouldMaintainEventStream) { + Log.d(TAG, "Event stream stopped, not retrying") + return@retryWhen false + } + + // Calculate exponential backoff delay + val delaySeconds = min(attempt + 1, MAX_RETRY_DELAY_SECONDS) + Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying in ${delaySeconds}s: ${cause.message}") + + // Notify error callback + onError?.invoke(cause) + + // Wait before retry + delay(delaySeconds * 1000) + true // Always retry if stream should be maintained + } + .catch { e -> + // This should not happen due to retryWhen, but handle just in case + Log.e(TAG, "Event stream terminated unexpectedly", e) + onError?.invoke(e) + } + .collect { event -> + try { + onEvent(event) + } catch (e: Exception) { + Log.e(TAG, "Error processing event", e) + // Don't let handler errors kill the stream + } + } + } + + /** + * Launch message stream with exponential backoff retry + */ + private fun launchMessageStream( + sessionId: String, + partyId: String, + partyIndex: Int, + onMessage: suspend (IncomingMessage) -> Unit, + onError: ((Throwable) -> Unit)? + ): Job = scope.launch { + Log.d(TAG, "Launching message stream flow with auto-retry for sessionId=$sessionId") + + flow { + // Re-initiate gRPC subscription (not "restore") + grpcClient.subscribeMessages(sessionId, partyId).collect { message -> + emit(message) + } + } + .retryWhen { cause, attempt -> + // Stop retrying if stream was explicitly stopped + if (!shouldMaintainMessageStream) { + Log.d(TAG, "Message stream stopped, not retrying") + return@retryWhen false + } + + // Calculate exponential backoff delay + val delaySeconds = min(attempt + 1, MAX_RETRY_DELAY_SECONDS) + Log.w(TAG, "Message stream failed (attempt ${attempt + 1}), retrying in ${delaySeconds}s: ${cause.message}") + + // Notify error callback + onError?.invoke(cause) + + // Wait before retry + delay(delaySeconds * 1000) + true // Always retry if stream should be maintained + } + .catch { e -> + // This should not happen due to retryWhen, but handle just in case + Log.e(TAG, "Message stream terminated unexpectedly", e) + onError?.invoke(e) + } + .collect { message -> + try { + onMessage(message) + } catch (e: Exception) { + Log.e(TAG, "Error processing message", e) + // Don't let handler errors kill the stream + } + } + } + + // ===== Configuration Data Classes ===== + + private data class EventStreamConfig( + val partyId: String, + val onEvent: suspend (SessionEventData) -> Unit, + val onError: ((Throwable) -> Unit)? + ) + + private data class MessageStreamConfig( + val sessionId: String, + val partyId: String, + val partyIndex: Int, + val onMessage: suspend (IncomingMessage) -> Unit, + val onError: ((Throwable) -> Unit)? + ) +} diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 54fdad75..0870bf2d 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -14,6 +14,7 @@ import com.durian.tssparty.data.remote.GrpcConnectionState import com.durian.tssparty.data.remote.IncomingMessage import com.durian.tssparty.data.remote.JoinSessionData import com.durian.tssparty.data.remote.SessionEventData +import com.durian.tssparty.data.remote.StreamManager import com.durian.tssparty.domain.model.* import com.durian.tssparty.util.AddressUtils import com.durian.tssparty.util.TransactionUtils @@ -213,6 +214,33 @@ class TssRepository @Inject constructor( coroutineExceptionHandler ) + /** + * StreamManager - 管理 gRPC 双向流的生命周期 + * + * 【架构重构 - 可靠的流管理机制】 + * + * 核心原则(来自 gRPC 官方文档): + * - gRPC 流无法"恢复",必须重新发起 RPC 调用 + * - 流断开后需要重新调用 subscribeSessionEvents() / subscribeMessages() + * - 保存流配置,自动重试失败的流(指数退避) + * + * 修复的关键问题: + * 1. 旧设计尝试"恢复"已关闭的 Flow → 失败(Flow 不是持久化对象) + * 2. 网络断开后 eventStreamSubscribed flag 被清除 → callback 不触发 + * 3. 流失败后没有自动重试机制 → 永久失败 + * + * StreamManager 功能: + * - 保存每个流的配置(partyId, sessionId 等) + * - 流失败后自动重新发起 RPC(不是"恢复") + * - 使用 Flow.retryWhen 实现指数退避重试 + * - 网络重连时,重启所有活跃的流 + * + * 参考资料: + * - https://github.com/grpc/grpc-java/issues/8177 + * - https://grpc.io/docs/guides/keepalive/ + */ + private val streamManager = StreamManager(grpcClient, repositoryScope) + companion object { // Job 名称常量 private const val JOB_MESSAGE_COLLECTION = "message_collection" @@ -289,32 +317,16 @@ class TssRepository @Inject constructor( private var accountServiceUrl: String = "https://rwaapi.szaiai.com" init { - // Set up reconnection callback to restore streams - grpcClient.setOnReconnectedCallback { - android.util.Log.d("TssRepository", "gRPC reconnected, restoring streams...") - restoreStreamsAfterReconnect() - } - } - - /** - * Restore message and event streams after gRPC reconnection - */ - private fun restoreStreamsAfterReconnect() { - val sessionId = currentMessageRoutingSessionId - val partyIndex = currentMessageRoutingPartyIndex - val routingPartyId = currentMessageRoutingPartyId - - // Restore message routing if we had an active session - if (sessionId != null && partyIndex != null) { - android.util.Log.d("TssRepository", "Restoring message routing for session: $sessionId, routingPartyId: $routingPartyId") - startMessageRouting(sessionId, partyIndex, routingPartyId) - } - - // Restore session event subscription with the correct partyId - if (grpcClient.wasEventStreamSubscribed()) { - val eventPartyId = currentSessionEventPartyId - android.util.Log.d("TssRepository", "Restoring session event subscription with partyId: $eventPartyId") - startSessionEventSubscription(eventPartyId) + // Monitor gRPC reconnection events and restart streams + // IMPORTANT: Don't use callback pattern - use event Flow for better reliability + repositoryScope.launch { + grpcConnectionEvents + .filter { it is GrpcConnectionEvent.Reconnected } + .collect { + android.util.Log.d("TssRepository", "gRPC reconnected, restarting streams via StreamManager...") + // StreamManager will re-initiate RPC calls (not "restore" closed Flows) + streamManager.restartAllStreams() + } } } @@ -362,6 +374,18 @@ class TssRepository @Inject constructor( grpcClient.connect(host, port) } + /** + * Setup network state monitoring for reliable reconnection + * + * This should be called once during app initialization (e.g., in MainActivity.onCreate) + * to enable immediate reconnection when network becomes available. + * + * @param context Application or Activity context + */ + fun setupNetworkMonitoring(context: android.content.Context) { + grpcClient.setupNetworkMonitoring(context) + } + /** * Disconnect from the server */ @@ -483,9 +507,10 @@ class TssRepository @Inject constructor( currentSessionEventPartyId = effectivePartyId android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $effectivePartyId (device partyId: $devicePartyId)") - // 使用 JobManager 启动(自动取消同名旧 Job) - jobManager.launch(JOB_SESSION_EVENT) { - grpcClient.subscribeSessionEvents(effectivePartyId).collect { event -> + // Use StreamManager for reliable stream management with auto-reconnection + streamManager.startEventStream( + partyId = effectivePartyId, + onEvent = { event -> android.util.Log.d("TssRepository", "=== Session event received ===") android.util.Log.d("TssRepository", " eventType: ${event.eventType}") android.util.Log.d("TssRepository", " sessionId: ${event.sessionId}") @@ -570,8 +595,12 @@ class TssRepository @Inject constructor( android.util.Log.d("TssRepository", " Reason: sessionId mismatch (event: ${event.sessionId}, active: ${activeSession.sessionId})") } } + }, + onError = { error -> + android.util.Log.e("TssRepository", "Event stream error: ${error.message}") + // StreamManager will automatically retry } - } + ) } /** @@ -585,8 +614,8 @@ class TssRepository @Inject constructor( * partyId from keygen (shareEntity.partyId). */ private fun ensureSessionEventSubscriptionActive(signingPartyId: String? = null) { - // Check if the session event job is still active using JobManager - val isActive = jobManager.isActive(JOB_SESSION_EVENT) + // Check if the event stream is still active using StreamManager + val isActive = streamManager.isEventStreamActive() val devicePartyId = requirePartyId() // Ensure partyId is initialized val effectivePartyId = signingPartyId ?: currentSessionEventPartyId ?: devicePartyId android.util.Log.d("TssRepository", "Checking session event subscription: isActive=$isActive, effectivePartyId=$effectivePartyId") @@ -595,16 +624,14 @@ class TssRepository @Inject constructor( android.util.Log.w("TssRepository", "Session event subscription is not active, restarting...") startSessionEventSubscription(signingPartyId) } else { - // Even if the job is "active", the gRPC stream may have silently disconnected - // Force a restart to ensure we have a fresh connection - // Also restart if we need to switch to a different partyId for signing + // Check if we need to switch to a different partyId for signing val needsRestart = signingPartyId != null && signingPartyId != currentSessionEventPartyId if (needsRestart) { android.util.Log.d("TssRepository", "Switching session event subscription to signingPartyId: $signingPartyId") + startSessionEventSubscription(signingPartyId) } else { - android.util.Log.d("TssRepository", "Refreshing session event subscription to ensure fresh connection") + android.util.Log.d("TssRepository", "Event stream is active with correct partyId, no restart needed") } - startSessionEventSubscription(signingPartyId) } } @@ -2038,40 +2065,50 @@ class TssRepository @Inject constructor( // Save for reconnection recovery currentMessageRoutingPartyId = effectivePartyId + android.util.Log.d("TssRepository", "Starting message routing: sessionId=$sessionId, routingPartyId=$effectivePartyId") + + // Part 1: Collect outgoing messages from TSS and route via gRPC + // This doesn't need StreamManager - it's a local Flow collection jobManager.launch(JOB_MESSAGE_COLLECTION) { - android.util.Log.d("TssRepository", "Starting message routing: sessionId=$sessionId, routingPartyId=$effectivePartyId") - - // Collect outgoing messages from TSS and route via gRPC - launch { - tssNativeBridge.outgoingMessages.collect { message -> - val payload = Base64.decode(message.payload, Base64.NO_WRAP) - grpcClient.routeMessage( - sessionId = sessionId, - fromParty = effectivePartyId, // Use the correct partyId for routing - toParties = message.toParties ?: emptyList(), - roundNumber = 0, - messageType = if (message.isBroadcast) "broadcast" else "p2p", - payload = payload - ) - } + tssNativeBridge.outgoingMessages.collect { message -> + val payload = Base64.decode(message.payload, Base64.NO_WRAP) + grpcClient.routeMessage( + sessionId = sessionId, + fromParty = effectivePartyId, // Use the correct partyId for routing + toParties = message.toParties ?: emptyList(), + roundNumber = 0, + messageType = if (message.isBroadcast) "broadcast" else "p2p", + payload = payload + ) } + } - // Collect incoming messages from gRPC and send to TSS - launch { - grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { message -> - // Find party index from party ID - val session = _currentSession.value - val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex - ?: return@collect + // Part 2: Subscribe to incoming messages from gRPC and send to TSS + // Use StreamManager for reliable gRPC stream management with auto-reconnection + streamManager.startMessageStream( + sessionId = sessionId, + partyId = effectivePartyId, + partyIndex = partyIndex, + onMessage = { message -> + // Find party index from party ID + val session = _currentSession.value + val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex + if (fromPartyIndex != null) { tssNativeBridge.sendIncomingMessage( fromPartyIndex = fromPartyIndex, isBroadcast = message.isBroadcast, payload = message.payload ) + } else { + android.util.Log.w("TssRepository", "Unknown fromParty: ${message.fromParty}, skipping message") } + }, + onError = { error -> + android.util.Log.e("TssRepository", "Message stream error: ${error.message}") + // StreamManager will automatically retry } - } + ) } /** diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt index ff96c47f..49f6a577 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt @@ -231,6 +231,18 @@ class MainViewModel @Inject constructor( } } + /** + * Setup network state monitoring for reliable reconnection + * + * This enables immediate reconnection when network becomes available + * instead of waiting for DNS resolution backoff (up to 60 seconds). + * + * Should be called once during app initialization. + */ + fun setupNetworkMonitoring(context: android.content.Context) { + repository.setupNetworkMonitoring(context) + } + /** * Connect to Message Router server */