feat(android): strengthen gRPC connection reliability

Major improvements to Android gRPC client:
- Add automatic reconnection with exponential backoff (1s to 30s)
- Add heartbeat mechanism with failure detection (30s interval, 3 failures trigger reconnect)
- Add stream version tracking to filter stale callbacks
- Add channel state monitoring (every 5s)
- Add per-call deadline instead of one-time deadline for stubs
- Add SharedFlow for connection events (Connected, Disconnected, Reconnecting, Reconnected, PendingMessages)
- Add callback exception handling for robustness
- Add stream recovery after reconnection via callback mechanism

TssRepository changes:
- Save message routing params for recovery after reconnect
- Expose grpcConnectionEvents SharedFlow for UI notifications
- Auto-restore event subscriptions after reconnection

Other changes:
- Add QR code to Electron Create page for mobile scanning
- Auto version increment from version.properties
- SettingsScreen shows BuildConfig version info
- CreateWalletScreen tracks hasEnteredSession state

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-01 06:44:42 -08:00
parent a3ee831193
commit 444b720f8d
10 changed files with 864 additions and 68 deletions

View File

@ -94,3 +94,6 @@ Thumbs.db
# Signing configs - don't commit
signing.properties
keystore.properties
# Auto-generated version file
app/version.properties

View File

