diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 95988b05..9e19dd06 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -57,17 +57,87 @@ class TssRepository @Inject constructor( // CRITICAL: For backup/restore to work, signing must use the original partyId from keygen private var currentSigningPartyId: String? = null - private var messageCollectionJob: Job? = null - private var sessionEventJob: Job? = null + /** + * JobManager - 统一管理后台协程任务 + * + * 【架构安全修复 - 防止协程泄漏】 + * + * 问题背景: + * - TssRepository 中有 4 个独立的 Job 变量(messageCollectionJob、sessionEventJob 等) + * - cleanup() 需要手动取消每个 Job,容易遗漏导致协程泄漏 + * - 没有统一的生命周期管理和错误处理 + * + * 修复的内存泄漏风险: + * 1. Activity 销毁时 Job 未取消 → 后台协程继续运行 → 内存泄漏 → OOM + * 2. 快速重启连接时旧 Job 未取消 → 多个 Job 并行运行 → 资源竞争 + * 3. 异常导致某个 Job 未取消 → 僵尸协程 → 内存累积 + * + * JobManager 功能: + * - 统一启动和取消所有后台 Job + * - 自动替换同名 Job(防止重复启动) + * - 一键清理所有 Job(防止遗漏) + * - 提供 Job 状态查询 + */ + private inner class JobManager { + private val jobs = mutableMapOf() + + /** + * 启动一个被管理的 Job + * 如果同名 Job 已存在,自动取消旧 Job + */ + fun launch(name: String, block: suspend CoroutineScope.() -> Unit): Job { + // 取消同名的旧 Job + jobs[name]?.cancel() + + // 启动新 Job + val job = repositoryScope.launch(block = block) + jobs[name] = job + + android.util.Log.d("TssRepository", "[JobManager] Launched job: $name (active jobs: ${jobs.size})") + return job + } + + /** + * 取消指定的 Job + */ + fun cancel(name: String) { + jobs[name]?.let { job -> + job.cancel() + jobs.remove(name) + android.util.Log.d("TssRepository", "[JobManager] Cancelled job: $name (remaining jobs: ${jobs.size})") + } + } + + /** + * 检查 Job 是否活跃 + */ + fun isActive(name: String): Boolean { + return jobs[name]?.isActive == true + } + + /** + * 取消所有 Job + */ + fun cancelAll() { + android.util.Log.d("TssRepository", "[JobManager] Cancelling all jobs (total: ${jobs.size})") + jobs.values.forEach { it.cancel() } + jobs.clear() + } + + /** + * 获取活跃的 Job 数量 + */ + fun getActiveJobCount(): Int { + return jobs.values.count { it.isActive } + } + } + + private val jobManager = JobManager() // Pre-registered session ID for event matching (set before joinSession to avoid race condition) // This allows session_started events to be matched even if _currentSession is not yet set private var pendingSessionId: String? = null - // Fallback polling job for session status (handles gRPC stream disconnection on Android) - // Android gRPC streams can disconnect when app goes to background, so we poll as backup - private var sessionStatusPollingJob: Job? = null - // Keygen timeout callback (set by ViewModel) // Called when 5-minute polling timeout is reached without session starting private var keygenTimeoutCallback: ((String) -> Unit)? = null @@ -80,14 +150,17 @@ class TssRepository @Inject constructor( // Called when TSS protocol progress updates (round/totalRounds) private var progressCallback: ((Int, Int) -> Unit)? = null - // Job for collecting progress from native bridge - private var progressCollectionJob: Job? = null - // Repository-level CoroutineScope for background tasks // Uses SupervisorJob so individual task failures don't cancel other tasks private val repositoryScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) companion object { + // Job 名称常量 + private const val JOB_MESSAGE_COLLECTION = "message_collection" + private const val JOB_SESSION_EVENT = "session_event" + private const val JOB_SESSION_STATUS_POLLING = "session_status_polling" + private const val JOB_PROGRESS_COLLECTION = "progress_collection" + // Polling interval for session status check (matching Electron's 2-second interval) private const val POLL_INTERVAL_MS = 2000L // Maximum wait time for keygen to start after all parties joined (5 minutes, matching Electron) @@ -210,20 +283,21 @@ class TssRepository @Inject constructor( * Disconnect from the server */ fun disconnect() { - messageCollectionJob?.cancel() - sessionEventJob?.cancel() - sessionStatusPollingJob?.cancel() + // 使用 JobManager 统一取消所有后台任务 + jobManager.cancelAll() grpcClient.disconnect() } /** * Cleanup all resources when the repository is destroyed. * Call this when the app is being destroyed to prevent memory leaks. + * + * 【架构安全修复 - 使用 JobManager 统一清理】 + * 替换手动取消每个 Job 的方式,防止遗漏导致内存泄漏 */ fun cleanup() { - messageCollectionJob?.cancel() - sessionEventJob?.cancel() - sessionStatusPollingJob?.cancel() + // 使用 JobManager 统一取消所有后台任务 + jobManager.cancelAll() repositoryScope.cancel() grpcClient.disconnect() } @@ -298,13 +372,14 @@ class TssRepository @Inject constructor( * from keygen (shareEntity.partyId) so that session events are received correctly. */ private fun startSessionEventSubscription(subscriptionPartyId: String? = null) { - sessionEventJob?.cancel() val devicePartyId = requirePartyId() // Ensure partyId is initialized val effectivePartyId = subscriptionPartyId ?: devicePartyId // Save for reconnection recovery currentSessionEventPartyId = effectivePartyId android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $effectivePartyId (device partyId: $devicePartyId)") - sessionEventJob = repositoryScope.launch { + + // 使用 JobManager 启动(自动取消同名旧 Job) + jobManager.launch(JOB_SESSION_EVENT) { grpcClient.subscribeSessionEvents(effectivePartyId).collect { event -> android.util.Log.d("TssRepository", "=== Session event received ===") android.util.Log.d("TssRepository", " eventType: ${event.eventType}") @@ -405,8 +480,8 @@ class TssRepository @Inject constructor( * partyId from keygen (shareEntity.partyId). */ private fun ensureSessionEventSubscriptionActive(signingPartyId: String? = null) { - // Check if the session event job is still active - val isActive = sessionEventJob?.isActive == true + // Check if the session event job is still active using JobManager + val isActive = jobManager.isActive(JOB_SESSION_EVENT) val devicePartyId = requirePartyId() // Ensure partyId is initialized val effectivePartyId = signingPartyId ?: currentSessionEventPartyId ?: devicePartyId android.util.Log.d("TssRepository", "Checking session event subscription: isActive=$isActive, effectivePartyId=$effectivePartyId") @@ -473,12 +548,10 @@ class TssRepository @Inject constructor( * Called when keygen/sign session starts */ private fun startProgressCollection() { - // Cancel any existing progress collection - progressCollectionJob?.cancel() - android.util.Log.d("TssRepository", "[PROGRESS] Starting progress collection from native bridge") - progressCollectionJob = repositoryScope.launch { + // 使用 JobManager 启动(自动取消同名旧 Job) + jobManager.launch(JOB_PROGRESS_COLLECTION) { tssNativeBridge.progress.collect { (round, totalRounds) -> android.util.Log.d("TssRepository", "[PROGRESS] Round $round / $totalRounds") withContext(Dispatchers.Main) { @@ -493,8 +566,7 @@ class TssRepository @Inject constructor( * Called when session ends or is cancelled */ private fun stopProgressCollection() { - progressCollectionJob?.cancel() - progressCollectionJob = null + jobManager.cancel(JOB_PROGRESS_COLLECTION) android.util.Log.d("TssRepository", "[PROGRESS] Progress collection stopped") } @@ -513,15 +585,13 @@ class TssRepository @Inject constructor( * @param sessionType "keygen" or "sign" - determines which callback to invoke */ fun startSessionStatusPolling(sessionId: String, sessionType: String = "keygen") { - // Cancel any existing polling job - sessionStatusPollingJob?.cancel() - android.util.Log.d("TssRepository", "[POLLING] Starting session status polling for $sessionType session: $sessionId") // Notify UI that countdown has started (5 minutes = 300 seconds) countdownTickCallback?.invoke(MAX_WAIT_MS / 1000) - sessionStatusPollingJob = repositoryScope.launch { + // 使用 JobManager 启动(自动取消同名旧 Job) + jobManager.launch(JOB_SESSION_STATUS_POLLING) { val startTime = System.currentTimeMillis() var pollCount = 0 var lastTickSecond = MAX_WAIT_MS / 1000 @@ -631,8 +701,7 @@ class TssRepository @Inject constructor( * - User navigates away */ fun stopSessionStatusPolling() { - sessionStatusPollingJob?.cancel() - sessionStatusPollingJob = null + jobManager.cancel(JOB_SESSION_STATUS_POLLING) // Clear countdown display when polling is stopped countdownTickCallback?.invoke(-1L) android.util.Log.d("TssRepository", "[POLLING] Session status polling stopped") @@ -1421,7 +1490,7 @@ class TssRepository @Inject constructor( stopProgressCollection() _sessionStatus.value = SessionStatus.COMPLETED pendingSessionId = null // Clear pending session ID on completion - messageCollectionJob?.cancel() + jobManager.cancel(JOB_MESSAGE_COLLECTION) currentSigningPartyId = null // Clear after signing completes android.util.Log.d("TssRepository", "Sign as joiner completed: signature=${result.signature.take(20)}...") @@ -1543,7 +1612,7 @@ class TssRepository @Inject constructor( grpcClient.reportCompletion(apiJoinData.sessionId, partyId, publicKeyBytes) _sessionStatus.value = SessionStatus.COMPLETED - messageCollectionJob?.cancel() + jobManager.cancel(JOB_MESSAGE_COLLECTION) Result.success(shareEntity.copy(id = id).toShareRecord()) @@ -1728,7 +1797,7 @@ class TssRepository @Inject constructor( grpcClient.reportCompletion(apiJoinData.sessionId, signingPartyId, signature = signatureBytes) _sessionStatus.value = SessionStatus.COMPLETED - messageCollectionJob?.cancel() + jobManager.cancel(JOB_MESSAGE_COLLECTION) currentSigningPartyId = null // Clear after signing completes Result.success(result) @@ -1840,8 +1909,7 @@ class TssRepository @Inject constructor( // Save for reconnection recovery currentMessageRoutingPartyId = effectivePartyId - messageCollectionJob?.cancel() - messageCollectionJob = repositoryScope.launch { + jobManager.launch(JOB_MESSAGE_COLLECTION) { android.util.Log.d("TssRepository", "Starting message routing: sessionId=$sessionId, routingPartyId=$effectivePartyId") // Collect outgoing messages from TSS and route via gRPC @@ -1998,7 +2066,7 @@ class TssRepository @Inject constructor( stopProgressCollection() _sessionStatus.value = SessionStatus.COMPLETED pendingSessionId = null // Clear pending session ID on completion - sessionEventJob?.cancel() + jobManager.cancel(JOB_SESSION_EVENT) Result.success(shareEntity.copy(id = id).toShareRecord()) @@ -2016,8 +2084,8 @@ class TssRepository @Inject constructor( */ fun cancelSession() { tssNativeBridge.cancelSession() - messageCollectionJob?.cancel() - sessionEventJob?.cancel() + jobManager.cancel(JOB_MESSAGE_COLLECTION) + jobManager.cancel(JOB_SESSION_EVENT) stopSessionStatusPolling() // Stop polling when session is cancelled _currentSession.value = null _sessionStatus.value = SessionStatus.WAITING @@ -2680,7 +2748,7 @@ class TssRepository @Inject constructor( // Note: Message routing is already started in createSignSession after auto-join // Only start if not already running (for backward compatibility with old flow) - if (messageCollectionJob == null || messageCollectionJob?.isActive != true) { + if (!jobManager.isActive(JOB_MESSAGE_COLLECTION)) { // CRITICAL: Use signingPartyId for message routing startMessageRouting(sessionId, shareEntity.partyIndex, signingPartyId) } @@ -2722,7 +2790,7 @@ class TssRepository @Inject constructor( stopProgressCollection() _sessionStatus.value = SessionStatus.COMPLETED - messageCollectionJob?.cancel() + jobManager.cancel(JOB_MESSAGE_COLLECTION) currentSigningPartyId = null // Clear after signing completes Result.success(result)