fix(android): 修复初次连接被误当成重连的bug

问题:
- waitForConnection() 在 channel 变成 READY 时无条件执行 reRegisterIfNeeded() 和 reSubscribeStreams()
- 导致初次连接时重复注册 party 和重复订阅事件流

修复:
- 使用 isReconnecting 标志区分初次连接和重连
- connect() 中确保 isReconnecting = false
- triggerReconnect() 设置 isReconnecting = true
- waitForConnection() 中先读取 isReconnecting 再重置,只有重连时才恢复流

添加详细日志用于调试:
- GrpcClient: connect(), doConnect(), waitForConnection(), triggerReconnect()
- TssRepository: registerParty(), restoreStreamsAfterReconnect(), onReconnectedCallback

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-27 04:07:26 -08:00
parent 4e4d731b44
commit 69de49a000
2 changed files with 61 additions and 15 deletions

View File

@ -123,17 +123,26 @@ class GrpcClient @Inject constructor() {
* Connect to the Message Router server * Connect to the Message Router server
*/ */
fun connect(host: String, port: Int) { fun connect(host: String, port: Int) {
Log.d(TAG, "=== connect() called ===")
Log.d(TAG, " host: $host, port: $port")
Log.d(TAG, " isReconnecting before reset: ${isReconnecting.get()}")
// Save connection params for reconnection // Save connection params for reconnection
currentHost = host currentHost = host
currentPort = port currentPort = port
shouldReconnect.set(true) shouldReconnect.set(true)
reconnectAttempts.set(0) reconnectAttempts.set(0)
// 重要:初次连接时确保 isReconnecting 为 false
// 这样 waitForConnection() 能正确区分初次连接和重连
isReconnecting.set(false)
Log.d(TAG, " isReconnecting after reset: ${isReconnecting.get()} (should be false for first connect)")
doConnect(host, port) doConnect(host, port)
} }
private fun doConnect(host: String, port: Int) { private fun doConnect(host: String, port: Int) {
Log.d(TAG, "Connecting to $host:$port") Log.d(TAG, "doConnect: $host:$port, isReconnecting=${isReconnecting.get()}")
_connectionState.value = GrpcConnectionState.Connecting _connectionState.value = GrpcConnectionState.Connecting
try { try {
@ -183,24 +192,39 @@ class GrpcClient @Inject constructor() {
when (state) { when (state) {
ConnectivityState.READY -> { ConnectivityState.READY -> {
Log.d(TAG, "Connected successfully") // 关键修复:先读取 isReconnecting 再重置,用于区分初次连接和重连
// - 初次连接isReconnecting = false由 connect() 触发)
// - 重连isReconnecting = true由 triggerReconnect() 触发,包括后台唤醒)
val wasReconnecting = isReconnecting.getAndSet(false)
Log.d(TAG, "=== Channel READY ===")
Log.d(TAG, " wasReconnecting: $wasReconnecting")
Log.d(TAG, " registeredPartyId: $registeredPartyId")
Log.d(TAG, " eventStreamSubscribed: ${eventStreamSubscribed.get()}")
Log.d(TAG, " eventStreamPartyId: $eventStreamPartyId")
_connectionState.value = GrpcConnectionState.Connected _connectionState.value = GrpcConnectionState.Connected
reconnectAttempts.set(0) reconnectAttempts.set(0)
heartbeatFailCount.set(0) heartbeatFailCount.set(0)
isReconnecting.set(false)
// Start channel state monitoring // Start channel state monitoring
startChannelStateMonitor() startChannelStateMonitor()
// Re-register if we were registered before // 只有重连时才需要恢复注册和订阅
reRegisterIfNeeded() // 初次连接时registerParty() 和 subscribeSessionEvents() 会在外部显式调用
if (wasReconnecting) {
Log.d(TAG, ">>> RECONNECT: Restoring registration and streams")
// Re-register if we were registered before
reRegisterIfNeeded()
// Re-subscribe to streams
reSubscribeStreams()
} else {
Log.d(TAG, ">>> FIRST CONNECT: Skipping restore (will be done by caller)")
}
// Restart heartbeat // Restart heartbeat (both first connect and reconnect need this)
startHeartbeat() startHeartbeat()
// Re-subscribe to streams
reSubscribeStreams()
return@withTimeout return@withTimeout
} }
ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> { ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> {
@ -347,7 +371,10 @@ class GrpcClient @Inject constructor() {
Log.d(TAG, "Reconnecting in ${delay}ms (attempt $attempt/${reconnectConfig.maxRetries})") Log.d(TAG, "Reconnecting in ${delay}ms (attempt $attempt/${reconnectConfig.maxRetries})")
delay(delay) delay(delay)
isReconnecting.set(false) // 注意:不要在这里重置 isReconnecting
// isReconnecting 会在 waitForConnection() 的 READY 分支中被重置
// 这样 waitForConnection() 才能知道这是重连而非初次连接
Log.d(TAG, ">>> Starting reconnect, isReconnecting=$isReconnecting (should be true)")
doConnect(host, port) doConnect(host, port)
} }
} }

View File

@ -120,7 +120,7 @@ class TssRepository @Inject constructor(
init { init {
// Set up reconnection callback to restore streams // Set up reconnection callback to restore streams
grpcClient.setOnReconnectedCallback { grpcClient.setOnReconnectedCallback {
android.util.Log.d("TssRepository", "gRPC reconnected, restoring streams...") android.util.Log.d("TssRepository", ">>> onReconnectedCallback triggered! <<<")
restoreStreamsAfterReconnect() restoreStreamsAfterReconnect()
} }
} }
@ -129,22 +129,35 @@ class TssRepository @Inject constructor(
* Restore message and event streams after gRPC reconnection * Restore message and event streams after gRPC reconnection
*/ */
private fun restoreStreamsAfterReconnect() { private fun restoreStreamsAfterReconnect() {
android.util.Log.d("TssRepository", "=== restoreStreamsAfterReconnect() called ===")
val sessionId = currentMessageRoutingSessionId val sessionId = currentMessageRoutingSessionId
val partyIndex = currentMessageRoutingPartyIndex val partyIndex = currentMessageRoutingPartyIndex
val routingPartyId = currentMessageRoutingPartyId val routingPartyId = currentMessageRoutingPartyId
android.util.Log.d("TssRepository", " currentMessageRoutingSessionId: $sessionId")
android.util.Log.d("TssRepository", " currentMessageRoutingPartyIndex: $partyIndex")
android.util.Log.d("TssRepository", " currentSessionEventPartyId: $currentSessionEventPartyId")
android.util.Log.d("TssRepository", " grpcClient.wasEventStreamSubscribed(): ${grpcClient.wasEventStreamSubscribed()}")
// Restore message routing if we had an active session // Restore message routing if we had an active session
if (sessionId != null && partyIndex != null) { if (sessionId != null && partyIndex != null) {
android.util.Log.d("TssRepository", "Restoring message routing for session: $sessionId, routingPartyId: $routingPartyId") android.util.Log.d("TssRepository", " >>> Restoring message routing for session: $sessionId")
startMessageRouting(sessionId, partyIndex, routingPartyId) startMessageRouting(sessionId, partyIndex, routingPartyId)
} else {
android.util.Log.d("TssRepository", " No active message routing to restore")
} }
// Restore session event subscription with the correct partyId // Restore session event subscription with the correct partyId
if (grpcClient.wasEventStreamSubscribed()) { if (grpcClient.wasEventStreamSubscribed()) {
val eventPartyId = currentSessionEventPartyId val eventPartyId = currentSessionEventPartyId
android.util.Log.d("TssRepository", "Restoring session event subscription with partyId: $eventPartyId") android.util.Log.d("TssRepository", " >>> Restoring session event subscription with partyId: $eventPartyId")
startSessionEventSubscription(eventPartyId) startSessionEventSubscription(eventPartyId)
} else {
android.util.Log.d("TssRepository", " No event stream subscription to restore")
} }
android.util.Log.d("TssRepository", "=== restoreStreamsAfterReconnect() completed ===")
} }
// HTTP client for API calls // HTTP client for API calls
@ -231,22 +244,28 @@ class TssRepository @Inject constructor(
* getOrCreatePartyId pattern). The partyId is loaded ONCE here and cached in memory. * getOrCreatePartyId pattern). The partyId is loaded ONCE here and cached in memory.
*/ */
suspend fun registerParty(): String { suspend fun registerParty(): String {
android.util.Log.d("TssRepository", "=== registerParty() called ===")
// Load or create persistent partyId (matching Electron's getOrCreatePartyId) // Load or create persistent partyId (matching Electron's getOrCreatePartyId)
val existingPartyId = appSettingDao.getValue("party_id") val existingPartyId = appSettingDao.getValue("party_id")
partyId = if (existingPartyId != null) { partyId = if (existingPartyId != null) {
android.util.Log.d("TssRepository", "Loaded existing partyId: $existingPartyId") android.util.Log.d("TssRepository", " Loaded existing partyId: $existingPartyId")
existingPartyId existingPartyId
} else { } else {
val newPartyId = UUID.randomUUID().toString() val newPartyId = UUID.randomUUID().toString()
appSettingDao.setValue(AppSettingEntity("party_id", newPartyId)) appSettingDao.setValue(AppSettingEntity("party_id", newPartyId))
android.util.Log.d("TssRepository", "Generated new partyId: $newPartyId") android.util.Log.d("TssRepository", " Generated new partyId: $newPartyId")
newPartyId newPartyId
} }
android.util.Log.d("TssRepository", " Calling grpcClient.registerParty()...")
grpcClient.registerParty(partyId, "temporary", "1.0.0") grpcClient.registerParty(partyId, "temporary", "1.0.0")
android.util.Log.d("TssRepository", " grpcClient.registerParty() completed")
// Subscribe to session events immediately after registration (like Electron does) // Subscribe to session events immediately after registration (like Electron does)
android.util.Log.d("TssRepository", " Calling startSessionEventSubscription()...")
startSessionEventSubscription() startSessionEventSubscription()
android.util.Log.d("TssRepository", "=== registerParty() completed ===")
return partyId return partyId
} }