rwadurian/backend/mpc-system/services/service-party-android/WORKING_CODE_ANALYSIS.md

7.7 KiB
Raw Blame History

工作代码分析和正确修复方案

问题根源

我犯的错误: 在添加异常处理时,把整个流管理逻辑都改了,引入了 StreamManager 抽象层,导致流程变复杂并出现新问题。

用户的要求:

  1. 添加异常处理(如 markPartyReady 重试)
  2. 保留 gRPC 官方推荐Keep-Alive, network monitoring
  3. 不要改变原有的工作流程

工作代码的逻辑commit 41e7eed2

1. 连接初始化序列

// MainActivity.kt → MainViewModel.kt → TssRepository.kt

1. GrpcClient.connectToServer(host, port)
   
2. 创建 ManagedChannel
   
3. TssRepository.registerParty()
   
4. grpcClient.registerParty(partyId, "temporary", "1.0.0")  // 没有错误检查
   
5. startSessionEventSubscription()  // 立即订阅事件流

2. 事件流订阅逻辑

// TssRepository.kt - 工作的代码

private fun startSessionEventSubscription(subscriptionPartyId: String? = null) {
    val effectivePartyId = subscriptionPartyId ?: requirePartyId()
    currentSessionEventPartyId = effectivePartyId

    // 关键:使用 JobManager 直接启动
    jobManager.launch(JOB_SESSION_EVENT) {
        grpcClient.subscribeSessionEvents(effectivePartyId).collect { event ->
            // 直接在这里处理事件
            when (event.eventType) {
                "session_started" -> { /* ... */ }
                "participant_joined" -> { /* ... */ }
                // ...
            }
        }
    }
}

为什么这个简单的方式可以工作?

  • jobManager.launch() 自动取消同名旧 Job
  • grpcClient.subscribeSessionEvents() 返回一个 Flow
  • Flow 在网络断开时会自动关闭
  • 但没有自动重连机制(这是需要修复的)

3. 消息路由逻辑

// TssRepository.kt - 消息路由

private fun startMessageRouting(sessionId: String, partyId: String, partyIndex: Int) {
    // 1. 启动消息收集 Job
    jobManager.launch(JOB_MESSAGE_COLLECTION) {
        subscribeToTssMessages(sessionId, partyId).collect { message ->
            tssNativeBridge.routeIncomingMessage(sessionId, message)
        }
    }

    // 2. 同时启动消息发送 Job在同一个 JobManager 中)
    jobManager.launch(JOB_MESSAGE_SENDING) {
        tssNativeBridge.outgoingMessages.collect { message ->
            grpcClient.sendMessage(sessionId, message)
        }
    }
}

我的错误修改

错误 1: 引入了 StreamManager 抽象层

// 新代码(错误)
streamManager.startEventStream(
    partyId = effectivePartyId,
    onEvent = { event -> /* callback */ }
)

问题:

  • 增加了一层不必要的抽象
  • StreamManager 的实现可能有 bug
  • 日志显示 StreamManager 根本没有启动

错误 2: 修改了连接重建后的流恢复逻辑

// 旧代码(工作的)
grpcConnectionEvents
    .filter { it is GrpcConnectionEvent.Reconnected }
    .collect {
        onReconnectedCallback?.invoke()  // 简单的 callback
    }

// 新代码(复杂但出错)
grpcConnectionEvents
    .filter { it is GrpcConnectionEvent.Reconnected }
    .collect {
        streamManager.restartAllStreams()  // StreamManager 可能有问题
    }

正确的修复方案

保留的部分(这些是好的)

  1. gRPC Keep-Alive 配置GrpcClient.kt line 143-150:
val builder = ManagedChannelBuilder
    .forAddress(host, port)
    .keepAliveTime(20, TimeUnit.SECONDS)
    .keepAliveTimeout(5, TimeUnit.SECONDS)
    .keepAliveWithoutCalls(true)
    .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS)
  1. Android 网络监听GrpcClient.kt line 151-183:
fun setupNetworkMonitoring(context: Context) {
    val callback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            channel?.resetConnectBackoff()
        }
    }
}
  1. registerParty 错误检查TssRepository.kt line 489-494:
