5.9 KiB
5.9 KiB
重构总结 - 回归简单可靠的架构
修复的问题
用户反馈: "让处理异常,你个狗日的把逻辑,流程都改错了。"
根本原因: 在添加异常处理和 gRPC 可靠性改进时,引入了 StreamManager 抽象层,导致:
- RegisterParty 失败但代码继续执行
- StreamManager 日志完全缺失
- 流程变复杂,引入新问题
本次重构的原则
保留的好东西(来自 gRPC 官方推荐)✅:
- gRPC Keep-Alive 配置(20s PING, 5s timeout, 永不 idle)
- Android 网络状态监听(resetConnectBackoff)
- registerParty 错误检查和重试
- markPartyReady 重试机制
删除的坏东西(过度设计)❌:
- StreamManager.kt 整个文件
- 复杂的 init 块监听 reconnection 事件
- 回调式的流管理
恢复的简单逻辑(工作的代码)✅:
- 直接用 jobManager.launch + grpcClient.subscribeSessionEvents().collect
- 在 collect 外包一层 flow { }.retryWhen { } 实现自动重连
- 保持原有的事件处理逻辑不变
代码变更详情
1. TssRepository.kt
删除 StreamManager 相关代码:
// 删除了
- import com.durian.tssparty.data.remote.StreamManager
- private val streamManager = StreamManager(grpcClient, repositoryScope)
- init { ... streamManager.restartAllStreams() }
恢复简单的事件订阅:
// 之前(复杂)
streamManager.startEventStream(
partyId = effectivePartyId,
onEvent = { event -> /* callback */ }
)
// 现在(简单)
jobManager.launch(JOB_SESSION_EVENT) {
flow {
grpcClient.subscribeSessionEvents(effectivePartyId).collect { emit(it) }
}
.retryWhen { cause, attempt ->
Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying...")
delay(min(attempt + 1, 30) * 1000L)
true // 永远重试
}
.collect { event ->
// 直接处理事件(保持原有逻辑)
}
}
恢复简单的消息路由:
// Part 1: 发送消息(重命名为 JOB_MESSAGE_SENDING)
jobManager.launch(JOB_MESSAGE_SENDING) {
tssNativeBridge.outgoingMessages.collect { message ->
grpcClient.routeMessage(...)
}
}
// Part 2: 接收消息(使用 JOB_MESSAGE_COLLECTION + retryWhen)
jobManager.launch(JOB_MESSAGE_COLLECTION) {
flow {
grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { emit(it) }
}
.retryWhen { cause, attempt ->
Log.w(TAG, "Message stream failed (attempt ${attempt + 1}), retrying...")
delay(min(attempt + 1, 30) * 1000L)
true // 永远重试
}
.collect { message ->
// 处理消息
}
}
修复 ensureSessionEventSubscriptionActive:
// 之前
val isActive = streamManager.isEventStreamActive()
// 现在
val isActive = jobManager.isActive(JOB_SESSION_EVENT)
2. 删除的文件
app/src/main/java/com/durian/tssparty/data/remote/StreamManager.kt
3. 保留的 gRPC 改进
GrpcClient.kt - Keep-Alive 配置(保留)✅:
val builder = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.keepAliveTime(20, TimeUnit.SECONDS) // 每 20 秒 PING
.keepAliveTimeout(5, TimeUnit.SECONDS) // 5 秒超时
.keepAliveWithoutCalls(true) // 没有活跃 RPC 也 PING
.idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) // 永不超时
GrpcClient.kt - 网络监听(保留)✅:
fun setupNetworkMonitoring(context: Context) {
val callback = object : ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
channel?.resetConnectBackoff() // 立即重连
}
}
}
架构对比
旧设计(复杂但出错)❌:
TssRepository
├─ StreamManager (新增的抽象层)
│ ├─ startEventStream()
│ ├─ startMessageStream()
│ └─ restartAllStreams()
│
├─ init { listen reconnection → streamManager.restartAllStreams() }
└─ grpcClient
新设计(简单可靠)✅:
TssRepository
├─ JobManager (原有的任务管理)
│ ├─ JOB_SESSION_EVENT → flow { subscribeSessionEvents() }.retryWhen { }
│ ├─ JOB_MESSAGE_COLLECTION → flow { subscribeMessages() }.retryWhen { }
│ └─ JOB_MESSAGE_SENDING → outgoingMessages.collect { }
│
└─ grpcClient (带 Keep-Alive + Network Monitoring)
为什么新设计更好
- 更少的抽象层: 直接用 jobManager.launch,不需要 StreamManager
- 自动重连: Flow.retryWhen 在流失败时自动重新发起 RPC
- 保持原有逻辑: 事件处理代码保持不变,只在外面包一层 retryWhen
- 更好的日志: 直接在 collect { } 里打日志,不会丢失
- 符合 Kotlin 风格: Flow transformation 比 callback 更符合 Kotlin 惯用法
测试重点
- ✅ 编译成功(已验证)
- ⏳ RegisterParty 成功(需要测试)
- ⏳ 事件订阅成功(看到 "Starting session event subscription" 日志)
- ⏳ 创建 2-of-3 会话成功
- ⏳ 飞行模式测试自动重连
编译结果
BUILD SUCCESSFUL in 1m 26s
46 actionable tasks: 17 executed, 29 up-to-date
只有一些参数未使用的警告,没有错误。
核心教训
简单就是可靠:
工作的代码 + 官方推荐配置 + 最小改动 = 可靠的系统
不是:
工作的代码 → 完全重构 → 引入新抽象 → 新问题
gRPC 流的正确管理方式:
- 流断开时,用 Flow.retryWhen 自动重新发起 RPC(不是"恢复")
- 不需要复杂的 StreamManager,Kotlin Flow 本身就是流管理器
- Keep-Alive 防止连接假死
- Network Monitoring 加速重连
下一步
准备测试!使用 build-install-debug.bat 安装到设备,验证:
- RegisterParty 是否成功
- 事件流是否正常工作
- 2-of-3 创建是否成功
- 网络断开重连是否自动恢复