From 69de49a0002a7ab361ffdf3a1dbe3df10c5cdd73 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 27 Jan 2026 04:07:26 -0800 Subject: [PATCH] =?UTF-8?q?fix(android):=20=E4=BF=AE=E5=A4=8D=E5=88=9D?= =?UTF-8?q?=E6=AC=A1=E8=BF=9E=E6=8E=A5=E8=A2=AB=E8=AF=AF=E5=BD=93=E6=88=90?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题: - waitForConnection() 在 channel 变成 READY 时无条件执行 reRegisterIfNeeded() 和 reSubscribeStreams() - 导致初次连接时重复注册 party 和重复订阅事件流 修复: - 使用 isReconnecting 标志区分初次连接和重连 - connect() 中确保 isReconnecting = false - triggerReconnect() 设置 isReconnecting = true - waitForConnection() 中先读取 isReconnecting 再重置,只有重连时才恢复流 添加详细日志用于调试: - GrpcClient: connect(), doConnect(), waitForConnection(), triggerReconnect() - TssRepository: registerParty(), restoreStreamsAfterReconnect(), onReconnectedCallback Co-Authored-By: Claude Opus 4.5 --- .../durian/tssparty/data/remote/GrpcClient.kt | 47 +++++++++++++++---- .../tssparty/data/repository/TssRepository.kt | 29 ++++++++++-- 2 files changed, 61 insertions(+), 15 deletions(-) diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt index bb2427c5..096c1bc0 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt @@ -123,17 +123,26 @@ class GrpcClient @Inject constructor() { * Connect to the Message Router server */ fun connect(host: String, port: Int) { + Log.d(TAG, "=== connect() called ===") + Log.d(TAG, " host: $host, port: $port") + Log.d(TAG, " isReconnecting before reset: ${isReconnecting.get()}") + // Save connection params for reconnection currentHost = host currentPort = port shouldReconnect.set(true) reconnectAttempts.set(0) + // 重要:初次连接时确保 isReconnecting 为 false + // 这样 waitForConnection() 能正确区分初次连接和重连 + isReconnecting.set(false) + Log.d(TAG, " isReconnecting after reset: ${isReconnecting.get()} (should be false for first connect)") + doConnect(host, port) } private fun doConnect(host: String, port: Int) { - Log.d(TAG, "Connecting to $host:$port") + Log.d(TAG, "doConnect: $host:$port, isReconnecting=${isReconnecting.get()}") _connectionState.value = GrpcConnectionState.Connecting try { @@ -183,24 +192,39 @@ class GrpcClient @Inject constructor() { when (state) { ConnectivityState.READY -> { - Log.d(TAG, "Connected successfully") + // 关键修复:先读取 isReconnecting 再重置,用于区分初次连接和重连 + // - 初次连接:isReconnecting = false(由 connect() 触发) + // - 重连:isReconnecting = true(由 triggerReconnect() 触发,包括后台唤醒) + val wasReconnecting = isReconnecting.getAndSet(false) + + Log.d(TAG, "=== Channel READY ===") + Log.d(TAG, " wasReconnecting: $wasReconnecting") + Log.d(TAG, " registeredPartyId: $registeredPartyId") + Log.d(TAG, " eventStreamSubscribed: ${eventStreamSubscribed.get()}") + Log.d(TAG, " eventStreamPartyId: $eventStreamPartyId") + _connectionState.value = GrpcConnectionState.Connected reconnectAttempts.set(0) heartbeatFailCount.set(0) - isReconnecting.set(false) // Start channel state monitoring startChannelStateMonitor() - // Re-register if we were registered before - reRegisterIfNeeded() + // 只有重连时才需要恢复注册和订阅 + // 初次连接时,registerParty() 和 subscribeSessionEvents() 会在外部显式调用 + if (wasReconnecting) { + Log.d(TAG, ">>> RECONNECT: Restoring registration and streams") + // Re-register if we were registered before + reRegisterIfNeeded() + // Re-subscribe to streams + reSubscribeStreams() + } else { + Log.d(TAG, ">>> FIRST CONNECT: Skipping restore (will be done by caller)") + } - // Restart heartbeat + // Restart heartbeat (both first connect and reconnect need this) startHeartbeat() - // Re-subscribe to streams - reSubscribeStreams() - return@withTimeout } ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> { @@ -347,7 +371,10 @@ class GrpcClient @Inject constructor() { Log.d(TAG, "Reconnecting in ${delay}ms (attempt $attempt/${reconnectConfig.maxRetries})") delay(delay) - isReconnecting.set(false) + // 注意:不要在这里重置 isReconnecting! + // isReconnecting 会在 waitForConnection() 的 READY 分支中被重置 + // 这样 waitForConnection() 才能知道这是重连而非初次连接 + Log.d(TAG, ">>> Starting reconnect, isReconnecting=$isReconnecting (should be true)") doConnect(host, port) } } diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 35c26a47..8033d9cf 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -120,7 +120,7 @@ class TssRepository @Inject constructor( init { // Set up reconnection callback to restore streams grpcClient.setOnReconnectedCallback { - android.util.Log.d("TssRepository", "gRPC reconnected, restoring streams...") + android.util.Log.d("TssRepository", ">>> onReconnectedCallback triggered! <<<") restoreStreamsAfterReconnect() } } @@ -129,22 +129,35 @@ class TssRepository @Inject constructor( * Restore message and event streams after gRPC reconnection */ private fun restoreStreamsAfterReconnect() { + android.util.Log.d("TssRepository", "=== restoreStreamsAfterReconnect() called ===") + val sessionId = currentMessageRoutingSessionId val partyIndex = currentMessageRoutingPartyIndex val routingPartyId = currentMessageRoutingPartyId + android.util.Log.d("TssRepository", " currentMessageRoutingSessionId: $sessionId") + android.util.Log.d("TssRepository", " currentMessageRoutingPartyIndex: $partyIndex") + android.util.Log.d("TssRepository", " currentSessionEventPartyId: $currentSessionEventPartyId") + android.util.Log.d("TssRepository", " grpcClient.wasEventStreamSubscribed(): ${grpcClient.wasEventStreamSubscribed()}") + // Restore message routing if we had an active session if (sessionId != null && partyIndex != null) { - android.util.Log.d("TssRepository", "Restoring message routing for session: $sessionId, routingPartyId: $routingPartyId") + android.util.Log.d("TssRepository", " >>> Restoring message routing for session: $sessionId") startMessageRouting(sessionId, partyIndex, routingPartyId) + } else { + android.util.Log.d("TssRepository", " No active message routing to restore") } // Restore session event subscription with the correct partyId if (grpcClient.wasEventStreamSubscribed()) { val eventPartyId = currentSessionEventPartyId - android.util.Log.d("TssRepository", "Restoring session event subscription with partyId: $eventPartyId") + android.util.Log.d("TssRepository", " >>> Restoring session event subscription with partyId: $eventPartyId") startSessionEventSubscription(eventPartyId) + } else { + android.util.Log.d("TssRepository", " No event stream subscription to restore") } + + android.util.Log.d("TssRepository", "=== restoreStreamsAfterReconnect() completed ===") } // HTTP client for API calls @@ -231,22 +244,28 @@ class TssRepository @Inject constructor( * getOrCreatePartyId pattern). The partyId is loaded ONCE here and cached in memory. */ suspend fun registerParty(): String { + android.util.Log.d("TssRepository", "=== registerParty() called ===") + // Load or create persistent partyId (matching Electron's getOrCreatePartyId) val existingPartyId = appSettingDao.getValue("party_id") partyId = if (existingPartyId != null) { - android.util.Log.d("TssRepository", "Loaded existing partyId: $existingPartyId") + android.util.Log.d("TssRepository", " Loaded existing partyId: $existingPartyId") existingPartyId } else { val newPartyId = UUID.randomUUID().toString() appSettingDao.setValue(AppSettingEntity("party_id", newPartyId)) - android.util.Log.d("TssRepository", "Generated new partyId: $newPartyId") + android.util.Log.d("TssRepository", " Generated new partyId: $newPartyId") newPartyId } + android.util.Log.d("TssRepository", " Calling grpcClient.registerParty()...") grpcClient.registerParty(partyId, "temporary", "1.0.0") + android.util.Log.d("TssRepository", " grpcClient.registerParty() completed") // Subscribe to session events immediately after registration (like Electron does) + android.util.Log.d("TssRepository", " Calling startSessionEventSubscription()...") startSessionEventSubscription() + android.util.Log.d("TssRepository", "=== registerParty() completed ===") return partyId }