@ -1,3 +1,5 @@
import java.util.Properties
plugins {
id("com.android.application")
id("org.jetbrains.kotlin.android")
@ -6,6 +8,19 @@ plugins {
kotlin("kapt")
}
// Auto-increment version code from file
val versionFile = file("version.properties")
val versionProps = Properties()
if (versionFile.exists()) {
versionProps.load(versionFile.inputStream())
}
val autoVersionCode = (versionProps.getProperty("VERSION_CODE")?.toIntOrNull() ?: 0) + 1
val autoVersionName = "1.0.${autoVersionCode}"
// Save new version code
versionProps.setProperty("VERSION_CODE", autoVersionCode.toString())
versionFile.outputStream().use { versionProps.store(it, "Auto-generated version properties") }
android {
namespace = "com.durian.tssparty"
compileSdk = 34
@ -14,8 +29,8 @@ android {
applicationId = "com.durian.tssparty"
minSdk = 26
targetSdk = 34
versionCode = 1
versionName = "1.0.0"
versionCode = autoVersionCode
versionName = autoVersionName
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"

View File

@ -67,6 +67,7 @@ fun TssPartyApp(
val sessionParticipants by viewModel.sessionParticipants.collectAsState()
val currentRound by viewModel.currentRound.collectAsState()
val publicKey by viewModel.publicKey.collectAsState()
val hasEnteredSession by viewModel.hasEnteredSession.collectAsState()
// Transfer state
val preparedTx by viewModel.preparedTx.collectAsState()
@ -232,6 +233,7 @@ fun TssPartyApp(
inviteCode = createdInviteCode,
sessionId = currentSessionId,
sessionStatus = sessionStatus,
hasEnteredSession = hasEnteredSession,
participants = sessionParticipants,
currentRound = currentRound,
totalRounds = 9,

View File

@ -1,64 +1,563 @@
package com.durian.tssparty.data.remote
import android.util.Base64
import android.util.Log
import com.durian.tssparty.domain.model.Participant
import com.durian.tssparty.grpc.*
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.ConnectivityState
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.withContext
import kotlinx.coroutines.flow.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.math.min
import kotlin.math.pow
/**
* Connection state for gRPC client
*/
sealed class GrpcConnectionState {
object Disconnected : GrpcConnectionState()
object Connecting : GrpcConnectionState()
object Connected : GrpcConnectionState()
data class Reconnecting(val attempt: Int, val maxAttempts: Int) : GrpcConnectionState()
data class Failed(val reason: String) : GrpcConnectionState()
}
/**
* Connection events for UI notification
*/
sealed class GrpcConnectionEvent {
data class Disconnected(val reason: String) : GrpcConnectionEvent()
object Reconnected : GrpcConnectionEvent()
data class ReconnectFailed(val reason: String) : GrpcConnectionEvent()
data class PendingMessages(val count: Int) : GrpcConnectionEvent()
}
/**
* Reconnect configuration
*/
data class ReconnectConfig(
val maxRetries: Int = 10,
val initialDelayMs: Long = 1000,
val maxDelayMs: Long = 30000,
val backoffMultiplier: Double = 2.0
)
/**
* gRPC client for Message Router service
*
* Features:
* - Automatic reconnection with exponential backoff
* - Heartbeat mechanism with failure detection
* - Stream auto-recovery on disconnect
* - Connection state notifications via StateFlow
*/
@Singleton
class GrpcClient @Inject constructor() {
companion object {
private const val TAG = "GrpcClient"
private const val HEARTBEAT_INTERVAL_MS = 30000L
private const val MAX_HEARTBEAT_FAILS = 3
private const val CONNECTION_TIMEOUT_SECONDS = 10L
private const val REQUEST_TIMEOUT_SECONDS = 30L
private const val MAX_REQUEST_RETRIES = 3
private const val REQUEST_RETRY_DELAY_MS = 500L
}
private var channel: ManagedChannel? = null
private var stub: MessageRouterGrpc.MessageRouterBlockingStub? = null
private var asyncStub: MessageRouterGrpc.MessageRouterStub? = null
// Connection state
private val _connectionState = MutableStateFlow<GrpcConnectionState>(GrpcConnectionState.Disconnected)
val connectionState: StateFlow<GrpcConnectionState> = _connectionState.asStateFlow()
// Connection events (for UI notifications)
private val _connectionEvents = MutableSharedFlow<GrpcConnectionEvent>(extraBufferCapacity = 10)
val connectionEvents: SharedFlow<GrpcConnectionEvent> = _connectionEvents.asSharedFlow()
// Reconnection state
private val reconnectConfig = ReconnectConfig()
private var currentHost: String? = null
private var currentPort: Int? = null
private val isReconnecting = AtomicBoolean(false)
private val reconnectAttempts = AtomicInteger(0)
private val shouldReconnect = AtomicBoolean(true)
private var reconnectJob: Job? = null
// Registration state (for re-registration after reconnect)
private var registeredPartyId: String? = null
private var registeredPartyRole: String? = null
// Heartbeat state
private var heartbeatJob: Job? = null
private val heartbeatFailCount = AtomicInteger(0)
// Stream subscriptions (for recovery after reconnect)
private var activeMessageSubscription: MessageSubscription? = null
private var eventStreamSubscribed = AtomicBoolean(false)
private var eventStreamPartyId: String? = null
// Stream version tracking (to detect stale stream events)
private val messageStreamVersion = AtomicInteger(0)
private val eventStreamVersion = AtomicInteger(0)
// Reconnection callback - called when streams need to be re-established
private var onReconnectedCallback: (() -> Unit)? = null
// Channel state monitoring
private var channelStateMonitorJob: Job? = null
// Coroutine scope for background tasks
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
/**
* Connect to the Message Router server
*/
fun connect(host: String, port: Int) {
disconnect()
// Save connection params for reconnection
currentHost = host
currentPort = port
shouldReconnect.set(true)
reconnectAttempts.set(0)
val builder = ManagedChannelBuilder
.forAddress(host, port)
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
doConnect(host, port)
}
// Use TLS for port 443, plaintext for other ports (like local development)
if (port == 443) {
builder.useTransportSecurity()
} else {
builder.usePlaintext()
private fun doConnect(host: String, port: Int) {
Log.d(TAG, "Connecting to $host:$port")
_connectionState.value = GrpcConnectionState.Connecting
try {
// Disconnect existing connection
cleanupConnection()
val builder = ManagedChannelBuilder
.forAddress(host, port)
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.idleTimeout(5, TimeUnit.MINUTES)
// Use TLS for port 443, plaintext for other ports (like local development)
if (port == 443) {
builder.useTransportSecurity()
} else {
builder.usePlaintext()
}
channel = builder.build()
// Create stubs (deadline set per-call via getStubWithDeadline())
stub = MessageRouterGrpc.newBlockingStub(channel)
asyncStub = MessageRouterGrpc.newStub(channel)
// Wait for connection to be ready
scope.launch {
waitForConnection()
}
} catch (e: Exception) {
Log.e(TAG, "Connection failed: ${e.message}")
_connectionState.value = GrpcConnectionState.Failed(e.message ?: "Unknown error")
triggerReconnect("Connection failed: ${e.message}")
}
}
channel = builder.build()
private suspend fun waitForConnection() {
val ch = channel ?: return
stub = MessageRouterGrpc.newBlockingStub(channel)
asyncStub = MessageRouterGrpc.newStub(channel)
try {
withTimeout(CONNECTION_TIMEOUT_SECONDS * 1000) {
while (true) {
val state = ch.getState(true)
Log.d(TAG, "Channel state: $state")
when (state) {
ConnectivityState.READY -> {
Log.d(TAG, "Connected successfully")
_connectionState.value = GrpcConnectionState.Connected
reconnectAttempts.set(0)
heartbeatFailCount.set(0)
isReconnecting.set(false)
// Start channel state monitoring
startChannelStateMonitor()
// Re-register if we were registered before
reRegisterIfNeeded()
// Restart heartbeat
startHeartbeat()
// Re-subscribe to streams
reSubscribeStreams()
return@withTimeout
}
ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> {
throw Exception("Connection failed with state: $state")
}
else -> {
delay(100)
}
}
}
}
} catch (e: Exception) {
Log.e(TAG, "Wait for connection failed: ${e.message}")
_connectionState.value = GrpcConnectionState.Failed(e.message ?: "Connection timeout")
triggerReconnect("Connection timeout")
}
}
/**
* Disconnect from the server
* Disconnect from the server (won't auto-reconnect)
*/
fun disconnect() {
channel?.shutdown()
Log.d(TAG, "Disconnecting")
shouldReconnect.set(false)
cleanupConnection()
_connectionState.value = GrpcConnectionState.Disconnected
}
/**
* Cleanup all connection resources
*/
private fun cleanupConnection() {
// Cancel reconnect job
reconnectJob?.cancel()
reconnectJob = null
// Stop channel state monitor
channelStateMonitorJob?.cancel()
channelStateMonitorJob = null
// Stop heartbeat
stopHeartbeat()
// Increment stream versions to invalidate old stream callbacks
messageStreamVersion.incrementAndGet()
eventStreamVersion.incrementAndGet()
// Shutdown channel
channel?.let { ch ->
try {
ch.shutdown()
val terminated = ch.awaitTermination(2, TimeUnit.SECONDS)
if (!terminated) {
ch.shutdownNow()
}
} catch (e: Exception) {
Log.e(TAG, "Error shutting down channel: ${e.message}")
}
Unit
}
channel = null
stub = null
asyncStub = null
}
/**
* Start monitoring channel state for disconnections
*/
private fun startChannelStateMonitor() {
channelStateMonitorJob?.cancel()
channelStateMonitorJob = scope.launch {
val ch = channel ?: return@launch
var lastState = ConnectivityState.READY
while (isActive && _connectionState.value == GrpcConnectionState.Connected) {
delay(5000) // Check every 5 seconds
try {
val currentState = ch.getState(false)
if (currentState != lastState) {
Log.d(TAG, "Channel state changed: $lastState -> $currentState")
lastState = currentState
when (currentState) {
ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> {
Log.w(TAG, "Channel entered failure state: $currentState")
triggerReconnect("Channel state: $currentState")
}
ConnectivityState.IDLE -> {
// Try to reconnect if idle
Log.d(TAG, "Channel idle, requesting connection")
ch.getState(true) // Request connection
}
else -> { /* READY, CONNECTING - OK */ }
}
}
} catch (e: Exception) {
Log.e(TAG, "Error checking channel state: ${e.message}")
}
}
}
}
/**
* Trigger reconnection with exponential backoff
*/
private fun triggerReconnect(reason: String) {
if (!shouldReconnect.get() || isReconnecting.getAndSet(true)) {
return
}
val host = currentHost
val port = currentPort
if (host == null || port == null) {
isReconnecting.set(false)
return
}
Log.d(TAG, "Triggering reconnect: $reason")
// Emit disconnected event
_connectionEvents.tryEmit(GrpcConnectionEvent.Disconnected(reason))
reconnectJob?.cancel()
reconnectJob = scope.launch {
val attempt = reconnectAttempts.incrementAndGet()
if (attempt > reconnectConfig.maxRetries) {
Log.e(TAG, "Max reconnect attempts reached")
_connectionState.value = GrpcConnectionState.Failed("Max retries exceeded")
_connectionEvents.tryEmit(GrpcConnectionEvent.ReconnectFailed("Max retries exceeded"))
isReconnecting.set(false)
return@launch
}
_connectionState.value = GrpcConnectionState.Reconnecting(attempt, reconnectConfig.maxRetries)
// Calculate delay with exponential backoff
val delay = min(
(reconnectConfig.initialDelayMs * reconnectConfig.backoffMultiplier.pow(attempt - 1.0)).toLong(),
reconnectConfig.maxDelayMs
)
Log.d(TAG, "Reconnecting in ${delay}ms (attempt $attempt/${reconnectConfig.maxRetries})")
delay(delay)
isReconnecting.set(false)
doConnect(host, port)
}
}
/**
* Start heartbeat mechanism
*/
private fun startHeartbeat() {
stopHeartbeat()
val partyId = registeredPartyId ?: return
heartbeatJob = scope.launch {
while (isActive && _connectionState.value == GrpcConnectionState.Connected) {
delay(HEARTBEAT_INTERVAL_MS)
try {
val request = HeartbeatRequest.newBuilder()
.setPartyId(partyId)
.setTimestamp(System.currentTimeMillis())
.build()
val response = withContext(Dispatchers.IO) {
getStubWithDeadline()?.heartbeat(request)
}
if (response != null) {
heartbeatFailCount.set(0)
val pending = response.pendingMessages
Log.d(TAG, "Heartbeat OK, pending messages: $pending")
// Notify if there are pending messages (may have missed events)
if (pending > 0) {
Log.w(TAG, "Has $pending pending messages - may have missed events")
_connectionEvents.tryEmit(GrpcConnectionEvent.PendingMessages(pending))
}
} else {
handleHeartbeatFailure("No response")
}
} catch (e: Exception) {
handleHeartbeatFailure(e.message ?: "Unknown error")
}
}
}
}
private fun handleHeartbeatFailure(reason: String) {
val fails = heartbeatFailCount.incrementAndGet()
Log.w(TAG, "Heartbeat failed ($fails/$MAX_HEARTBEAT_FAILS): $reason")
if (fails >= MAX_HEARTBEAT_FAILS) {
Log.e(TAG, "Too many heartbeat failures, triggering reconnect")
triggerReconnect("Heartbeat failed")
}
}
private fun stopHeartbeat() {
heartbeatJob?.cancel()
heartbeatJob = null
heartbeatFailCount.set(0)
}
/**
* Re-register party after reconnection
*/
private suspend fun reRegisterIfNeeded() {
val partyId = registeredPartyId
val role = registeredPartyRole
if (partyId != null && role != null) {
Log.d(TAG, "Re-registering party: $partyId")
try {
registerPartyInternal(partyId, role, "1.0.0")
} catch (e: Exception) {
Log.e(TAG, "Re-registration failed: ${e.message}")
}
}
}
/**
* Wait for channel to be ready (with timeout)
*/
private suspend fun waitForChannelReady(timeoutMs: Long = 2000): Boolean {
val ch = channel ?: return false
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < timeoutMs) {
if (ch.getState(false) == ConnectivityState.READY) {
return true
}
delay(50)
}
return false
}
/**
* Re-subscribe to streams after reconnection
* Notifies the repository layer to re-establish message/event subscriptions
*/
private fun reSubscribeStreams() {
val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null
if (needsResubscribe) {
Log.d(TAG, "Triggering stream re-subscription callback")
Log.d(TAG, " - Event stream: ${eventStreamSubscribed.get()}, partyId: $eventStreamPartyId")
Log.d(TAG, " - Message stream: ${activeMessageSubscription?.sessionId}")
// Notify repository to re-establish streams
scope.launch {
// Wait for channel to be fully ready instead of fixed delay
if (waitForChannelReady()) {
try {
onReconnectedCallback?.invoke()
// Emit reconnected event
_connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected)
} catch (e: Exception) {
Log.e(TAG, "Reconnect callback failed: ${e.message}")
// Don't let callback failure affect the connection state
}
} else {
Log.w(TAG, "Channel not ready for stream re-subscription, skipping")
}
}
} else {
// No streams to restore, still emit reconnected event
_connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected)
}
}
/**
* Set callback for reconnection events
* Called when connection is restored and streams need to be re-established
*/
fun setOnReconnectedCallback(callback: () -> Unit) {
onReconnectedCallback = callback
}
/**
* Get active message subscription info (for recovery)
*/
fun getActiveMessageSubscription(): Pair<String, String>? {
return activeMessageSubscription?.let { Pair(it.sessionId, it.partyId) }
}
/**
* Check if event stream was subscribed (for recovery)
*/
fun wasEventStreamSubscribed(): Boolean = eventStreamSubscribed.get()
/**
* Get event stream party ID (for recovery)
*/
fun getEventStreamPartyId(): String? = eventStreamPartyId
/**
* Check if connected
*/
fun isConnected(): Boolean = _connectionState.value == GrpcConnectionState.Connected
/**
* Get stub with per-call deadline (fixes timeout issue)
*/
private fun getStubWithDeadline(): MessageRouterGrpc.MessageRouterBlockingStub? {
return stub?.withDeadlineAfter(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
/**
* Execute an RPC call with retry logic for transient failures
*/
private suspend fun <T> withRetry(
operation: String,
maxRetries: Int = MAX_REQUEST_RETRIES,
block: suspend () -> T
): T {
var lastException: Exception? = null
repeat(maxRetries) { attempt ->
try {
return block()
} catch (e: Exception) {
lastException = e
val isRetryable = isRetryableError(e)
if (attempt < maxRetries - 1 && isRetryable) {
val delay = REQUEST_RETRY_DELAY_MS * (attempt + 1)
Log.w(TAG, "$operation failed (attempt ${attempt + 1}/$maxRetries), retrying in ${delay}ms: ${e.message}")
delay(delay)
} else {
Log.e(TAG, "$operation failed after ${attempt + 1} attempts: ${e.message}")
throw e
}
}
}
throw lastException ?: Exception("$operation failed after $maxRetries attempts")
}
/**
* Check if an error is retryable (transient network issues)
*/
private fun isRetryableError(e: Exception): Boolean {
val message = e.message.orEmpty().lowercase()
return message.contains("unavailable") ||
message.contains("deadline exceeded") ||
message.contains("connection") ||
message.contains("timeout") ||
message.contains("reset") ||
message.contains("broken pipe")
}
/**
* Register party with the router
*/
@ -67,16 +566,38 @@ class GrpcClient @Inject constructor() {
partyRole: String = "temporary",
version: String = "1.0.0"
): Result<Boolean> = withContext(Dispatchers.IO) {
try {
val request = RegisterPartyRequest.newBuilder()
.setPartyId(partyId)
.setPartyRole(partyRole)
.setVersion(version)
.build()
// Save for re-registration
registeredPartyId = partyId
registeredPartyRole = partyRole
val response = stub?.registerParty(request)
Result.success(response?.success ?: false)
registerPartyInternal(partyId, partyRole, version)
}
private suspend fun registerPartyInternal(
partyId: String,
partyRole: String,
version: String
): Result<Boolean> = withContext(Dispatchers.IO) {
try {
withRetry("RegisterParty") {
val request = RegisterPartyRequest.newBuilder()
.setPartyId(partyId)
.setPartyRole(partyRole)
.setVersion(version)
.build()
val response = getStubWithDeadline()?.registerParty(request)
if (response?.success == true) {
Log.d(TAG, "Party registered: $partyId")
startHeartbeat()
true
} else {
throw Exception("Registration failed")
}
}
Result.success(true)
} catch (e: Exception) {
Log.e(TAG, "Register party failed: ${e.message}")
Result.failure(e)
}
}
@ -90,21 +611,21 @@ class GrpcClient @Inject constructor() {
joinToken: String
): Result<JoinSessionData> = withContext(Dispatchers.IO) {
try {
// Match Electron behavior: don't send device_info
val request = JoinSessionRequest.newBuilder()
.setSessionId(sessionId)
.setPartyId(partyId)
.setJoinToken(joinToken)
.build()
val data = withRetry("JoinSession") {
// Match Electron behavior: don't send device_info
val request = JoinSessionRequest.newBuilder()
.setSessionId(sessionId)
.setPartyId(partyId)
.setJoinToken(joinToken)
.build()
val response = stub?.joinSession(request)
if (response?.success == true) {
val sessionInfo = response.sessionInfo
val participants = response.otherPartiesList.map { party ->
Participant(party.partyId, party.partyIndex)
}
val response = getStubWithDeadline()?.joinSession(request)
if (response?.success == true) {
val sessionInfo = response.sessionInfo
val participants = response.otherPartiesList.map { party ->
Participant(party.partyId, party.partyIndex)
}
Result.success(
JoinSessionData(
sessionId = sessionInfo.sessionId,
sessionType = sessionInfo.sessionType,
@ -116,11 +637,13 @@ class GrpcClient @Inject constructor() {
else Base64.encodeToString(sessionInfo.messageHash.toByteArray(), Base64.NO_WRAP),
sessionStatus = if (sessionInfo.status.isNullOrEmpty()) null else sessionInfo.status
)
)
} else {
Result.failure(Exception("Failed to join session"))
} else {
throw Exception("Failed to join session")
}
}
Result.success(data)
} catch (e: Exception) {
Log.e(TAG, "Join session failed: ${e.message}")
Result.failure(e)
}
}
@ -138,9 +661,10 @@ class GrpcClient @Inject constructor() {
.setPartyId(partyId)
.build()
val response = stub?.markPartyReady(request)
val response = getStubWithDeadline()?.markPartyReady(request)
Result.success(response?.allReady ?: false)
} catch (e: Exception) {
Log.e(TAG, "Mark party ready failed: ${e.message}")
Result.failure(e)
}
}
@ -157,30 +681,40 @@ class GrpcClient @Inject constructor() {
payload: ByteArray
): Result<String> = withContext(Dispatchers.IO) {
try {
val request = RouteMessageRequest.newBuilder()
.setSessionId(sessionId)
.setFromParty(fromParty)
.addAllToParties(toParties)
.setRoundNumber(roundNumber)
.setMessageType(messageType)
.setPayload(com.google.protobuf.ByteString.copyFrom(payload))
.build()
val messageId = withRetry("RouteMessage") {
val request = RouteMessageRequest.newBuilder()
.setSessionId(sessionId)
.setFromParty(fromParty)
.addAllToParties(toParties)
.setRoundNumber(roundNumber)
.setMessageType(messageType)
.setPayload(com.google.protobuf.ByteString.copyFrom(payload))
.build()
val response = stub?.routeMessage(request)
if (response?.success == true) {
Result.success(response.messageId)
} else {
Result.failure(Exception("Failed to route message"))
val response = getStubWithDeadline()?.routeMessage(request)
if (response?.success == true) {
response.messageId
} else {
throw Exception("Failed to route message")
}
}
Result.success(messageId)
} catch (e: Exception) {
Log.e(TAG, "Route message failed: ${e.message}")
Result.failure(e)
}
}
/**
* Subscribe to messages for a party
* Subscribe to messages for a party (with auto-recovery)
*/
fun subscribeMessages(sessionId: String, partyId: String): Flow<IncomingMessage> = callbackFlow {
// Save subscription for recovery
activeMessageSubscription = MessageSubscription(sessionId, partyId)
// Capture current stream version to detect stale callbacks
val streamVersion = messageStreamVersion.incrementAndGet()
val request = SubscribeMessagesRequest.newBuilder()
.setSessionId(sessionId)
.setPartyId(partyId)
@ -188,6 +722,12 @@ class GrpcClient @Inject constructor() {
val observer = object : StreamObserver<MPCMessage> {
override fun onNext(message: MPCMessage) {
// Ignore events from stale streams
if (messageStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring message from stale stream (v$streamVersion, current v${messageStreamVersion.get()})")
return
}
val incoming = IncomingMessage(
messageId = message.messageId,
fromParty = message.fromParty,
@ -199,29 +739,71 @@ class GrpcClient @Inject constructor() {
}
override fun onError(t: Throwable) {
Log.e(TAG, "Message stream error: ${t.message}")
// Ignore events from stale streams
if (messageStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring error from stale message stream")
close(t)
return
}
// Don't trigger reconnect for CANCELLED errors
if (!t.message.orEmpty().contains("CANCELLED")) {
triggerReconnect("Message stream error: ${t.message}")
}
close(t)
}
override fun onCompleted() {
Log.d(TAG, "Message stream completed")
// Ignore events from stale streams
if (messageStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring completion from stale message stream")
close()
return
}
// Stream ended unexpectedly - trigger reconnect if we should still be subscribed
if (activeMessageSubscription != null && shouldReconnect.get()) {
Log.w(TAG, "Message stream ended unexpectedly, triggering reconnect")
triggerReconnect("Message stream ended")
}
close()
}
}
asyncStub?.subscribeMessages(request, observer)
awaitClose { }
awaitClose {
activeMessageSubscription = null
}
}
/**
* Subscribe to session events
* Subscribe to session events (with auto-recovery)
*/
fun subscribeSessionEvents(partyId: String): Flow<SessionEventData> = callbackFlow {
// Save subscription for recovery
eventStreamSubscribed.set(true)
eventStreamPartyId = partyId
// Capture current stream version to detect stale callbacks
val streamVersion = eventStreamVersion.incrementAndGet()
val request = SubscribeSessionEventsRequest.newBuilder()
.setPartyId(partyId)
.build()
val observer = object : StreamObserver<SessionEvent> {
override fun onNext(event: SessionEvent) {
// Ignore events from stale streams
if (eventStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring event from stale stream (v$streamVersion, current v${eventStreamVersion.get()})")
return
}
val eventData = SessionEventData(
eventId = event.eventId,
eventType = event.eventType,
@ -237,17 +819,62 @@ class GrpcClient @Inject constructor() {
}
override fun onError(t: Throwable) {
Log.e(TAG, "Session event stream error: ${t.message}")
// Ignore events from stale streams
if (eventStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring error from stale event stream")
close(t)
return
}
// Don't trigger reconnect for CANCELLED errors
if (!t.message.orEmpty().contains("CANCELLED")) {
triggerReconnect("Event stream error: ${t.message}")
}
close(t)
}
override fun onCompleted() {
Log.d(TAG, "Session event stream completed")
// Ignore events from stale streams
if (eventStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring completion from stale event stream")
close()
return
}
// Stream ended unexpectedly - trigger reconnect if we should still be subscribed
if (eventStreamSubscribed.get() && shouldReconnect.get()) {
Log.w(TAG, "Event stream ended unexpectedly, triggering reconnect")
triggerReconnect("Event stream ended")
}
close()
}
}
asyncStub?.subscribeSessionEvents(request, observer)
awaitClose { }
awaitClose {
eventStreamSubscribed.set(false)
eventStreamPartyId = null
}
}
/**
* Unsubscribe from session events
*/
fun unsubscribeSessionEvents() {
eventStreamSubscribed.set(false)
eventStreamPartyId = null
}
/**
* Unsubscribe from messages
*/
fun unsubscribeMessages() {
activeMessageSubscription = null
}
/**
@ -271,15 +898,16 @@ class GrpcClient @Inject constructor() {
builder.setSignature(com.google.protobuf.ByteString.copyFrom(it))
}
val response = stub?.reportCompletion(builder.build())
val response = getStubWithDeadline()?.reportCompletion(builder.build())
Result.success(response?.allCompleted ?: false)
} catch (e: Exception) {
Log.e(TAG, "Report completion failed: ${e.message}")
Result.failure(e)
}
}
/**
* Send heartbeat
* Send heartbeat (manual call for testing)
*/
suspend fun heartbeat(partyId: String): Result<Int> = withContext(Dispatchers.IO) {
try {
@ -288,14 +916,42 @@ class GrpcClient @Inject constructor() {
.setTimestamp(System.currentTimeMillis())
.build()
val response = stub?.heartbeat(request)
val response = getStubWithDeadline()?.heartbeat(request)
Result.success(response?.pendingMessages ?: 0)
} catch (e: Exception) {
Log.e(TAG, "Heartbeat failed: ${e.message}")
Result.failure(e)
}
}
/**
* Force reconnect (for manual recovery)
*/
fun forceReconnect() {
if (currentHost != null && currentPort != null) {
reconnectAttempts.set(0)
triggerReconnect("Manual reconnect requested")
}
}
/**
* Get current connection parameters
*/
fun getConnectionInfo(): Pair<String, Int>? {
val host = currentHost
val port = currentPort
return if (host != null && port != null) Pair(host, port) else null
}
}
/**
* Message subscription info
*/
private data class MessageSubscription(
val sessionId: String,
val partyId: String
)
/**
* Data class for join session response
*/

View File

@ -5,6 +5,8 @@ import com.durian.tssparty.data.local.ShareRecordDao
import com.durian.tssparty.data.local.ShareRecordEntity
import com.durian.tssparty.data.local.TssNativeBridge
import com.durian.tssparty.data.remote.GrpcClient
import com.durian.tssparty.data.remote.GrpcConnectionEvent
import com.durian.tssparty.data.remote.GrpcConnectionState
import com.durian.tssparty.data.remote.IncomingMessage
import com.durian.tssparty.data.remote.JoinSessionData
import com.durian.tssparty.data.remote.SessionEventData
@ -34,13 +36,51 @@ class TssRepository @Inject constructor(
private val _sessionStatus = MutableStateFlow<SessionStatus>(SessionStatus.WAITING)
val sessionStatus: StateFlow<SessionStatus> = _sessionStatus.asStateFlow()
// Expose gRPC connection state for UI
val grpcConnectionState: StateFlow<GrpcConnectionState> = grpcClient.connectionState
// Expose gRPC connection events for UI notifications
val grpcConnectionEvents: SharedFlow<GrpcConnectionEvent> = grpcClient.connectionEvents
private var partyId: String = UUID.randomUUID().toString()
private var messageCollectionJob: Job? = null
private var sessionEventJob: Job? = null
// Track current message routing params for reconnection recovery
private var currentMessageRoutingSessionId: String? = null
private var currentMessageRoutingPartyIndex: Int? = null
// Account service URL (configurable via settings)
private var accountServiceUrl: String = "https://rwaapi.szaiai.com"
init {
// Set up reconnection callback to restore streams
grpcClient.setOnReconnectedCallback {
android.util.Log.d("TssRepository", "gRPC reconnected, restoring streams...")
restoreStreamsAfterReconnect()
}
}
/**
* Restore message and event streams after gRPC reconnection
*/
private fun restoreStreamsAfterReconnect() {
val sessionId = currentMessageRoutingSessionId
val partyIndex = currentMessageRoutingPartyIndex
// Restore message routing if we had an active session
if (sessionId != null && partyIndex != null) {
android.util.Log.d("TssRepository", "Restoring message routing for session: $sessionId")
startMessageRouting(sessionId, partyIndex)
}
// Restore session event subscription
if (grpcClient.wasEventStreamSubscribed()) {
android.util.Log.d("TssRepository", "Restoring session event subscription")
startSessionEventSubscription()
}
}
// HTTP client for API calls
private val httpClient = okhttp3.OkHttpClient.Builder()
.connectTimeout(30, java.util.concurrent.TimeUnit.SECONDS)
@ -70,6 +110,23 @@ class TssRepository @Inject constructor(
grpcClient.disconnect()
}
/**
* Check if gRPC is connected
*/
fun isGrpcConnected(): Boolean = grpcClient.isConnected()
/**
* Force reconnect to gRPC server
*/
fun forceReconnect() {
grpcClient.forceReconnect()
}
/**
* Get current gRPC connection info
*/
fun getGrpcConnectionInfo(): Pair<String, Int>? = grpcClient.getConnectionInfo()
// Session event callback (set by ViewModel)
private var sessionEventCallback: ((SessionEventData) -> Unit)? = null
@ -1096,6 +1153,10 @@ class TssRepository @Inject constructor(
* Start message routing between TSS and gRPC
*/
private fun startMessageRouting(sessionId: String, partyIndex: Int) {
// Save params for reconnection recovery
currentMessageRoutingSessionId = sessionId
currentMessageRoutingPartyIndex = partyIndex
messageCollectionJob?.cancel()
messageCollectionJob = CoroutineScope(Dispatchers.IO).launch {
// Collect outgoing messages from TSS and route via gRPC
@ -1219,6 +1280,10 @@ class TssRepository @Inject constructor(
sessionEventJob?.cancel()
_currentSession.value = null
_sessionStatus.value = SessionStatus.WAITING
// Clear reconnection recovery params
currentMessageRoutingSessionId = null
currentMessageRoutingPartyIndex = null
}
/**

View File

@ -44,6 +44,7 @@ fun CreateWalletScreen(
inviteCode: String?,
sessionId: String?,
sessionStatus: SessionStatus,
hasEnteredSession: Boolean = false,
participants: List<String> = emptyList(),
currentRound: Int = 0,
totalRounds: Int = 9,
@ -61,8 +62,10 @@ fun CreateWalletScreen(
var validationError by remember { mutableStateOf<String?>(null) }
// Determine current step based on state
// Show session screen if user clicked "进入会话" OR if keygen is in progress/completed/failed
val step = when {
sessionStatus == SessionStatus.IN_PROGRESS || sessionStatus == SessionStatus.COMPLETED || sessionStatus == SessionStatus.FAILED -> "session"
hasEnteredSession && inviteCode != null -> "session"
inviteCode != null -> "created"
isLoading -> "creating"
else -> "config"

View File

@ -11,6 +11,7 @@ import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.unit.dp
import com.durian.tssparty.BuildConfig
import com.durian.tssparty.domain.model.AppSettings
import com.durian.tssparty.domain.model.NetworkType
@ -479,7 +480,11 @@ fun SettingsScreen(
) {
AboutRow("应用名称", "TSS Party")
Divider(modifier = Modifier.padding(vertical = 8.dp))
AboutRow("版本", "1.0.0")
AboutRow("版本", BuildConfig.VERSION_NAME)
Divider(modifier = Modifier.padding(vertical = 8.dp))
AboutRow("版本号", BuildConfig.VERSION_CODE.toString())
Divider(modifier = Modifier.padding(vertical = 8.dp))
AboutRow("构建类型", if (BuildConfig.DEBUG) "Debug" else "Release")
Divider(modifier = Modifier.padding(vertical = 8.dp))
AboutRow("TSS 协议", "GG20")
Divider(modifier = Modifier.padding(vertical = 8.dp))

View File

@ -40,6 +40,10 @@ class MainViewModel @Inject constructor(
private val _createdInviteCode = MutableStateFlow<String?>(null)
val createdInviteCode: StateFlow<String?> = _createdInviteCode.asStateFlow()
// Flag to track if user has entered the session (clicked "进入会话")
private val _hasEnteredSession = MutableStateFlow(false)
val hasEnteredSession: StateFlow<Boolean> = _hasEnteredSession.asStateFlow()
init {
// Start initialization on app launch
checkAllServices()
@ -331,9 +335,10 @@ class MainViewModel @Inject constructor(
*/
fun enterSession() {
// Session events are already being listened to via the callback set in init
// This just transitions the UI to the session view
// This transitions the UI to the session view
val sessionId = _currentSessionId.value
android.util.Log.d("MainViewModel", "Entering session: $sessionId")
_hasEnteredSession.value = true
}
/**
@ -383,6 +388,7 @@ class MainViewModel @Inject constructor(
_currentRound.value = 0
_publicKey.value = null
_createdInviteCode.value = null
_hasEnteredSession.value = false
}
// ========== Join Keygen State ==========

View File

@ -318,3 +318,28 @@
.copyButton:hover {
background-color: var(--primary-light);
}
/* QR Code Section */
.qrSection {
display: flex;
flex-direction: column;
align-items: center;
gap: var(--spacing-sm);
padding: var(--spacing-lg);
background-color: var(--background-color);
border-radius: var(--radius-lg);
margin-bottom: var(--spacing-md);
}
.qrCodeWrapper {
padding: var(--spacing-md);
background-color: white;
border-radius: var(--radius-md);
box-shadow: var(--shadow-sm);
}
.qrHint {
font-size: 13px;
color: var(--text-secondary);
margin: 0;
}

View File

@ -1,5 +1,6 @@
import { useState } from 'react';
import { useNavigate } from 'react-router-dom';
import { QRCodeSVG } from 'qrcode.react';
import styles from './Create.module.css';
interface CreateSessionResult {
@ -220,6 +221,21 @@ export default function Create() {
<div className={styles.successIcon}></div>
<h3 className={styles.successTitle}></h3>
{/* QR Code for mobile scanning */}
<div className={styles.qrSection}>
<div className={styles.qrCodeWrapper}>
<QRCodeSVG
value={result.inviteCode || ''}
size={180}
level="M"
includeMargin={true}
bgColor="#ffffff"
fgColor="#000000"
/>
</div>
<p className={styles.qrHint}>使 App </p>
</div>
<div className={styles.inviteSection}>
<label className={styles.label}></label>
<div className={styles.inviteCodeWrapper}>