val registerResult = grpcClient.registerParty(partyId, "temporary", "1.0.0")
if (registerResult.isFailure) {
    throw registerResult.exceptionOrNull() ?: Exception("Failed to register party")
}
  1. markPartyReady 重试机制TssRepository.kt line ~2140:
repeat(5) { attempt ->
    if (markReadySuccess) return@repeat
    val markReadyResult = grpcClient.markPartyReady(sessionId, partyId)
    if (markReadyResult.isSuccess) {
        markReadySuccess = true
        return@repeat
    }
    delay((attempt + 1) * 500L)
}

需要回退的部分(这些破坏了原有逻辑)

  1. 删除 StreamManager:

    • 删除 StreamManager.kt 文件
    • 删除 TssRepository.kt 中的 streamManager 实例
  2. 恢复原有的事件订阅逻辑:

// 恢复为这样(简单直接)
private fun startSessionEventSubscription(subscriptionPartyId: String? = null) {
    val effectivePartyId = subscriptionPartyId ?: requirePartyId()
    currentSessionEventPartyId = effectivePartyId

    jobManager.launch(JOB_SESSION_EVENT) {
        // 添加 retryWhen 自动重连(新增的改进)
        flow {
            grpcClient.subscribeSessionEvents(effectivePartyId).collect { emit(it) }
        }
        .retryWhen { cause, attempt ->
            Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying: ${cause.message}")
            delay(min(attempt + 1, 30) * 1000L)
            true  // 永远重试
        }
        .collect { event ->
            // 直接处理事件(保持原有逻辑不变)
            Log.d(TAG, "=== Session event received ===")
            when (event.eventType) {
                "session_started" -> { /* ... */ }
                // ...
            }
        }
    }
}
  1. 删除 GrpcClient 中复杂的 reconnection callback:
    • 保持简单的连接状态 Flow
    • 不需要复杂的 reSubscribeStreams() 逻辑

正确的架构

简单而可靠的架构:

GrpcClient (基础层)
  ├─ Keep-Alive 配置 ✅
  ├─ Network Monitoring ✅
  ├─ subscribeSessionEvents() → Flow ✅
  └─ subscribeMessages() → Flow ✅

TssRepository (业务层)
  ├─ JobManager 管理所有协程 ✅
  ├─ jobManager.launch(JOB_SESSION_EVENT) {
  │    flow { grpcClient.subscribeSessionEvents().collect { emit(it) } }
  │      .retryWhen { ... }  ← 新增自动重连
  │      .collect { event -> /* 处理 */ }
  │  }
  └─ 同样的模式用于消息流 ✅

实施步骤

步骤 1: 回退 TssRepository.kt 的事件订阅逻辑

// 删除 StreamManager 相关代码line 217-242
- private val streamManager = StreamManager(grpcClient, repositoryScope)
- init { repositoryScope.launch { grpcConnectionEvents... streamManager.restartAllStreams() } }

// 恢复 startSessionEventSubscription 为原来的简单版本line 511-612
// 但在 collect 外包一层 flow { }.retryWhen { }

步骤 2: 删除 StreamManager.kt 文件

rm StreamManager.kt

步骤 3: 简化 GrpcClient.kt 的重连逻辑

// 删除复杂的 reSubscribeStreams() 方法
// 保留简单的 GrpcConnectionEvent 发送

步骤 4: 测试验证

  1. 编译成功
  2. 启动时 RegisterParty 成功
  3. 事件订阅成功(看到 "Starting session event subscription" 日志)
  4. 创建 2-of-3 会话成功
  5. 飞行模式测试自动重连

总结

核心教训:

  • 不要过度设计StreamManager 是不必要的抽象)
  • 在原有工作的代码基础上做最小改动
  • 保留 gRPC 官方推荐的配置Keep-Alive, network monitoring
  • 只在必要的地方添加错误处理和重试逻辑

修复原则:

旧代码 + 官方推荐 + 最小改动 = 可靠的解决方案

不是: 旧代码 → 完全重构 → StreamManager → 新问题