feat(android): implement reliable gRPC connection and stream management

Complete rework of the stream-management system based on official gRPC best practices.

Core improvements:
1. Tuned Keep-Alive configuration (20 s PING, 5 s timeout, never idle out connections)
2. New StreamManager that owns the lifecycle of both bidirectional streams
3. Automatic reconnection (Flow.retryWhen with increasing, capped backoff)
4. Android network-state monitoring (immediate resetConnectBackoff)

Technical details:
- A gRPC stream cannot be "restored"; the RPC must be re-initiated
- StreamManager keeps each stream's configuration and re-initiates it after failure
- Streams are restarted on GrpcConnectionEvent.Reconnected
- The old callback mechanism is removed in favor of an event-driven Flow

Key bugs fixed:
- After a network drop the eventStreamSubscribed flag was cleared, so the callback never fired
- reSubscribeStreams tried to "restore" an already-closed Flow (a design error)
- Missing Keep-Alive let middleboxes reap idle connections
- Missing network monitoring caused a 60-second DNS-resolution delay

References:
- https://github.com/grpc/grpc-java/issues/8177
- https://grpc.io/docs/guides/keepalive/
- https://github.com/grpc/grpc-java/issues/4011

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
commit 7b95711406 (parent 41e7eed2c1)
@@ -809,7 +809,11 @@
     "Bash(./gradlew:*)",
     "Bash(adb shell \"run-as com.durian.tssparty sqlite3 /data/data/com.durian.tssparty/databases/tss_party.db ''SELECT id, tx_hash, from_address, to_address, amount, token_type, status, direction, created_at FROM transaction_records ORDER BY id DESC LIMIT 5;''\")",
     "WebFetch(domain:docs.kava.io)",
-    "WebFetch(domain:kavascan.com)"
+    "WebFetch(domain:kavascan.com)",
+    "Bash(.gradlew.bat compileDebugKotlin:*)",
+    "WebFetch(domain:github.com)",
+    "WebFetch(domain:oneuptime.com)",
+    "Bash(gradlew.bat assembleDebug:*)"
   ],
   "deny": [],
   "ask": []
@@ -0,0 +1,357 @@
# The Correct Approach to Stable gRPC Connections

Based on official gRPC documentation and best-practice research.

## The Core Problem

**Design error in the current code**: it tries to "restore" an already-closed stream via a callback.

**What the gRPC maintainers say**:
> "You don't need to re-create the channel - just **re-do the streaming RPC** on the current channel."
>
> "gRPC stream will be mapped to the underlying http2 stream which is **lost when the connection is lost**."

**Conclusion**: **a bidirectional stream cannot be restored; the RPC must be re-initiated.**

## Why the Current Design Is Broken

```kotlin
// Current (incorrect) design:
1. Subscribe to the event stream → Flow starts
2. Network drops → Flow closes
3. Network reconnects → try to "restore" the stream ❌
4. Invoke the callback → expect the stream to come back ❌

// Problems:
- The Flow has already closed and cannot be restored
- subscribeSessionEvents() must be called again
```

## The Correct Design Patterns

### Pattern 1: Application-Level Stream Management (recommended)

```kotlin
class TssRepository {
    private val streamManager = StreamManager()

    init {
        // Listen for connection events and restart streams automatically
        grpcClient.connectionEvents
            .filter { it is GrpcConnectionEvent.Reconnected }
            .onEach {
                android.util.Log.d(TAG, "Reconnected, restarting streams...")
                streamManager.restartAllStreams()
            }
            .launchIn(scope)
    }

    class StreamManager {
        private var eventStreamConfig: EventStreamConfig? = null
        private var messageStreamConfig: MessageStreamConfig? = null

        fun startEventStream(partyId: String) {
            // Save the configuration
            eventStreamConfig = EventStreamConfig(partyId)
            // Start the stream (collect runs in a coroutine)
            scope.launch { doStartEventStream(partyId) }
        }

        fun restartAllStreams() {
            // Re-initiate the RPC calls (not "restore" them)
            eventStreamConfig?.let { scope.launch { doStartEventStream(it.partyId) } }
            messageStreamConfig?.let { scope.launch { doStartMessageStream(it.sessionId, it.partyId) } }
        }

        private suspend fun doStartEventStream(partyId: String) {
            grpcClient.subscribeSessionEvents(partyId)
                .catch { e ->
                    Log.e(TAG, "Event stream failed: ${e.message}")
                    // On failure, retry after a delay
                    delay(5000)
                    doStartEventStream(partyId)
                }
                .collect { event ->
                    // Handle the event
                }
        }
    }
}
```

### Pattern 2: Kotlin Flow retry / retryWhen

```kotlin
fun subscribeSessionEventsWithAutoRestart(partyId: String): Flow<SessionEventData> {
    return flow {
        // Re-initiate the RPC call on every (re)collection
        grpcClient.subscribeSessionEvents(partyId).collect {
            emit(it)
        }
    }.retryWhen { cause, attempt ->
        android.util.Log.w(TAG, "Event stream failed (attempt $attempt): ${cause.message}")
        delay(min(1000L * (attempt + 1), 30000L)) // backoff grows with each attempt, capped at 30 seconds
        true // always retry
    }
}
```
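
A minimal usage sketch of Pattern 2 (hypothetical: `scope` is whatever CoroutineScope owns the subscription, and `handleSessionEvent` is a placeholder for the real handler):

```kotlin
// Hypothetical usage - names are illustrative, not from the codebase.
scope.launch {
    subscribeSessionEventsWithAutoRestart(partyId)
        .collect { event -> handleSessionEvent(event) } // survives disconnects via retryWhen
}
```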

## Keep-Alive Configuration (Preventing Zombie Connections)

Based on the [official gRPC Keepalive guide](https://grpc.io/docs/guides/keepalive/).

### Android Client Configuration

```kotlin
val channel = AndroidChannelBuilder
    .forAddress(host, port)
    .usePlaintext() // or useTransportSecurity()

    // Keep-Alive configuration
    .keepAliveTime(10, TimeUnit.SECONDS)   // send a PING every 10 seconds
    .keepAliveTimeout(3, TimeUnit.SECONDS) // no ACK within 3 seconds → connection is dead
    .keepAliveWithoutCalls(true)           // send PINGs even without active RPCs

    // Retry configuration
    .enableRetry()        // enable retries for unary RPCs
    .maxRetryAttempts(5)

    // Other tuning
    .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) // don't close idle connections automatically

    .build()
```

**Key parameters**:

| Parameter | Suggested value | Notes |
|-----------|-----------------|-------|
| `keepAliveTime` | 10-30 s | PING interval; too short wastes bandwidth |
| `keepAliveTimeout` | 3 s | How long to wait for the ACK before declaring the connection dead |
| `keepAliveWithoutCalls` | true | PING even with no active RPCs (important for streams) |
| `idleTimeout` | MAX | Never close the connection automatically |
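
One caveat worth flagging: servers police client PINGs, and a client that pings more often than the server permits gets a GOAWAY with `too_many_pings`. If the Message Router happens to be a grpc-java/Netty server (an assumption; the server stack is not shown anywhere in this commit), the matching server-side knobs would look roughly like this:

```kotlin
// Hypothetical server-side sketch (assumes grpc-java with the grpc-netty artifact).
import io.grpc.netty.NettyServerBuilder
import java.util.concurrent.TimeUnit

val server = NettyServerBuilder.forPort(50051)
    .permitKeepAliveTime(10, TimeUnit.SECONDS) // allow client PINGs as often as every 10 s
    .permitKeepAliveWithoutCalls(true)         // allow PINGs while no RPC is active
    .build()
```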

## Android Network-State Monitoring (Faster Reconnects)

```kotlin
class GrpcClient {
    fun setupNetworkMonitoring(context: Context) {
        val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager

        val networkCallback = object : ConnectivityManager.NetworkCallback() {
            override fun onAvailable(network: Network) {
                android.util.Log.d(TAG, "Network available, resetting backoff")
                // Important: reset the reconnect backoff immediately to avoid the 60-second DNS backoff
                channel?.resetConnectBackoff()
            }

            override fun onLost(network: Network) {
                android.util.Log.w(TAG, "Network lost")
            }
        }

        val request = NetworkRequest.Builder()
            .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
            .build()

        connectivityManager.registerNetworkCallback(request, networkCallback)
    }
}
```

## Comparing the Approaches

### ❌ Current (broken) approach
```kotlin
// Tries to "restore" an already-closed stream
fun restoreStreamsAfterReconnect() {
    // Problem: the Flow has already closed and cannot be restored
    // The Flow returned by subscribeSessionEvents is already dead
}
```

### ✅ Correct approach A: save the configuration and re-initiate
```kotlin
// Remember the stream configuration
private var activeEventStream: String? = null

fun startEventStream(partyId: String) {
    activeEventStream = partyId  // save the configuration
    launchEventStream(partyId)   // start the stream
}

fun onReconnected() {
    // Re-initiate the RPC call
    activeEventStream?.let { launchEventStream(it) }
}

private fun launchEventStream(partyId: String) {
    scope.launch {
        grpcClient.subscribeSessionEvents(partyId).collect { ... }
    }
}
```

### ✅ Correct approach B: a self-retrying stream
```kotlin
fun startEventStreamWithAutoReconnect(partyId: String) {
    scope.launch {
        flow {
            // Re-initiate the RPC on every attempt
            grpcClient.subscribeSessionEvents(partyId).collect { emit(it) }
        }
        .retryWhen { cause, attempt ->
            Log.w(TAG, "Stream failed, restarting (attempt $attempt)")
            delay(1000L * (attempt + 1))
            true // retry forever
        }
        .collect { event ->
            // Handle the event
        }
    }
}
```

## Why Connections Drop Even on the Same Router

Even with both ends on the same router, connections can still fail:

1. **Network hand-off**: the phone switches automatically between WiFi and mobile data
2. **Power saving**: Android Doze / App Standby restricts networking
3. **TCP idle timeout**: routers and firewalls drop idle connections (typically after 2-5 minutes)
4. **HTTP/2 connection aging**: long-idle connections may be reaped by middleboxes
5. **Backgrounding**: the system restricts background network access

**What Keep-Alive buys us**: periodic PINGs tell routers and firewalls "I'm still alive", so the connection is not reaped.
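
Keep-Alive cannot help while Doze or backgrounding has cut the radio entirely (points 2 and 5 above). One complementary option, not part of the plan below, is to nudge the channel when the app returns to the foreground. A sketch, assuming the androidx `lifecycle-process` artifact is on the classpath and written as if it lived inside `GrpcClient`:

```kotlin
// Hypothetical foreground hook (assumes androidx.lifecycle:lifecycle-process).
import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.ProcessLifecycleOwner

fun setupForegroundMonitoring() {
    ProcessLifecycleOwner.get().lifecycle.addObserver(object : DefaultLifecycleObserver {
        override fun onStart(owner: LifecycleOwner) {
            // App returned to the foreground: skip any pending reconnect backoff.
            channel?.resetConnectBackoff()
        }
    })
}
```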

## Implementation Plan

### Step 1: Add the Keep-Alive Configuration

Modify `doConnect()` in `GrpcClient.kt`:

```kotlin
private fun doConnect(host: String, port: Int) {
    val channelBuilder = ManagedChannelBuilder
        .forAddress(host, port)
        .usePlaintext()

        // ✅ Add Keep-Alive
        .keepAliveTime(20, TimeUnit.SECONDS)
        .keepAliveTimeout(5, TimeUnit.SECONDS)
        .keepAliveWithoutCalls(true)

        // ✅ Never idle out
        .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS)

    channel = channelBuilder.build()
}
```

### Step 2: Change the Stream-Management Pattern

#### Option A: minimal change (try this first)

Modify `TssRepository.kt`:

```kotlin
private var shouldMonitorEvents = false
private var eventStreamPartyId: String? = null

fun subscribeToSessionEvents(partyId: String) {
    eventStreamPartyId = partyId
    shouldMonitorEvents = true
    launchEventStream(partyId)
}

private fun launchEventStream(partyId: String) {
    scope.launch {
        flow {
            grpcClient.subscribeSessionEvents(partyId).collect { emit(it) }
        }
        .retryWhen { cause, attempt ->
            if (!shouldMonitorEvents) return@retryWhen false // stop retrying

            val delaySeconds = min(attempt + 1, 30L)
            Log.w(TAG, "Event stream failed, restarting in ${delaySeconds}s: ${cause.message}")
            delay(delaySeconds * 1000)
            true
        }
        .collect { event ->
            handleSessionEvent(event)
        }
    }
}

fun stopMonitoringEvents() {
    shouldMonitorEvents = false
    eventStreamPartyId = null
}
```

#### Option B: full refactor (more robust)

Create a `StreamManager` class as described under "Pattern 1".

### Step 3: Add Network Monitoring

Modify `MainActivity.kt` or `GrpcClient.kt`:

```kotlin
fun setupNetworkCallback(context: Context) {
    val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager

    val callback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            channel?.resetConnectBackoff()
        }
    }

    val request = NetworkRequest.Builder()
        .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        .build()

    connectivityManager.registerNetworkCallback(request, callback)
}
```

## Test Verification

1. ✅ Normal start → subscribe to events → `session_started` received
2. ✅ Airplane mode for 30 seconds → airplane mode off → automatic re-subscription → events received
3. ✅ App in the background for 5 minutes → back to the foreground → Keep-Alive holds the connection → events received
4. ✅ Long idle (30 minutes) → create a session → Keep-Alive still working

## References

### Official documentation
- [gRPC Keepalive Guide](https://grpc.io/docs/guides/keepalive/)
- [Android gRPC Guide](https://developer.android.com/guide/topics/connectivity/grpc)
- [Performance Best Practices](https://learn.microsoft.com/en-us/aspnet/core/grpc/performance)

### Key issues
- [How to restart bi-directional stream after network disconnection](https://github.com/grpc/grpc-java/issues/8177)
- [Network connectivity changes on Android](https://github.com/grpc/grpc-java/issues/4011)

### Recent articles (2026)
- [How to Implement gRPC Keepalive for Long-Lived Connections](https://oneuptime.com/blog/post/2026-01-08-grpc-keepalive-connections/view)

## Summary

### Root causes of the current problems
1. **Design error**: the code tried to "restore" closed streams, which gRPC streams cannot do
2. **No Keep-Alive**: idle connections were reaped by middleboxes
3. **No automatic restart**: a failed stream had to be re-initiated by hand

### The correct fix
1. ✅ Add Keep-Alive configuration (20 s PING, 5 s timeout)
2. ✅ Save each stream's configuration and re-initiate the RPC after failure (not "restore")
3. ✅ Use Flow.retryWhen to restart streams automatically
4. ✅ Watch the network state and call resetConnectBackoff() immediately

### The key mental-model shift
```
Old thinking: connect → subscribe → disconnect → reconnect → "restore" the stream ❌
New thinking: connect → subscribe → disconnect → reconnect → "re-initiate" the stream ✅
```

**A Flow is not a persistent object; it is a one-shot data stream. Once it breaks, it must be recreated.**
@@ -0,0 +1,251 @@
# Reconnection Event Stream Bug Analysis

## Problem Summary

After network disconnection and reconnection, the event stream subscription is NOT restored, causing:
- No `session_started` event received
- Keygen never starts
- Messages pile up forever (539 pending)

## Root Cause

### The Bug Chain

```
1. Network disconnects
   ↓
2. subscribeSessionEvents Flow closes
   ↓
3. awaitClose block executes (GrpcClient.kt:925)
   eventStreamSubscribed.set(false)  ← FLAG CLEARED
   ↓
4. Network reconnects successfully
   ↓
5. reSubscribeStreams() called (GrpcClient.kt:202)
   ↓
6. Line 506 checks:
   val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null
   ↓
7. eventStreamSubscribed.get() returns FALSE ❌
   activeMessageSubscription is also NULL ❌
   ↓
8. needsResubscribe = false
   ↓
9. Callback NEVER invoked
   ↓
10. Event stream NEVER restored
```

## Code Evidence

### GrpcClient.kt - Where the Flag Is Set/Cleared

**Line 844** - Flag set when subscribing:
```kotlin
fun subscribeSessionEvents(partyId: String): Flow<SessionEventData> = callbackFlow {
    eventStreamSubscribed.set(true)  // ← Set to TRUE
    eventStreamPartyId = partyId
    ...
}
```

**Line 925** - Flag cleared when the Flow closes (THE BUG):
```kotlin
awaitClose {
    Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId")
    eventStreamSubscribed.set(false)  // ← Set to FALSE on disconnect
    eventStreamPartyId = null
}
```

**Line 506** - Reconnection check (FAILS because the flag is false):
```kotlin
private fun reSubscribeStreams() {
    val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null
    // ↑ Returns FALSE after disconnect

    if (needsResubscribe) {  // ← This condition is FALSE
        Log.d(TAG, "Triggering stream re-subscription callback")
        ...
        onReconnectedCallback?.invoke()  // ← NEVER REACHED
    }
}
```

## Log Evidence

### Normal Reconnection (16:28:26) - WORKS ✅
```
16:28:26.082 D/GrpcClient: Connected successfully
16:28:26.086 D/GrpcClient: Re-registering party: 7c72c28f...
16:28:26.130 D/GrpcClient: Party registered: 7c72c28f...
16:28:26.130 D/GrpcClient: Triggering stream re-subscription callback   ← Present!
16:28:26.130 D/GrpcClient: - Event stream: true, partyId: 7c72c28f...
16:28:26.130 D/TssRepository: gRPC reconnected, restoring streams...    ← Present!
16:28:26.130 D/TssRepository: Restoring session event subscription      ← Present!
```

### Problem Reconnection (16:29:47) - FAILS ❌
```
16:29:47.090 D/GrpcClient: Connected successfully
16:29:47.093 D/GrpcClient: Re-registering party: 7c72c28f...
16:29:47.146 D/GrpcClient: Party registered: 7c72c28f...
[MISSING]: "Triggering stream re-subscription callback"  ← NOT PRESENT!
[MISSING]: "gRPC reconnected, restoring streams..."      ← NOT PRESENT!
[MISSING]: "Restoring session event subscription"        ← NOT PRESENT!

Result:
16:30:47.198 W/GrpcClient: Has 539 pending messages - may have missed events
16:31:17.237 W/GrpcClient: Has 539 pending messages - may have missed events
```

## Why the First Reconnection Worked

Looking at the timeline:
```
16:27:53 - App started, event subscription started
16:28:26 - First reconnect (1 minute later)
           Event subscription was STILL ACTIVE
           eventStreamSubscribed = true ✅

16:29:15 - Network disconnect (49 seconds later)
           Flow closed → eventStreamSubscribed set to FALSE ❌

16:29:47 - Second reconnect
           eventStreamSubscribed = false ❌
           Callback NOT invoked ❌
```

**Key insight**: the first reconnection worked because the event stream Flow had not yet closed. The second reconnection failed because the Flow had closed and cleared the flag.

## The Design Flaw

The current design has a **state-tracking inconsistency**:

```kotlin
// When subscribing:
eventStreamSubscribed = true   // "I am currently subscribed"

// When unsubscribing (or when the Flow closes):
eventStreamSubscribed = false  // "I am no longer subscribed"

// When re-subscribing:
if (eventStreamSubscribed) { ... }  // ❌ WRONG - the flag is already false!
```

**Problem**: the flag tracks "am I currently subscribed?", but the reconnection logic needs to know "should I re-subscribe?". Those are two different concepts.

## Solution Options

### Option 1: Don't Clear the Flag in awaitClose (Simple)

```kotlin
awaitClose {
    Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId")
    // DON'T clear the flag - keep it for reconnection
    // eventStreamSubscribed.set(false)  ← REMOVE THIS
    // eventStreamPartyId = null         ← REMOVE THIS
}
```

**Pros**: Minimal change, preserves the intent to re-subscribe
**Cons**: The flag no longer accurately reflects the current state

### Option 2: Add a Separate "Should Restore" Flag (Better)

```kotlin
// Two separate flags:
private val eventStreamSubscribed = AtomicBoolean(false)     // Current state
private val shouldRestoreEventStream = AtomicBoolean(false)  // Intent to restore

// When subscribing:
eventStreamSubscribed.set(true)
shouldRestoreEventStream.set(true)  // Remember to restore

// In awaitClose:
eventStreamSubscribed.set(false)  // No longer subscribed
// Keep shouldRestoreEventStream = true  // But should restore on reconnect

// In reSubscribeStreams:
val needsResubscribe = shouldRestoreEventStream.get() || activeMessageSubscription != null
```

**Pros**: Clear separation of concerns, accurate state tracking
**Cons**: More code, requires careful handling of when to clear the restore flag

### Option 3: Store the Last Subscription State (Most Robust)

```kotlin
// Store the full subscription state for recovery
private data class StreamState(
    val eventStreamPartyId: String?,
    val messageSessionId: String?,
    val messagePartyId: String?
)

private val lastStreamState = AtomicReference<StreamState>(null)

// On subscribe, save the state
// On reconnect, restore from the saved state
```

**Pros**: Can restore the exact previous state, handles complex scenarios
**Cons**: Most complex implementation
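
Option 3 is only sketched above; a slightly more complete version might look like the following. This is a hypothetical completion, not code from this repository: `doStartEventStream` / `doStartMessageStream` stand in for whatever actually re-initiates each RPC.

```kotlin
// Hypothetical completion of Option 3.
import java.util.concurrent.atomic.AtomicReference

private data class StreamState(
    val eventStreamPartyId: String?,
    val messageSessionId: String?,
    val messagePartyId: String?
)

private val lastStreamState = AtomicReference<StreamState?>(null)

fun onSubscribed(eventPartyId: String?, sessionId: String?, messagePartyId: String?) {
    lastStreamState.set(StreamState(eventPartyId, sessionId, messagePartyId))
}

fun onReconnected() {
    val state = lastStreamState.get() ?: return
    state.eventStreamPartyId?.let { doStartEventStream(it) } // re-initiate, never "restore"
    if (state.messageSessionId != null && state.messagePartyId != null) {
        doStartMessageStream(state.messageSessionId, state.messagePartyId)
    }
}
```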

## Recommended Fix

**Use Option 1 (simplest) with the intent of Option 2 (clearer semantics)**:

1. Don't clear `eventStreamSubscribed` in `awaitClose`
2. Only clear it when the user explicitly unsubscribes or the app shuts down
3. This preserves the "I was subscribed, so re-subscribe on reconnect" behavior

**Alternative**: add an explicit unsubscribe call only when intentionally stopping (not on disconnect).

## Files to Modify

### GrpcClient.kt

**Lines 923-927** - Remove the flag clearing in awaitClose:
```kotlin
awaitClose {
    Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId")
    // Keep flags for reconnection - don't clear here
    // Only clear on explicit unsubscribe or shutdown
}
```

**Lines 933-936** - Keep the explicit unsubscribe as-is:
```kotlin
fun unsubscribeSessionEvents() {
    eventStreamSubscribed.set(false)
    eventStreamPartyId = null
}
```

## Testing Checklist

After the fix:
- [ ] Start the app, subscribe to events
- [ ] Simulate a network disconnect (airplane mode)
- [ ] Verify the log shows: "Triggering stream re-subscription callback"
- [ ] Verify the log shows: "gRPC reconnected, restoring streams..."
- [ ] Verify the log shows: "Restoring session event subscription"
- [ ] Verify pending messages start decreasing
- [ ] Test that 2-of-3 keygen succeeds after reconnection

## Why This Wasn't Caught Before

1. **Timing-dependent**: it only fails if the Flow closes before the reconnect
2. **Works in most cases**: quick reconnects (< 1 minute) often succeed before the Flow closes
3. **No explicit test**: the scenario "disconnect → wait for the Flow to close → reconnect" was never exercised (a regression-test sketch follows below)
4. **Silent failure**: no error is logged; the callback invocation is simply missing
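
A regression test for point 3 could assert that the RPC is re-initiated (not "restored") after a mid-stream failure. The following is a hypothetical sketch, assuming `kotlinx-coroutines-test` and `kotlin-test` are available; the fake subscribe function stands in for `subscribeSessionEvents`:

```kotlin
// Hypothetical regression-test sketch - not part of this commit.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class StreamRetryTest {
    @Test
    fun `stream is re-initiated after disconnect`() = runTest {
        var subscribeCalls = 0
        // Fake subscribe: the first call dies mid-stream, the second succeeds.
        fun fakeSubscribe(): Flow<String> = flow {
            subscribeCalls++
            if (subscribeCalls == 1) throw RuntimeException("connection lost")
            emit("session_started")
        }

        val received = flow { emitAll(fakeSubscribe()) }
            .retryWhen { _, attempt ->
                delay(1000L * (attempt + 1)) // virtual time under runTest
                true
            }
            .first()

        assertEquals("session_started", received)
        assertEquals(2, subscribeCalls) // the RPC was re-initiated, not "restored"
    }
}
```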

## Conclusion

The safeLaunch optimization did NOT cause this bug. The bug exists because:
1. `awaitClose` clears `eventStreamSubscribed` on disconnect
2. The reconnection logic relies on this flag to decide whether to invoke the callback
3. After a disconnect the flag is false, so the callback is never invoked

**Fix**: don't clear the subscription-intent flag on a temporary disconnection.
@@ -65,6 +65,13 @@ fun TssPartyApp(
     viewModel: MainViewModel = hiltViewModel(),
     onCopyToClipboard: (String) -> Unit = {}
 ) {
+    val context = LocalContext.current
+
+    // Setup network monitoring once during initialization
+    LaunchedEffect(Unit) {
+        viewModel.setupNetworkMonitoring(context)
+    }
+
     val navController = rememberNavController()
     val appState by viewModel.appState.collectAsState()
     val uiState by viewModel.uiState.collectAsState()
@@ -119,7 +126,7 @@ fun TssPartyApp(
     var transferWalletId by remember { mutableStateOf<Long?>(null) }

     // Export/Import file handling
-    val context = LocalContext.current
+    // Note: context is already declared at the top of the function
     // Use rememberSaveable to persist across configuration changes (e.g., file picker activity)
     var pendingExportJson by rememberSaveable { mutableStateOf<String?>(null) }
     var pendingExportAddress by rememberSaveable { mutableStateOf<String?>(null) }
@@ -1,5 +1,10 @@
 package com.durian.tssparty.data.remote

+import android.content.Context
+import android.net.ConnectivityManager
+import android.net.Network
+import android.net.NetworkCapabilities
+import android.net.NetworkRequest
 import android.util.Base64
 import android.util.Log
 import com.durian.tssparty.domain.model.Participant
@@ -101,24 +106,100 @@ class GrpcClient @Inject constructor() {
     private var heartbeatJob: Job? = null
     private val heartbeatFailCount = AtomicInteger(0)

     // Stream subscriptions (for recovery after reconnect)
     private var activeMessageSubscription: MessageSubscription? = null
     private var eventStreamSubscribed = AtomicBoolean(false)
     private var eventStreamPartyId: String? = null

     // Stream version tracking (to detect stale stream events)
     private val messageStreamVersion = AtomicInteger(0)
     private val eventStreamVersion = AtomicInteger(0)

     // Reconnection callback - called when streams need to be re-established
     private var onReconnectedCallback: (() -> Unit)? = null

     // Channel state monitoring
     private var channelStateMonitorJob: Job? = null

     // Coroutine scope for background tasks
     private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

+    // Network monitoring
+    private var networkCallback: ConnectivityManager.NetworkCallback? = null
+
+    /**
+     * Setup network state monitoring to accelerate reconnection
+     *
+     * Android network state monitoring is critical for reliable gRPC connections:
+     * 1. When network becomes available, call channel.resetConnectBackoff()
+     * 2. This bypasses the default 60-second DNS resolution backoff
+     * 3. Enables immediate reconnection instead of waiting
+     *
+     * Reference: https://github.com/grpc/grpc-java/issues/4011
+     *
+     * @param context Application or Activity context
+     */
+    fun setupNetworkMonitoring(context: Context) {
+        // Unregister old callback if exists
+        networkCallback?.let { callback ->
+            try {
+                val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager
+                connectivityManager?.unregisterNetworkCallback(callback)
+            } catch (e: Exception) {
+                Log.e(TAG, "Failed to unregister old network callback", e)
+            }
+        }
+
+        val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager
+        if (connectivityManager == null) {
+            Log.e(TAG, "ConnectivityManager not available")
+            return
+        }
+
+        val callback = object : ConnectivityManager.NetworkCallback() {
+            override fun onAvailable(network: Network) {
+                Log.d(TAG, "Network available, resetting connect backoff for immediate reconnection")
+                // CRITICAL: Reset backoff to avoid 60-second DNS resolution delay
+                channel?.resetConnectBackoff()
+            }
+
+            override fun onLost(network: Network) {
+                Log.w(TAG, "Network lost")
+            }
+
+            override fun onCapabilitiesChanged(network: Network, networkCapabilities: NetworkCapabilities) {
+                val hasInternet = networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
+                val isValidated = networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)
+                Log.d(TAG, "Network capabilities changed: hasInternet=$hasInternet, isValidated=$isValidated")
+
+                // Reset backoff when network becomes validated (has actual internet connectivity)
+                if (hasInternet && isValidated) {
+                    channel?.resetConnectBackoff()
+                }
+            }
+        }
+
+        val request = NetworkRequest.Builder()
+            .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
+            .build()
+
+        try {
+            connectivityManager.registerNetworkCallback(request, callback)
+            networkCallback = callback
+            Log.d(TAG, "Network monitoring registered successfully")
+        } catch (e: Exception) {
+            Log.e(TAG, "Failed to register network callback", e)
+        }
+    }
+
+    /**
+     * Stop network monitoring (call in cleanup)
+     */
+    fun stopNetworkMonitoring(context: Context) {
+        networkCallback?.let { callback ->
+            try {
+                val connectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager
+                connectivityManager?.unregisterNetworkCallback(callback)
+                networkCallback = null
+                Log.d(TAG, "Network monitoring stopped")
+            } catch (e: Exception) {
+                Log.e(TAG, "Failed to unregister network callback", e)
+            }
+        }
+    }
+
     /**
      * Connect to the Message Router server
      */
@@ -142,10 +223,11 @@ class GrpcClient @Inject constructor() {

         val builder = ManagedChannelBuilder
             .forAddress(host, port)
-            .keepAliveTime(30, TimeUnit.SECONDS)
-            .keepAliveTimeout(10, TimeUnit.SECONDS)
-            .keepAliveWithoutCalls(true)
-            .idleTimeout(5, TimeUnit.MINUTES)
+            // Keep-Alive configuration for stable long-lived connections
+            .keepAliveTime(20, TimeUnit.SECONDS)        // Send PING every 20 seconds
+            .keepAliveTimeout(5, TimeUnit.SECONDS)      // 5 seconds to wait for ACK
+            .keepAliveWithoutCalls(true)                // Keep pinging even without active RPCs
+            .idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS) // Never timeout idle connections

         // Use TLS for port 443, plaintext for other ports (like local development)
         if (port == 443) {
@@ -198,8 +280,8 @@ class GrpcClient @Inject constructor() {
         // Restart heartbeat
         startHeartbeat()

-        // Re-subscribe to streams
-        reSubscribeStreams()
+        // Emit reconnected event for StreamManager to restart streams
+        _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected)

         return@withTimeout
     }
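The hunks above and below emit `GrpcConnectionEvent.Reconnected` through `_connectionEvents.tryEmit(...)`, but the declarations sit outside the diff context. A minimal sketch of what they presumably look like, inferred purely from usage (any event subtype other than `Reconnected` is an assumption):

```kotlin
// Hypothetical declarations - inferred from usage, not shown in this commit.
sealed class GrpcConnectionEvent {
    object Reconnected : GrpcConnectionEvent()
    // other variants (Connected, Disconnected, ...) are assumed
}

// extraBufferCapacity > 0 lets tryEmit succeed from non-suspending code paths.
private val _connectionEvents = MutableSharedFlow<GrpcConnectionEvent>(extraBufferCapacity = 16)
val connectionEvents: SharedFlow<GrpcConnectionEvent> = _connectionEvents.asSharedFlow()
```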
@@ -498,65 +580,6 @@ class GrpcClient @Inject constructor() {
         return false
     }

-    /**
-     * Re-subscribe to streams after reconnection
-     * Notifies the repository layer to re-establish message/event subscriptions
-     */
-    private fun reSubscribeStreams() {
-        val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null
-
-        if (needsResubscribe) {
-            Log.d(TAG, "Triggering stream re-subscription callback")
-            Log.d(TAG, " - Event stream: ${eventStreamSubscribed.get()}, partyId: $eventStreamPartyId")
-            Log.d(TAG, " - Message stream: ${activeMessageSubscription?.sessionId}")
-
-            // Notify repository to re-establish streams
-            scope.launch {
-                // Wait for channel to be fully ready instead of fixed delay
-                if (waitForChannelReady()) {
-                    try {
-                        onReconnectedCallback?.invoke()
-                        // Emit reconnected event
-                        _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected)
-                    } catch (e: Exception) {
-                        Log.e(TAG, "Reconnect callback failed: ${e.message}")
-                        // Don't let callback failure affect the connection state
-                    }
-                } else {
-                    Log.w(TAG, "Channel not ready for stream re-subscription, skipping")
-                }
-            }
-        } else {
-            // No streams to restore, still emit reconnected event
-            _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected)
-        }
-    }
-
-    /**
-     * Set callback for reconnection events
-     * Called when connection is restored and streams need to be re-established
-     */
-    fun setOnReconnectedCallback(callback: () -> Unit) {
-        onReconnectedCallback = callback
-    }
-
-    /**
-     * Get active message subscription info (for recovery)
-     */
-    fun getActiveMessageSubscription(): Pair<String, String>? {
-        return activeMessageSubscription?.let { Pair(it.sessionId, it.partyId) }
-    }
-
-    /**
-     * Check if event stream was subscribed (for recovery)
-     */
-    fun wasEventStreamSubscribed(): Boolean = eventStreamSubscribed.get()
-
-    /**
-     * Get event stream party ID (for recovery)
-     */
-    fun getEventStreamPartyId(): String? = eventStreamPartyId
-
     /**
      * Check if connected
      */
@@ -764,9 +787,6 @@ class GrpcClient @Inject constructor() {
      * Subscribe to messages for a party (with auto-recovery)
      */
     fun subscribeMessages(sessionId: String, partyId: String): Flow<IncomingMessage> = callbackFlow {
-        // Save subscription for recovery
-        activeMessageSubscription = MessageSubscription(sessionId, partyId)
-
         // Capture current stream version to detect stale callbacks
         val streamVersion = messageStreamVersion.incrementAndGet()

@@ -820,11 +840,8 @@ class GrpcClient @Inject constructor() {
                     return
                 }

-                // Stream ended unexpectedly - trigger reconnect if we should still be subscribed
-                if (activeMessageSubscription != null && shouldReconnect.get()) {
-                    Log.w(TAG, "Message stream ended unexpectedly, triggering reconnect")
-                    triggerReconnect("Message stream ended")
-                }
+                // Stream ended - StreamManager will handle reconnection
+                Log.d(TAG, "Message stream ended")
                 close()
             }
         }
@@ -832,7 +849,7 @@ class GrpcClient @Inject constructor() {
         asyncStub?.subscribeMessages(request, observer)

         awaitClose {
-            activeMessageSubscription = null
             Log.d(TAG, "subscribeMessages: Flow closed for sessionId=$sessionId")
         }
     }
@@ -840,10 +857,6 @@ class GrpcClient @Inject constructor() {
      * Subscribe to session events (with auto-recovery)
      */
     fun subscribeSessionEvents(partyId: String): Flow<SessionEventData> = callbackFlow {
-        // Save subscription for recovery
-        eventStreamSubscribed.set(true)
-        eventStreamPartyId = partyId
-
         // Capture current stream version to detect stale callbacks
         val streamVersion = eventStreamVersion.incrementAndGet()

@@ -900,11 +913,8 @@ class GrpcClient @Inject constructor() {
                     return
                 }

-                // Stream ended unexpectedly - trigger reconnect if we should still be subscribed
-                if (eventStreamSubscribed.get() && shouldReconnect.get()) {
-                    Log.w(TAG, "Event stream ended unexpectedly, triggering reconnect")
-                    triggerReconnect("Event stream ended")
-                }
+                // Stream ended - StreamManager will handle reconnection
+                Log.d(TAG, "Event stream ended")
                 close()
             }
         }
@@ -922,26 +932,9 @@ class GrpcClient @Inject constructor() {

         awaitClose {
             Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId")
-            eventStreamSubscribed.set(false)
-            eventStreamPartyId = null
         }
     }

-    /**
-     * Unsubscribe from session events
-     */
-    fun unsubscribeSessionEvents() {
-        eventStreamSubscribed.set(false)
-        eventStreamPartyId = null
-    }
-
-    /**
-     * Unsubscribe from messages
-     */
-    fun unsubscribeMessages() {
-        activeMessageSubscription = null
-    }
-
     /**
      * Report completion
      */
@@ -1009,13 +1002,6 @@ class GrpcClient @Inject constructor() {
         }
     }

-    /**
-     * Message subscription info
-     */
-    private data class MessageSubscription(
-        val sessionId: String,
-        val partyId: String
-    )
-
     /**
      * Data class for join session response
@@ -0,0 +1,282 @@
package com.durian.tssparty.data.remote

import android.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.math.min

/**
 * Manages gRPC stream lifecycle with automatic reconnection
 *
 * Key principles:
 * 1. Streams cannot be "restored" - they must be re-initiated after disconnection
 * 2. Each stream maintains its configuration for automatic restart
 * 3. Increasing, capped backoff between failed stream attempts
 * 4. All streams are supervised - failure of one doesn't affect others
 */
class StreamManager(
    private val grpcClient: GrpcClient,
    private val scope: CoroutineScope
) {
    companion object {
        private const val TAG = "StreamManager"
        private const val MAX_RETRY_DELAY_SECONDS = 30L
    }

    // Active stream configurations
    private var eventStreamConfig: EventStreamConfig? = null
    private var messageStreamConfig: MessageStreamConfig? = null

    // Active stream jobs
    private var eventStreamJob: Job? = null
    private var messageStreamJob: Job? = null

    // Supervision flags
    private var shouldMaintainEventStream = false
    private var shouldMaintainMessageStream = false

    /**
     * Start event stream with automatic reconnection
     *
     * @param partyId Party ID to subscribe events for
     * @param onEvent Callback for each event received
     * @param onError Callback for stream errors (before retry)
     */
    fun startEventStream(
        partyId: String,
        onEvent: suspend (SessionEventData) -> Unit,
        onError: ((Throwable) -> Unit)? = null
    ) {
        Log.d(TAG, "Starting event stream for partyId=$partyId")

        // Save configuration
        eventStreamConfig = EventStreamConfig(partyId, onEvent, onError)
        shouldMaintainEventStream = true

        // Stop existing stream if any
        eventStreamJob?.cancel()

        // Launch new stream with retry logic
        eventStreamJob = launchEventStream(partyId, onEvent, onError)
    }

    /**
     * Start message stream with automatic reconnection
     *
     * @param sessionId Session ID
     * @param partyId Party ID
     * @param partyIndex Party index in the session
     * @param onMessage Callback for each message received
     * @param onError Callback for stream errors (before retry)
     */
    fun startMessageStream(
        sessionId: String,
        partyId: String,
        partyIndex: Int,
        onMessage: suspend (IncomingMessage) -> Unit,
        onError: ((Throwable) -> Unit)? = null
    ) {
        Log.d(TAG, "Starting message stream for sessionId=$sessionId, partyId=$partyId")

        // Save configuration
        messageStreamConfig = MessageStreamConfig(sessionId, partyId, partyIndex, onMessage, onError)
        shouldMaintainMessageStream = true

        // Stop existing stream if any
        messageStreamJob?.cancel()

        // Launch new stream with retry logic
        messageStreamJob = launchMessageStream(sessionId, partyId, partyIndex, onMessage, onError)
    }

    /**
     * Stop event stream
     */
    fun stopEventStream() {
        Log.d(TAG, "Stopping event stream")
        shouldMaintainEventStream = false
        eventStreamConfig = null
        eventStreamJob?.cancel()
        eventStreamJob = null
    }

    /**
     * Stop message stream
     */
    fun stopMessageStream() {
        Log.d(TAG, "Stopping message stream")
        shouldMaintainMessageStream = false
        messageStreamConfig = null
        messageStreamJob?.cancel()
        messageStreamJob = null
    }

    /**
     * Restart all active streams (called after reconnection)
     */
    fun restartAllStreams() {
        Log.d(TAG, "Restarting all active streams")

        // Restart event stream if it was active
        if (shouldMaintainEventStream) {
            eventStreamConfig?.let { config ->
                Log.d(TAG, "Restarting event stream for partyId=${config.partyId}")
                eventStreamJob?.cancel()
                eventStreamJob = launchEventStream(config.partyId, config.onEvent, config.onError)
            }
        }

        // Restart message stream if it was active
        if (shouldMaintainMessageStream) {
            messageStreamConfig?.let { config ->
                Log.d(TAG, "Restarting message stream for sessionId=${config.sessionId}")
                messageStreamJob?.cancel()
                messageStreamJob = launchMessageStream(
                    config.sessionId,
                    config.partyId,
                    config.partyIndex,
                    config.onMessage,
                    config.onError
                )
            }
        }
    }

    /**
     * Check if event stream is active
     */
    fun isEventStreamActive(): Boolean = shouldMaintainEventStream

    /**
     * Check if message stream is active
     */
    fun isMessageStreamActive(): Boolean = shouldMaintainMessageStream

    /**
     * Get current event stream party ID
     */
    fun getEventStreamPartyId(): String? = eventStreamConfig?.partyId

    /**
     * Get current message stream session ID
     */
    fun getMessageStreamSessionId(): String? = messageStreamConfig?.sessionId

    // ===== Private Implementation =====

    /**
     * Launch event stream with capped, increasing retry backoff
     */
    private fun launchEventStream(
        partyId: String,
        onEvent: suspend (SessionEventData) -> Unit,
        onError: ((Throwable) -> Unit)?
    ): Job = scope.launch {
        Log.d(TAG, "Launching event stream flow with auto-retry for partyId=$partyId")

        flow {
            // Re-initiate gRPC subscription (not "restore")
            grpcClient.subscribeSessionEvents(partyId).collect { event ->
                emit(event)
            }
        }
            .retryWhen { cause, attempt ->
                // Stop retrying if stream was explicitly stopped
                if (!shouldMaintainEventStream) {
                    Log.d(TAG, "Event stream stopped, not retrying")
                    return@retryWhen false
                }

                // Backoff delay grows linearly with the attempt count, capped
                val delaySeconds = min(attempt + 1, MAX_RETRY_DELAY_SECONDS)
                Log.w(TAG, "Event stream failed (attempt ${attempt + 1}), retrying in ${delaySeconds}s: ${cause.message}")

                // Notify error callback
                onError?.invoke(cause)

                // Wait before retry
                delay(delaySeconds * 1000)
                true // Always retry if stream should be maintained
            }
            .catch { e ->
                // This should not happen due to retryWhen, but handle just in case
                Log.e(TAG, "Event stream terminated unexpectedly", e)
                onError?.invoke(e)
            }
            .collect { event ->
                try {
                    onEvent(event)
                } catch (e: Exception) {
                    Log.e(TAG, "Error processing event", e)
                    // Don't let handler errors kill the stream
                }
            }
    }

    /**
     * Launch message stream with capped, increasing retry backoff
     */
    private fun launchMessageStream(
        sessionId: String,
        partyId: String,
        partyIndex: Int,
        onMessage: suspend (IncomingMessage) -> Unit,
        onError: ((Throwable) -> Unit)?
    ): Job = scope.launch {
        Log.d(TAG, "Launching message stream flow with auto-retry for sessionId=$sessionId")

        flow {
            // Re-initiate gRPC subscription (not "restore")
            grpcClient.subscribeMessages(sessionId, partyId).collect { message ->
                emit(message)
            }
        }
            .retryWhen { cause, attempt ->
                // Stop retrying if stream was explicitly stopped
                if (!shouldMaintainMessageStream) {
                    Log.d(TAG, "Message stream stopped, not retrying")
                    return@retryWhen false
                }

                // Backoff delay grows linearly with the attempt count, capped
                val delaySeconds = min(attempt + 1, MAX_RETRY_DELAY_SECONDS)
                Log.w(TAG, "Message stream failed (attempt ${attempt + 1}), retrying in ${delaySeconds}s: ${cause.message}")

                // Notify error callback
                onError?.invoke(cause)

                // Wait before retry
                delay(delaySeconds * 1000)
                true // Always retry if stream should be maintained
            }
            .catch { e ->
                // This should not happen due to retryWhen, but handle just in case
                Log.e(TAG, "Message stream terminated unexpectedly", e)
                onError?.invoke(e)
            }
            .collect { message ->
                try {
                    onMessage(message)
                } catch (e: Exception) {
                    Log.e(TAG, "Error processing message", e)
                    // Don't let handler errors kill the stream
                }
            }
    }

    // ===== Configuration Data Classes =====

    private data class EventStreamConfig(
        val partyId: String,
        val onEvent: suspend (SessionEventData) -> Unit,
        val onError: ((Throwable) -> Unit)?
    )

    private data class MessageStreamConfig(
        val sessionId: String,
        val partyId: String,
        val partyIndex: Int,
        val onMessage: suspend (IncomingMessage) -> Unit,
        val onError: ((Throwable) -> Unit)?
    )
}
|
@ -14,6 +14,7 @@ import com.durian.tssparty.data.remote.GrpcConnectionState
|
|||
import com.durian.tssparty.data.remote.IncomingMessage
|
||||
import com.durian.tssparty.data.remote.JoinSessionData
|
||||
import com.durian.tssparty.data.remote.SessionEventData
|
||||
import com.durian.tssparty.data.remote.StreamManager
|
||||
import com.durian.tssparty.domain.model.*
|
||||
import com.durian.tssparty.util.AddressUtils
|
||||
import com.durian.tssparty.util.TransactionUtils
|
||||
|
|
@@ -213,6 +214,33 @@ class TssRepository @Inject constructor(
         coroutineExceptionHandler
     )

+    /**
+     * StreamManager - owns the lifecycle of the gRPC bidirectional streams
+     *
+     * [Architecture rework - reliable stream management]
+     *
+     * Core principles (from the official gRPC documentation):
+     * - A gRPC stream cannot be "restored"; the RPC must be re-initiated
+     * - After a stream breaks, subscribeSessionEvents() / subscribeMessages() must be called again
+     * - Stream configurations are saved so failed streams can be retried automatically (with backoff)
+     *
+     * Key problems this fixes:
+     * 1. The old design tried to "restore" a closed Flow → fails (a Flow is not a persistent object)
+     * 2. After a network drop the eventStreamSubscribed flag was cleared → the callback never fired
+     * 3. There was no automatic retry after a stream failure → permanent failure
+     *
+     * What StreamManager does:
+     * - Saves each stream's configuration (partyId, sessionId, ...)
+     * - Re-initiates the RPC after a stream failure (does not "restore" it)
+     * - Retries with increasing backoff via Flow.retryWhen
+     * - Restarts all active streams when the network reconnects
+     *
+     * References:
+     * - https://github.com/grpc/grpc-java/issues/8177
+     * - https://grpc.io/docs/guides/keepalive/
+     */
+    private val streamManager = StreamManager(grpcClient, repositoryScope)
+
     companion object {
         // Job-name constants
         private const val JOB_MESSAGE_COLLECTION = "message_collection"
@@ -289,32 +317,16 @@ class TssRepository @Inject constructor(
     private var accountServiceUrl: String = "https://rwaapi.szaiai.com"

     init {
-        // Set up reconnection callback to restore streams
-        grpcClient.setOnReconnectedCallback {
-            android.util.Log.d("TssRepository", "gRPC reconnected, restoring streams...")
-            restoreStreamsAfterReconnect()
-        }
-    }
-
-    /**
-     * Restore message and event streams after gRPC reconnection
-     */
-    private fun restoreStreamsAfterReconnect() {
-        val sessionId = currentMessageRoutingSessionId
-        val partyIndex = currentMessageRoutingPartyIndex
-        val routingPartyId = currentMessageRoutingPartyId
-
-        // Restore message routing if we had an active session
-        if (sessionId != null && partyIndex != null) {
-            android.util.Log.d("TssRepository", "Restoring message routing for session: $sessionId, routingPartyId: $routingPartyId")
-            startMessageRouting(sessionId, partyIndex, routingPartyId)
-        }
-
-        // Restore session event subscription with the correct partyId
-        if (grpcClient.wasEventStreamSubscribed()) {
-            val eventPartyId = currentSessionEventPartyId
-            android.util.Log.d("TssRepository", "Restoring session event subscription with partyId: $eventPartyId")
-            startSessionEventSubscription(eventPartyId)
+        // Monitor gRPC reconnection events and restart streams
+        // IMPORTANT: Don't use callback pattern - use event Flow for better reliability
+        repositoryScope.launch {
+            grpcConnectionEvents
+                .filter { it is GrpcConnectionEvent.Reconnected }
+                .collect {
+                    android.util.Log.d("TssRepository", "gRPC reconnected, restarting streams via StreamManager...")
+                    // StreamManager will re-initiate RPC calls (not "restore" closed Flows)
+                    streamManager.restartAllStreams()
+                }
         }
     }
@@ -362,6 +374,18 @@ class TssRepository @Inject constructor(
         grpcClient.connect(host, port)
     }

+    /**
+     * Setup network state monitoring for reliable reconnection
+     *
+     * This should be called once during app initialization (e.g., in MainActivity.onCreate)
+     * to enable immediate reconnection when network becomes available.
+     *
+     * @param context Application or Activity context
+     */
+    fun setupNetworkMonitoring(context: android.content.Context) {
+        grpcClient.setupNetworkMonitoring(context)
+    }
+
     /**
      * Disconnect from the server
      */
@@ -483,9 +507,10 @@ class TssRepository @Inject constructor(
         currentSessionEventPartyId = effectivePartyId
         android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $effectivePartyId (device partyId: $devicePartyId)")

-        // Launch via JobManager (it automatically cancels an older job with the same name)
-        jobManager.launch(JOB_SESSION_EVENT) {
-            grpcClient.subscribeSessionEvents(effectivePartyId).collect { event ->
+        // Use StreamManager for reliable stream management with auto-reconnection
+        streamManager.startEventStream(
+            partyId = effectivePartyId,
+            onEvent = { event ->
                 android.util.Log.d("TssRepository", "=== Session event received ===")
                 android.util.Log.d("TssRepository", "  eventType: ${event.eventType}")
                 android.util.Log.d("TssRepository", "  sessionId: ${event.sessionId}")
@@ -570,8 +595,12 @@ class TssRepository @Inject constructor(
                         android.util.Log.d("TssRepository", "  Reason: sessionId mismatch (event: ${event.sessionId}, active: ${activeSession.sessionId})")
                     }
                 }
-            }
-        }
+            },
+            onError = { error ->
+                android.util.Log.e("TssRepository", "Event stream error: ${error.message}")
+                // StreamManager will automatically retry
+            }
+        )
     }

     /**
@@ -585,8 +614,8 @@ class TssRepository @Inject constructor(
      * partyId from keygen (shareEntity.partyId).
      */
     private fun ensureSessionEventSubscriptionActive(signingPartyId: String? = null) {
-        // Check if the session event job is still active using JobManager
-        val isActive = jobManager.isActive(JOB_SESSION_EVENT)
+        // Check if the event stream is still active using StreamManager
+        val isActive = streamManager.isEventStreamActive()
         val devicePartyId = requirePartyId() // Ensure partyId is initialized
         val effectivePartyId = signingPartyId ?: currentSessionEventPartyId ?: devicePartyId
         android.util.Log.d("TssRepository", "Checking session event subscription: isActive=$isActive, effectivePartyId=$effectivePartyId")
@@ -595,16 +624,14 @@ class TssRepository @Inject constructor(
             android.util.Log.w("TssRepository", "Session event subscription is not active, restarting...")
             startSessionEventSubscription(signingPartyId)
         } else {
-            // Even if the job is "active", the gRPC stream may have silently disconnected
-            // Force a restart to ensure we have a fresh connection
-            // Also restart if we need to switch to a different partyId for signing
+            // Check if we need to switch to a different partyId for signing
             val needsRestart = signingPartyId != null && signingPartyId != currentSessionEventPartyId
             if (needsRestart) {
                 android.util.Log.d("TssRepository", "Switching session event subscription to signingPartyId: $signingPartyId")
                 startSessionEventSubscription(signingPartyId)
             } else {
-                android.util.Log.d("TssRepository", "Refreshing session event subscription to ensure fresh connection")
+                android.util.Log.d("TssRepository", "Event stream is active with correct partyId, no restart needed")
             }
-            startSessionEventSubscription(signingPartyId)
         }
     }
@@ -2038,40 +2065,50 @@ class TssRepository @Inject constructor(
         // Save for reconnection recovery
         currentMessageRoutingPartyId = effectivePartyId

+        android.util.Log.d("TssRepository", "Starting message routing: sessionId=$sessionId, routingPartyId=$effectivePartyId")
+
+        // Part 1: Collect outgoing messages from TSS and route via gRPC
+        // This doesn't need StreamManager - it's a local Flow collection
         jobManager.launch(JOB_MESSAGE_COLLECTION) {
-            android.util.Log.d("TssRepository", "Starting message routing: sessionId=$sessionId, routingPartyId=$effectivePartyId")
-
-            // Collect outgoing messages from TSS and route via gRPC
-            launch {
-                tssNativeBridge.outgoingMessages.collect { message ->
-                    val payload = Base64.decode(message.payload, Base64.NO_WRAP)
-                    grpcClient.routeMessage(
-                        sessionId = sessionId,
-                        fromParty = effectivePartyId, // Use the correct partyId for routing
-                        toParties = message.toParties ?: emptyList(),
-                        roundNumber = 0,
-                        messageType = if (message.isBroadcast) "broadcast" else "p2p",
-                        payload = payload
-                    )
-                }
+            tssNativeBridge.outgoingMessages.collect { message ->
+                val payload = Base64.decode(message.payload, Base64.NO_WRAP)
+                grpcClient.routeMessage(
+                    sessionId = sessionId,
+                    fromParty = effectivePartyId, // Use the correct partyId for routing
+                    toParties = message.toParties ?: emptyList(),
+                    roundNumber = 0,
+                    messageType = if (message.isBroadcast) "broadcast" else "p2p",
+                    payload = payload
+                )
             }
+        }

-            // Collect incoming messages from gRPC and send to TSS
-            launch {
-                grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { message ->
-                    // Find party index from party ID
-                    val session = _currentSession.value
-                    val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex
-                        ?: return@collect
+        // Part 2: Subscribe to incoming messages from gRPC and send to TSS
+        // Use StreamManager for reliable gRPC stream management with auto-reconnection
+        streamManager.startMessageStream(
+            sessionId = sessionId,
+            partyId = effectivePartyId,
+            partyIndex = partyIndex,
+            onMessage = { message ->
+                // Find party index from party ID
+                val session = _currentSession.value
+                val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex

+                if (fromPartyIndex != null) {
                     tssNativeBridge.sendIncomingMessage(
                         fromPartyIndex = fromPartyIndex,
                         isBroadcast = message.isBroadcast,
                         payload = message.payload
                     )
+                } else {
+                    android.util.Log.w("TssRepository", "Unknown fromParty: ${message.fromParty}, skipping message")
                 }
-            }
-        }
+            },
+            onError = { error ->
+                android.util.Log.e("TssRepository", "Message stream error: ${error.message}")
+                // StreamManager will automatically retry
+            }
+        )
     }

     /**
@@ -231,6 +231,18 @@ class MainViewModel @Inject constructor(
         }
     }

+    /**
+     * Setup network state monitoring for reliable reconnection
+     *
+     * This enables immediate reconnection when network becomes available
+     * instead of waiting for DNS resolution backoff (up to 60 seconds).
+     *
+     * Should be called once during app initialization.
+     */
+    fun setupNetworkMonitoring(context: android.content.Context) {
+        repository.setupNetworkMonitoring(context)
+    }
+
     /**
      * Connect to Message Router server
      */
|
|||
Loading…
Reference in New Issue