diff --git a/backend/mpc-system/services/service-party-android/.gitignore b/backend/mpc-system/services/service-party-android/.gitignore index 08067624..38cd5a89 100644 --- a/backend/mpc-system/services/service-party-android/.gitignore +++ b/backend/mpc-system/services/service-party-android/.gitignore @@ -94,3 +94,6 @@ Thumbs.db # Signing configs - don't commit signing.properties keystore.properties + +# Auto-generated version file +app/version.properties diff --git a/backend/mpc-system/services/service-party-android/app/build.gradle.kts b/backend/mpc-system/services/service-party-android/app/build.gradle.kts index 1eabf5dd..2d6dceef 100644 --- a/backend/mpc-system/services/service-party-android/app/build.gradle.kts +++ b/backend/mpc-system/services/service-party-android/app/build.gradle.kts @@ -1,3 +1,5 @@ +import java.util.Properties + plugins { id("com.android.application") id("org.jetbrains.kotlin.android") @@ -6,6 +8,19 @@ plugins { kotlin("kapt") } +// Auto-increment version code from file +val versionFile = file("version.properties") +val versionProps = Properties() +if (versionFile.exists()) { + versionProps.load(versionFile.inputStream()) +} +val autoVersionCode = (versionProps.getProperty("VERSION_CODE")?.toIntOrNull() ?: 0) + 1 +val autoVersionName = "1.0.${autoVersionCode}" + +// Save new version code +versionProps.setProperty("VERSION_CODE", autoVersionCode.toString()) +versionFile.outputStream().use { versionProps.store(it, "Auto-generated version properties") } + android { namespace = "com.durian.tssparty" compileSdk = 34 @@ -14,8 +29,8 @@ android { applicationId = "com.durian.tssparty" minSdk = 26 targetSdk = 34 - versionCode = 1 - versionName = "1.0.0" + versionCode = autoVersionCode + versionName = autoVersionName testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner" diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt index 20717a10..b41f067a 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/MainActivity.kt @@ -67,6 +67,7 @@ fun TssPartyApp( val sessionParticipants by viewModel.sessionParticipants.collectAsState() val currentRound by viewModel.currentRound.collectAsState() val publicKey by viewModel.publicKey.collectAsState() + val hasEnteredSession by viewModel.hasEnteredSession.collectAsState() // Transfer state val preparedTx by viewModel.preparedTx.collectAsState() @@ -232,6 +233,7 @@ fun TssPartyApp( inviteCode = createdInviteCode, sessionId = currentSessionId, sessionStatus = sessionStatus, + hasEnteredSession = hasEnteredSession, participants = sessionParticipants, currentRound = currentRound, totalRounds = 9, diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt index 5a740fdc..daa75d21 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt @@ -1,64 +1,563 @@ package com.durian.tssparty.data.remote import android.util.Base64 +import android.util.Log import com.durian.tssparty.domain.model.Participant import com.durian.tssparty.grpc.* import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder +import io.grpc.ConnectivityState import io.grpc.stub.StreamObserver -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.* import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.withContext +import kotlinx.coroutines.flow.* import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger import javax.inject.Inject import javax.inject.Singleton +import kotlin.math.min +import kotlin.math.pow + +/** + * Connection state for gRPC client + */ +sealed class GrpcConnectionState { + object Disconnected : GrpcConnectionState() + object Connecting : GrpcConnectionState() + object Connected : GrpcConnectionState() + data class Reconnecting(val attempt: Int, val maxAttempts: Int) : GrpcConnectionState() + data class Failed(val reason: String) : GrpcConnectionState() +} + +/** + * Connection events for UI notification + */ +sealed class GrpcConnectionEvent { + data class Disconnected(val reason: String) : GrpcConnectionEvent() + object Reconnected : GrpcConnectionEvent() + data class ReconnectFailed(val reason: String) : GrpcConnectionEvent() + data class PendingMessages(val count: Int) : GrpcConnectionEvent() +} + +/** + * Reconnect configuration + */ +data class ReconnectConfig( + val maxRetries: Int = 10, + val initialDelayMs: Long = 1000, + val maxDelayMs: Long = 30000, + val backoffMultiplier: Double = 2.0 +) /** * gRPC client for Message Router service + * + * Features: + * - Automatic reconnection with exponential backoff + * - Heartbeat mechanism with failure detection + * - Stream auto-recovery on disconnect + * - Connection state notifications via StateFlow */ @Singleton class GrpcClient @Inject constructor() { + companion object { + private const val TAG = "GrpcClient" + private const val HEARTBEAT_INTERVAL_MS = 30000L + private const val MAX_HEARTBEAT_FAILS = 3 + private const val CONNECTION_TIMEOUT_SECONDS = 10L + private const val REQUEST_TIMEOUT_SECONDS = 30L + private const val MAX_REQUEST_RETRIES = 3 + private const val REQUEST_RETRY_DELAY_MS = 500L + } + private var channel: ManagedChannel? = null private var stub: MessageRouterGrpc.MessageRouterBlockingStub? = null private var asyncStub: MessageRouterGrpc.MessageRouterStub? = null + // Connection state + private val _connectionState = MutableStateFlow(GrpcConnectionState.Disconnected) + val connectionState: StateFlow = _connectionState.asStateFlow() + + // Connection events (for UI notifications) + private val _connectionEvents = MutableSharedFlow(extraBufferCapacity = 10) + val connectionEvents: SharedFlow = _connectionEvents.asSharedFlow() + + // Reconnection state + private val reconnectConfig = ReconnectConfig() + private var currentHost: String? = null + private var currentPort: Int? = null + private val isReconnecting = AtomicBoolean(false) + private val reconnectAttempts = AtomicInteger(0) + private val shouldReconnect = AtomicBoolean(true) + private var reconnectJob: Job? = null + + // Registration state (for re-registration after reconnect) + private var registeredPartyId: String? = null + private var registeredPartyRole: String? = null + + // Heartbeat state + private var heartbeatJob: Job? = null + private val heartbeatFailCount = AtomicInteger(0) + + // Stream subscriptions (for recovery after reconnect) + private var activeMessageSubscription: MessageSubscription? = null + private var eventStreamSubscribed = AtomicBoolean(false) + private var eventStreamPartyId: String? = null + + // Stream version tracking (to detect stale stream events) + private val messageStreamVersion = AtomicInteger(0) + private val eventStreamVersion = AtomicInteger(0) + + // Reconnection callback - called when streams need to be re-established + private var onReconnectedCallback: (() -> Unit)? = null + + // Channel state monitoring + private var channelStateMonitorJob: Job? = null + + // Coroutine scope for background tasks + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + /** * Connect to the Message Router server */ fun connect(host: String, port: Int) { - disconnect() + // Save connection params for reconnection + currentHost = host + currentPort = port + shouldReconnect.set(true) + reconnectAttempts.set(0) - val builder = ManagedChannelBuilder - .forAddress(host, port) - .keepAliveTime(30, TimeUnit.SECONDS) - .keepAliveTimeout(10, TimeUnit.SECONDS) + doConnect(host, port) + } - // Use TLS for port 443, plaintext for other ports (like local development) - if (port == 443) { - builder.useTransportSecurity() - } else { - builder.usePlaintext() + private fun doConnect(host: String, port: Int) { + Log.d(TAG, "Connecting to $host:$port") + _connectionState.value = GrpcConnectionState.Connecting + + try { + // Disconnect existing connection + cleanupConnection() + + val builder = ManagedChannelBuilder + .forAddress(host, port) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .idleTimeout(5, TimeUnit.MINUTES) + + // Use TLS for port 443, plaintext for other ports (like local development) + if (port == 443) { + builder.useTransportSecurity() + } else { + builder.usePlaintext() + } + + channel = builder.build() + + // Create stubs (deadline set per-call via getStubWithDeadline()) + stub = MessageRouterGrpc.newBlockingStub(channel) + asyncStub = MessageRouterGrpc.newStub(channel) + + // Wait for connection to be ready + scope.launch { + waitForConnection() + } + + } catch (e: Exception) { + Log.e(TAG, "Connection failed: ${e.message}") + _connectionState.value = GrpcConnectionState.Failed(e.message ?: "Unknown error") + triggerReconnect("Connection failed: ${e.message}") } + } - channel = builder.build() + private suspend fun waitForConnection() { + val ch = channel ?: return - stub = MessageRouterGrpc.newBlockingStub(channel) - asyncStub = MessageRouterGrpc.newStub(channel) + try { + withTimeout(CONNECTION_TIMEOUT_SECONDS * 1000) { + while (true) { + val state = ch.getState(true) + Log.d(TAG, "Channel state: $state") + + when (state) { + ConnectivityState.READY -> { + Log.d(TAG, "Connected successfully") + _connectionState.value = GrpcConnectionState.Connected + reconnectAttempts.set(0) + heartbeatFailCount.set(0) + isReconnecting.set(false) + + // Start channel state monitoring + startChannelStateMonitor() + + // Re-register if we were registered before + reRegisterIfNeeded() + + // Restart heartbeat + startHeartbeat() + + // Re-subscribe to streams + reSubscribeStreams() + + return@withTimeout + } + ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> { + throw Exception("Connection failed with state: $state") + } + else -> { + delay(100) + } + } + } + } + } catch (e: Exception) { + Log.e(TAG, "Wait for connection failed: ${e.message}") + _connectionState.value = GrpcConnectionState.Failed(e.message ?: "Connection timeout") + triggerReconnect("Connection timeout") + } } /** - * Disconnect from the server + * Disconnect from the server (won't auto-reconnect) */ fun disconnect() { - channel?.shutdown() + Log.d(TAG, "Disconnecting") + shouldReconnect.set(false) + cleanupConnection() + _connectionState.value = GrpcConnectionState.Disconnected + } + + /** + * Cleanup all connection resources + */ + private fun cleanupConnection() { + // Cancel reconnect job + reconnectJob?.cancel() + reconnectJob = null + + // Stop channel state monitor + channelStateMonitorJob?.cancel() + channelStateMonitorJob = null + + // Stop heartbeat + stopHeartbeat() + + // Increment stream versions to invalidate old stream callbacks + messageStreamVersion.incrementAndGet() + eventStreamVersion.incrementAndGet() + + // Shutdown channel + channel?.let { ch -> + try { + ch.shutdown() + val terminated = ch.awaitTermination(2, TimeUnit.SECONDS) + if (!terminated) { + ch.shutdownNow() + } + } catch (e: Exception) { + Log.e(TAG, "Error shutting down channel: ${e.message}") + } + Unit + } channel = null stub = null asyncStub = null } + /** + * Start monitoring channel state for disconnections + */ + private fun startChannelStateMonitor() { + channelStateMonitorJob?.cancel() + channelStateMonitorJob = scope.launch { + val ch = channel ?: return@launch + var lastState = ConnectivityState.READY + + while (isActive && _connectionState.value == GrpcConnectionState.Connected) { + delay(5000) // Check every 5 seconds + + try { + val currentState = ch.getState(false) + if (currentState != lastState) { + Log.d(TAG, "Channel state changed: $lastState -> $currentState") + lastState = currentState + + when (currentState) { + ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN -> { + Log.w(TAG, "Channel entered failure state: $currentState") + triggerReconnect("Channel state: $currentState") + } + ConnectivityState.IDLE -> { + // Try to reconnect if idle + Log.d(TAG, "Channel idle, requesting connection") + ch.getState(true) // Request connection + } + else -> { /* READY, CONNECTING - OK */ } + } + } + } catch (e: Exception) { + Log.e(TAG, "Error checking channel state: ${e.message}") + } + } + } + } + + /** + * Trigger reconnection with exponential backoff + */ + private fun triggerReconnect(reason: String) { + if (!shouldReconnect.get() || isReconnecting.getAndSet(true)) { + return + } + + val host = currentHost + val port = currentPort + if (host == null || port == null) { + isReconnecting.set(false) + return + } + + Log.d(TAG, "Triggering reconnect: $reason") + + // Emit disconnected event + _connectionEvents.tryEmit(GrpcConnectionEvent.Disconnected(reason)) + + reconnectJob?.cancel() + reconnectJob = scope.launch { + val attempt = reconnectAttempts.incrementAndGet() + + if (attempt > reconnectConfig.maxRetries) { + Log.e(TAG, "Max reconnect attempts reached") + _connectionState.value = GrpcConnectionState.Failed("Max retries exceeded") + _connectionEvents.tryEmit(GrpcConnectionEvent.ReconnectFailed("Max retries exceeded")) + isReconnecting.set(false) + return@launch + } + + _connectionState.value = GrpcConnectionState.Reconnecting(attempt, reconnectConfig.maxRetries) + + // Calculate delay with exponential backoff + val delay = min( + (reconnectConfig.initialDelayMs * reconnectConfig.backoffMultiplier.pow(attempt - 1.0)).toLong(), + reconnectConfig.maxDelayMs + ) + + Log.d(TAG, "Reconnecting in ${delay}ms (attempt $attempt/${reconnectConfig.maxRetries})") + delay(delay) + + isReconnecting.set(false) + doConnect(host, port) + } + } + + /** + * Start heartbeat mechanism + */ + private fun startHeartbeat() { + stopHeartbeat() + + val partyId = registeredPartyId ?: return + + heartbeatJob = scope.launch { + while (isActive && _connectionState.value == GrpcConnectionState.Connected) { + delay(HEARTBEAT_INTERVAL_MS) + + try { + val request = HeartbeatRequest.newBuilder() + .setPartyId(partyId) + .setTimestamp(System.currentTimeMillis()) + .build() + + val response = withContext(Dispatchers.IO) { + getStubWithDeadline()?.heartbeat(request) + } + + if (response != null) { + heartbeatFailCount.set(0) + val pending = response.pendingMessages + Log.d(TAG, "Heartbeat OK, pending messages: $pending") + + // Notify if there are pending messages (may have missed events) + if (pending > 0) { + Log.w(TAG, "Has $pending pending messages - may have missed events") + _connectionEvents.tryEmit(GrpcConnectionEvent.PendingMessages(pending)) + } + } else { + handleHeartbeatFailure("No response") + } + } catch (e: Exception) { + handleHeartbeatFailure(e.message ?: "Unknown error") + } + } + } + } + + private fun handleHeartbeatFailure(reason: String) { + val fails = heartbeatFailCount.incrementAndGet() + Log.w(TAG, "Heartbeat failed ($fails/$MAX_HEARTBEAT_FAILS): $reason") + + if (fails >= MAX_HEARTBEAT_FAILS) { + Log.e(TAG, "Too many heartbeat failures, triggering reconnect") + triggerReconnect("Heartbeat failed") + } + } + + private fun stopHeartbeat() { + heartbeatJob?.cancel() + heartbeatJob = null + heartbeatFailCount.set(0) + } + + /** + * Re-register party after reconnection + */ + private suspend fun reRegisterIfNeeded() { + val partyId = registeredPartyId + val role = registeredPartyRole + + if (partyId != null && role != null) { + Log.d(TAG, "Re-registering party: $partyId") + try { + registerPartyInternal(partyId, role, "1.0.0") + } catch (e: Exception) { + Log.e(TAG, "Re-registration failed: ${e.message}") + } + } + } + + /** + * Wait for channel to be ready (with timeout) + */ + private suspend fun waitForChannelReady(timeoutMs: Long = 2000): Boolean { + val ch = channel ?: return false + val startTime = System.currentTimeMillis() + + while (System.currentTimeMillis() - startTime < timeoutMs) { + if (ch.getState(false) == ConnectivityState.READY) { + return true + } + delay(50) + } + return false + } + + /** + * Re-subscribe to streams after reconnection + * Notifies the repository layer to re-establish message/event subscriptions + */ + private fun reSubscribeStreams() { + val needsResubscribe = eventStreamSubscribed.get() || activeMessageSubscription != null + + if (needsResubscribe) { + Log.d(TAG, "Triggering stream re-subscription callback") + Log.d(TAG, " - Event stream: ${eventStreamSubscribed.get()}, partyId: $eventStreamPartyId") + Log.d(TAG, " - Message stream: ${activeMessageSubscription?.sessionId}") + + // Notify repository to re-establish streams + scope.launch { + // Wait for channel to be fully ready instead of fixed delay + if (waitForChannelReady()) { + try { + onReconnectedCallback?.invoke() + // Emit reconnected event + _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected) + } catch (e: Exception) { + Log.e(TAG, "Reconnect callback failed: ${e.message}") + // Don't let callback failure affect the connection state + } + } else { + Log.w(TAG, "Channel not ready for stream re-subscription, skipping") + } + } + } else { + // No streams to restore, still emit reconnected event + _connectionEvents.tryEmit(GrpcConnectionEvent.Reconnected) + } + } + + /** + * Set callback for reconnection events + * Called when connection is restored and streams need to be re-established + */ + fun setOnReconnectedCallback(callback: () -> Unit) { + onReconnectedCallback = callback + } + + /** + * Get active message subscription info (for recovery) + */ + fun getActiveMessageSubscription(): Pair? { + return activeMessageSubscription?.let { Pair(it.sessionId, it.partyId) } + } + + /** + * Check if event stream was subscribed (for recovery) + */ + fun wasEventStreamSubscribed(): Boolean = eventStreamSubscribed.get() + + /** + * Get event stream party ID (for recovery) + */ + fun getEventStreamPartyId(): String? = eventStreamPartyId + + /** + * Check if connected + */ + fun isConnected(): Boolean = _connectionState.value == GrpcConnectionState.Connected + + /** + * Get stub with per-call deadline (fixes timeout issue) + */ + private fun getStubWithDeadline(): MessageRouterGrpc.MessageRouterBlockingStub? { + return stub?.withDeadlineAfter(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS) + } + + /** + * Execute an RPC call with retry logic for transient failures + */ + private suspend fun withRetry( + operation: String, + maxRetries: Int = MAX_REQUEST_RETRIES, + block: suspend () -> T + ): T { + var lastException: Exception? = null + + repeat(maxRetries) { attempt -> + try { + return block() + } catch (e: Exception) { + lastException = e + val isRetryable = isRetryableError(e) + + if (attempt < maxRetries - 1 && isRetryable) { + val delay = REQUEST_RETRY_DELAY_MS * (attempt + 1) + Log.w(TAG, "$operation failed (attempt ${attempt + 1}/$maxRetries), retrying in ${delay}ms: ${e.message}") + delay(delay) + } else { + Log.e(TAG, "$operation failed after ${attempt + 1} attempts: ${e.message}") + throw e + } + } + } + + throw lastException ?: Exception("$operation failed after $maxRetries attempts") + } + + /** + * Check if an error is retryable (transient network issues) + */ + private fun isRetryableError(e: Exception): Boolean { + val message = e.message.orEmpty().lowercase() + return message.contains("unavailable") || + message.contains("deadline exceeded") || + message.contains("connection") || + message.contains("timeout") || + message.contains("reset") || + message.contains("broken pipe") + } + /** * Register party with the router */ @@ -67,16 +566,38 @@ class GrpcClient @Inject constructor() { partyRole: String = "temporary", version: String = "1.0.0" ): Result = withContext(Dispatchers.IO) { - try { - val request = RegisterPartyRequest.newBuilder() - .setPartyId(partyId) - .setPartyRole(partyRole) - .setVersion(version) - .build() + // Save for re-registration + registeredPartyId = partyId + registeredPartyRole = partyRole - val response = stub?.registerParty(request) - Result.success(response?.success ?: false) + registerPartyInternal(partyId, partyRole, version) + } + + private suspend fun registerPartyInternal( + partyId: String, + partyRole: String, + version: String + ): Result = withContext(Dispatchers.IO) { + try { + withRetry("RegisterParty") { + val request = RegisterPartyRequest.newBuilder() + .setPartyId(partyId) + .setPartyRole(partyRole) + .setVersion(version) + .build() + + val response = getStubWithDeadline()?.registerParty(request) + if (response?.success == true) { + Log.d(TAG, "Party registered: $partyId") + startHeartbeat() + true + } else { + throw Exception("Registration failed") + } + } + Result.success(true) } catch (e: Exception) { + Log.e(TAG, "Register party failed: ${e.message}") Result.failure(e) } } @@ -90,21 +611,21 @@ class GrpcClient @Inject constructor() { joinToken: String ): Result = withContext(Dispatchers.IO) { try { - // Match Electron behavior: don't send device_info - val request = JoinSessionRequest.newBuilder() - .setSessionId(sessionId) - .setPartyId(partyId) - .setJoinToken(joinToken) - .build() + val data = withRetry("JoinSession") { + // Match Electron behavior: don't send device_info + val request = JoinSessionRequest.newBuilder() + .setSessionId(sessionId) + .setPartyId(partyId) + .setJoinToken(joinToken) + .build() - val response = stub?.joinSession(request) - if (response?.success == true) { - val sessionInfo = response.sessionInfo - val participants = response.otherPartiesList.map { party -> - Participant(party.partyId, party.partyIndex) - } + val response = getStubWithDeadline()?.joinSession(request) + if (response?.success == true) { + val sessionInfo = response.sessionInfo + val participants = response.otherPartiesList.map { party -> + Participant(party.partyId, party.partyIndex) + } - Result.success( JoinSessionData( sessionId = sessionInfo.sessionId, sessionType = sessionInfo.sessionType, @@ -116,11 +637,13 @@ class GrpcClient @Inject constructor() { else Base64.encodeToString(sessionInfo.messageHash.toByteArray(), Base64.NO_WRAP), sessionStatus = if (sessionInfo.status.isNullOrEmpty()) null else sessionInfo.status ) - ) - } else { - Result.failure(Exception("Failed to join session")) + } else { + throw Exception("Failed to join session") + } } + Result.success(data) } catch (e: Exception) { + Log.e(TAG, "Join session failed: ${e.message}") Result.failure(e) } } @@ -138,9 +661,10 @@ class GrpcClient @Inject constructor() { .setPartyId(partyId) .build() - val response = stub?.markPartyReady(request) + val response = getStubWithDeadline()?.markPartyReady(request) Result.success(response?.allReady ?: false) } catch (e: Exception) { + Log.e(TAG, "Mark party ready failed: ${e.message}") Result.failure(e) } } @@ -157,30 +681,40 @@ class GrpcClient @Inject constructor() { payload: ByteArray ): Result = withContext(Dispatchers.IO) { try { - val request = RouteMessageRequest.newBuilder() - .setSessionId(sessionId) - .setFromParty(fromParty) - .addAllToParties(toParties) - .setRoundNumber(roundNumber) - .setMessageType(messageType) - .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) - .build() + val messageId = withRetry("RouteMessage") { + val request = RouteMessageRequest.newBuilder() + .setSessionId(sessionId) + .setFromParty(fromParty) + .addAllToParties(toParties) + .setRoundNumber(roundNumber) + .setMessageType(messageType) + .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) + .build() - val response = stub?.routeMessage(request) - if (response?.success == true) { - Result.success(response.messageId) - } else { - Result.failure(Exception("Failed to route message")) + val response = getStubWithDeadline()?.routeMessage(request) + if (response?.success == true) { + response.messageId + } else { + throw Exception("Failed to route message") + } } + Result.success(messageId) } catch (e: Exception) { + Log.e(TAG, "Route message failed: ${e.message}") Result.failure(e) } } /** - * Subscribe to messages for a party + * Subscribe to messages for a party (with auto-recovery) */ fun subscribeMessages(sessionId: String, partyId: String): Flow = callbackFlow { + // Save subscription for recovery + activeMessageSubscription = MessageSubscription(sessionId, partyId) + + // Capture current stream version to detect stale callbacks + val streamVersion = messageStreamVersion.incrementAndGet() + val request = SubscribeMessagesRequest.newBuilder() .setSessionId(sessionId) .setPartyId(partyId) @@ -188,6 +722,12 @@ class GrpcClient @Inject constructor() { val observer = object : StreamObserver { override fun onNext(message: MPCMessage) { + // Ignore events from stale streams + if (messageStreamVersion.get() != streamVersion) { + Log.d(TAG, "Ignoring message from stale stream (v$streamVersion, current v${messageStreamVersion.get()})") + return + } + val incoming = IncomingMessage( messageId = message.messageId, fromParty = message.fromParty, @@ -199,29 +739,71 @@ class GrpcClient @Inject constructor() { } override fun onError(t: Throwable) { + Log.e(TAG, "Message stream error: ${t.message}") + + // Ignore events from stale streams + if (messageStreamVersion.get() != streamVersion) { + Log.d(TAG, "Ignoring error from stale message stream") + close(t) + return + } + + // Don't trigger reconnect for CANCELLED errors + if (!t.message.orEmpty().contains("CANCELLED")) { + triggerReconnect("Message stream error: ${t.message}") + } close(t) } override fun onCompleted() { + Log.d(TAG, "Message stream completed") + + // Ignore events from stale streams + if (messageStreamVersion.get() != streamVersion) { + Log.d(TAG, "Ignoring completion from stale message stream") + close() + return + } + + // Stream ended unexpectedly - trigger reconnect if we should still be subscribed + if (activeMessageSubscription != null && shouldReconnect.get()) { + Log.w(TAG, "Message stream ended unexpectedly, triggering reconnect") + triggerReconnect("Message stream ended") + } close() } } asyncStub?.subscribeMessages(request, observer) - awaitClose { } + awaitClose { + activeMessageSubscription = null + } } /** - * Subscribe to session events + * Subscribe to session events (with auto-recovery) */ fun subscribeSessionEvents(partyId: String): Flow = callbackFlow { + // Save subscription for recovery + eventStreamSubscribed.set(true) + eventStreamPartyId = partyId + + // Capture current stream version to detect stale callbacks + val streamVersion = eventStreamVersion.incrementAndGet() + val request = SubscribeSessionEventsRequest.newBuilder() .setPartyId(partyId) .build() val observer = object : StreamObserver { override fun onNext(event: SessionEvent) { + // Ignore events from stale streams + if (eventStreamVersion.get() != streamVersion) { + Log.d(TAG, "Ignoring event from stale stream (v$streamVersion, current v${eventStreamVersion.get()})") + return + } + val eventData = SessionEventData( eventId = event.eventId, eventType = event.eventType, @@ -237,17 +819,62 @@ class GrpcClient @Inject constructor() { } override fun onError(t: Throwable) { + Log.e(TAG, "Session event stream error: ${t.message}") + + // Ignore events from stale streams + if (eventStreamVersion.get() != streamVersion) { + Log.d(TAG, "Ignoring error from stale event stream") + close(t) + return + } + + // Don't trigger reconnect for CANCELLED errors + if (!t.message.orEmpty().contains("CANCELLED")) { + triggerReconnect("Event stream error: ${t.message}") + } close(t) } override fun onCompleted() { + Log.d(TAG, "Session event stream completed") + + // Ignore events from stale streams + if (eventStreamVersion.get() != streamVersion) { + Log.d(TAG, "Ignoring completion from stale event stream") + close() + return + } + + // Stream ended unexpectedly - trigger reconnect if we should still be subscribed + if (eventStreamSubscribed.get() && shouldReconnect.get()) { + Log.w(TAG, "Event stream ended unexpectedly, triggering reconnect") + triggerReconnect("Event stream ended") + } close() } } asyncStub?.subscribeSessionEvents(request, observer) - awaitClose { } + awaitClose { + eventStreamSubscribed.set(false) + eventStreamPartyId = null + } + } + + /** + * Unsubscribe from session events + */ + fun unsubscribeSessionEvents() { + eventStreamSubscribed.set(false) + eventStreamPartyId = null + } + + /** + * Unsubscribe from messages + */ + fun unsubscribeMessages() { + activeMessageSubscription = null } /** @@ -271,15 +898,16 @@ class GrpcClient @Inject constructor() { builder.setSignature(com.google.protobuf.ByteString.copyFrom(it)) } - val response = stub?.reportCompletion(builder.build()) + val response = getStubWithDeadline()?.reportCompletion(builder.build()) Result.success(response?.allCompleted ?: false) } catch (e: Exception) { + Log.e(TAG, "Report completion failed: ${e.message}") Result.failure(e) } } /** - * Send heartbeat + * Send heartbeat (manual call for testing) */ suspend fun heartbeat(partyId: String): Result = withContext(Dispatchers.IO) { try { @@ -288,14 +916,42 @@ class GrpcClient @Inject constructor() { .setTimestamp(System.currentTimeMillis()) .build() - val response = stub?.heartbeat(request) + val response = getStubWithDeadline()?.heartbeat(request) Result.success(response?.pendingMessages ?: 0) } catch (e: Exception) { + Log.e(TAG, "Heartbeat failed: ${e.message}") Result.failure(e) } } + + /** + * Force reconnect (for manual recovery) + */ + fun forceReconnect() { + if (currentHost != null && currentPort != null) { + reconnectAttempts.set(0) + triggerReconnect("Manual reconnect requested") + } + } + + /** + * Get current connection parameters + */ + fun getConnectionInfo(): Pair? { + val host = currentHost + val port = currentPort + return if (host != null && port != null) Pair(host, port) else null + } } +/** + * Message subscription info + */ +private data class MessageSubscription( + val sessionId: String, + val partyId: String +) + /** * Data class for join session response */ diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 8081519c..77151808 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -5,6 +5,8 @@ import com.durian.tssparty.data.local.ShareRecordDao import com.durian.tssparty.data.local.ShareRecordEntity import com.durian.tssparty.data.local.TssNativeBridge import com.durian.tssparty.data.remote.GrpcClient +import com.durian.tssparty.data.remote.GrpcConnectionEvent +import com.durian.tssparty.data.remote.GrpcConnectionState import com.durian.tssparty.data.remote.IncomingMessage import com.durian.tssparty.data.remote.JoinSessionData import com.durian.tssparty.data.remote.SessionEventData @@ -34,13 +36,51 @@ class TssRepository @Inject constructor( private val _sessionStatus = MutableStateFlow(SessionStatus.WAITING) val sessionStatus: StateFlow = _sessionStatus.asStateFlow() + // Expose gRPC connection state for UI + val grpcConnectionState: StateFlow = grpcClient.connectionState + + // Expose gRPC connection events for UI notifications + val grpcConnectionEvents: SharedFlow = grpcClient.connectionEvents + private var partyId: String = UUID.randomUUID().toString() private var messageCollectionJob: Job? = null private var sessionEventJob: Job? = null + // Track current message routing params for reconnection recovery + private var currentMessageRoutingSessionId: String? = null + private var currentMessageRoutingPartyIndex: Int? = null + // Account service URL (configurable via settings) private var accountServiceUrl: String = "https://rwaapi.szaiai.com" + init { + // Set up reconnection callback to restore streams + grpcClient.setOnReconnectedCallback { + android.util.Log.d("TssRepository", "gRPC reconnected, restoring streams...") + restoreStreamsAfterReconnect() + } + } + + /** + * Restore message and event streams after gRPC reconnection + */ + private fun restoreStreamsAfterReconnect() { + val sessionId = currentMessageRoutingSessionId + val partyIndex = currentMessageRoutingPartyIndex + + // Restore message routing if we had an active session + if (sessionId != null && partyIndex != null) { + android.util.Log.d("TssRepository", "Restoring message routing for session: $sessionId") + startMessageRouting(sessionId, partyIndex) + } + + // Restore session event subscription + if (grpcClient.wasEventStreamSubscribed()) { + android.util.Log.d("TssRepository", "Restoring session event subscription") + startSessionEventSubscription() + } + } + // HTTP client for API calls private val httpClient = okhttp3.OkHttpClient.Builder() .connectTimeout(30, java.util.concurrent.TimeUnit.SECONDS) @@ -70,6 +110,23 @@ class TssRepository @Inject constructor( grpcClient.disconnect() } + /** + * Check if gRPC is connected + */ + fun isGrpcConnected(): Boolean = grpcClient.isConnected() + + /** + * Force reconnect to gRPC server + */ + fun forceReconnect() { + grpcClient.forceReconnect() + } + + /** + * Get current gRPC connection info + */ + fun getGrpcConnectionInfo(): Pair? = grpcClient.getConnectionInfo() + // Session event callback (set by ViewModel) private var sessionEventCallback: ((SessionEventData) -> Unit)? = null @@ -1096,6 +1153,10 @@ class TssRepository @Inject constructor( * Start message routing between TSS and gRPC */ private fun startMessageRouting(sessionId: String, partyIndex: Int) { + // Save params for reconnection recovery + currentMessageRoutingSessionId = sessionId + currentMessageRoutingPartyIndex = partyIndex + messageCollectionJob?.cancel() messageCollectionJob = CoroutineScope(Dispatchers.IO).launch { // Collect outgoing messages from TSS and route via gRPC @@ -1219,6 +1280,10 @@ class TssRepository @Inject constructor( sessionEventJob?.cancel() _currentSession.value = null _sessionStatus.value = SessionStatus.WAITING + + // Clear reconnection recovery params + currentMessageRoutingSessionId = null + currentMessageRoutingPartyIndex = null } /** diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/CreateWalletScreen.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/CreateWalletScreen.kt index 965d4638..0241365a 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/CreateWalletScreen.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/CreateWalletScreen.kt @@ -44,6 +44,7 @@ fun CreateWalletScreen( inviteCode: String?, sessionId: String?, sessionStatus: SessionStatus, + hasEnteredSession: Boolean = false, participants: List = emptyList(), currentRound: Int = 0, totalRounds: Int = 9, @@ -61,8 +62,10 @@ fun CreateWalletScreen( var validationError by remember { mutableStateOf(null) } // Determine current step based on state + // Show session screen if user clicked "进入会话" OR if keygen is in progress/completed/failed val step = when { sessionStatus == SessionStatus.IN_PROGRESS || sessionStatus == SessionStatus.COMPLETED || sessionStatus == SessionStatus.FAILED -> "session" + hasEnteredSession && inviteCode != null -> "session" inviteCode != null -> "created" isLoading -> "creating" else -> "config" diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/SettingsScreen.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/SettingsScreen.kt index b2440ea3..c1e3abff 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/SettingsScreen.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/screens/SettingsScreen.kt @@ -11,6 +11,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.text.font.FontWeight import androidx.compose.ui.unit.dp +import com.durian.tssparty.BuildConfig import com.durian.tssparty.domain.model.AppSettings import com.durian.tssparty.domain.model.NetworkType @@ -479,7 +480,11 @@ fun SettingsScreen( ) { AboutRow("应用名称", "TSS Party") Divider(modifier = Modifier.padding(vertical = 8.dp)) - AboutRow("版本", "1.0.0") + AboutRow("版本", BuildConfig.VERSION_NAME) + Divider(modifier = Modifier.padding(vertical = 8.dp)) + AboutRow("版本号", BuildConfig.VERSION_CODE.toString()) + Divider(modifier = Modifier.padding(vertical = 8.dp)) + AboutRow("构建类型", if (BuildConfig.DEBUG) "Debug" else "Release") Divider(modifier = Modifier.padding(vertical = 8.dp)) AboutRow("TSS 协议", "GG20") Divider(modifier = Modifier.padding(vertical = 8.dp)) diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt index 02016c68..e62b2882 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt @@ -40,6 +40,10 @@ class MainViewModel @Inject constructor( private val _createdInviteCode = MutableStateFlow(null) val createdInviteCode: StateFlow = _createdInviteCode.asStateFlow() + // Flag to track if user has entered the session (clicked "进入会话") + private val _hasEnteredSession = MutableStateFlow(false) + val hasEnteredSession: StateFlow = _hasEnteredSession.asStateFlow() + init { // Start initialization on app launch checkAllServices() @@ -331,9 +335,10 @@ class MainViewModel @Inject constructor( */ fun enterSession() { // Session events are already being listened to via the callback set in init - // This just transitions the UI to the session view + // This transitions the UI to the session view val sessionId = _currentSessionId.value android.util.Log.d("MainViewModel", "Entering session: $sessionId") + _hasEnteredSession.value = true } /** @@ -383,6 +388,7 @@ class MainViewModel @Inject constructor( _currentRound.value = 0 _publicKey.value = null _createdInviteCode.value = null + _hasEnteredSession.value = false } // ========== Join Keygen State ========== diff --git a/backend/mpc-system/services/service-party-app/src/pages/Create.module.css b/backend/mpc-system/services/service-party-app/src/pages/Create.module.css index 48681d4a..32b06736 100644 --- a/backend/mpc-system/services/service-party-app/src/pages/Create.module.css +++ b/backend/mpc-system/services/service-party-app/src/pages/Create.module.css @@ -318,3 +318,28 @@ .copyButton:hover { background-color: var(--primary-light); } + +/* QR Code Section */ +.qrSection { + display: flex; + flex-direction: column; + align-items: center; + gap: var(--spacing-sm); + padding: var(--spacing-lg); + background-color: var(--background-color); + border-radius: var(--radius-lg); + margin-bottom: var(--spacing-md); +} + +.qrCodeWrapper { + padding: var(--spacing-md); + background-color: white; + border-radius: var(--radius-md); + box-shadow: var(--shadow-sm); +} + +.qrHint { + font-size: 13px; + color: var(--text-secondary); + margin: 0; +} diff --git a/backend/mpc-system/services/service-party-app/src/pages/Create.tsx b/backend/mpc-system/services/service-party-app/src/pages/Create.tsx index 76e7226a..9404d491 100644 --- a/backend/mpc-system/services/service-party-app/src/pages/Create.tsx +++ b/backend/mpc-system/services/service-party-app/src/pages/Create.tsx @@ -1,5 +1,6 @@ import { useState } from 'react'; import { useNavigate } from 'react-router-dom'; +import { QRCodeSVG } from 'qrcode.react'; import styles from './Create.module.css'; interface CreateSessionResult { @@ -220,6 +221,21 @@ export default function Create() {

会话创建成功

+ {/* QR Code for mobile scanning */} +
+
+ +
+

使用手机 App 扫码加入

+
+