From 840318f4496fe1d1da04a1613842d0d99f098e9b Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 23 Feb 2026 04:55:52 -0800 Subject: [PATCH] =?UTF-8?q?fix:=20Scheduler=20=E7=BC=BA=E5=B0=91=20tenant?= =?UTF-8?q?=20=E4=B8=8A=E4=B8=8B=E6=96=87=E5=AF=BC=E8=87=B4=20ops-service?= =?UTF-8?q?=20=E5=8D=A1=E6=AD=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根因: @Cron 定时任务在 HTTP 请求上下文之外运行, TenantAwareRepository 需要 AsyncLocalStorage 中的 tenant 信息, 每分钟抛 "Tenant context not initialized" 错误。 修复: - scanCronOrders: 查 it0_shared.tenants 获取所有活跃租户, 在 TenantContextService.run() 上下文中逐一执行 - handleEventTrigger: 从 Redis event 中提取 tenantId, 同样包裹在 TenantContextService.run() 中 - 每个 tenant 循环加 try/catch 防止单个租户出错影响其他 Co-Authored-By: Claude Opus 4.6 --- .../standing-order-executor.service.ts | 121 +++++++++++++----- 1 file changed, 91 insertions(+), 30 deletions(-) diff --git a/packages/services/ops-service/src/domain/services/standing-order-executor.service.ts b/packages/services/ops-service/src/domain/services/standing-order-executor.service.ts index 9692b0d..702a196 100644 --- a/packages/services/ops-service/src/domain/services/standing-order-executor.service.ts +++ b/packages/services/ops-service/src/domain/services/standing-order-executor.service.ts @@ -3,8 +3,9 @@ import { ConfigService } from '@nestjs/config'; import { HttpService } from '@nestjs/axios'; import { Cron } from '@nestjs/schedule'; import { firstValueFrom } from 'rxjs'; +import { DataSource } from 'typeorm'; import { RedisEventBus } from '@it0/events'; -import { EventPatterns } from '@it0/common'; +import { EventPatterns, TenantContextService } from '@it0/common'; import { StandingOrderRepository } from '../../infrastructure/repositories/standing-order.repository'; import { StandingOrderExecutionRepository } from '../../infrastructure/repositories/standing-order-execution.repository'; import { StandingOrder } from '../entities/standing-order.entity'; @@ -21,6 +22,7 @@ export class StandingOrderExecutorService implements OnModuleInit { private readonly eventBus: RedisEventBus, private readonly configService: ConfigService, private readonly httpService: HttpService, + private readonly dataSource: DataSource, ) { this.agentServiceUrl = this.configService.get( 'AGENT_SERVICE_URL', @@ -54,52 +56,111 @@ export class StandingOrderExecutorService implements OnModuleInit { /** * Cron scanning: runs every minute to evaluate cron-based standing orders. + * Must iterate all tenants since cron jobs run outside HTTP request context. */ @Cron('*/1 * * * *') async scanCronOrders(): Promise { - const activeOrders = await this.standingOrderRepo.findByStatus('active'); - const cronOrders = activeOrders.filter( - (order) => order.trigger.type === 'cron' && order.trigger.cronExpression, - ); + let tenants: { id: string; name: string }[]; + try { + const result = await this.dataSource.query( + `SELECT id, name FROM it0_shared.tenants WHERE status = 'active'`, + ); + tenants = result; + } catch (err) { + this.logger.error(`Failed to query tenants: ${err}`); + return; + } + + if (tenants.length === 0) return; const now = new Date(); - for (const order of cronOrders) { - if (this.cronMatchesCurrentMinute(order.trigger.cronExpression!, now)) { - this.logger.log(`Cron match for standing order "${order.name}" (${order.id})`); - await this.executeOrder(order, { - triggerType: 'cron', - cronExpression: order.trigger.cronExpression, - firedAt: now.toISOString(), - }); + for (const tenant of tenants) { + try { + await TenantContextService.run( + { + tenantId: tenant.id, + tenantName: tenant.name, + plan: 'pro', + schemaName: `it0_t_${tenant.id}`, + }, + async () => { + const activeOrders = + await this.standingOrderRepo.findByStatus('active'); + const cronOrders = activeOrders.filter( + (order) => + order.trigger.type === 'cron' && order.trigger.cronExpression, + ); + + for (const order of cronOrders) { + if ( + this.cronMatchesCurrentMinute( + order.trigger.cronExpression!, + now, + ) + ) { + this.logger.log( + `Cron match for standing order "${order.name}" (${order.id}) tenant=${tenant.id}`, + ); + await this.executeOrder(order, { + triggerType: 'cron', + cronExpression: order.trigger.cronExpression, + firedAt: now.toISOString(), + }); + } + } + }, + ); + } catch (err) { + this.logger.error( + `Scheduler error for tenant ${tenant.id}: ${err}`, + ); } } } /** * Handles an incoming event and matches it against event-triggered standing orders. + * Events from Redis carry tenantId — wrap in tenant context. */ private async handleEventTrigger( eventType: string, event: Record, ): Promise { - const activeOrders = await this.standingOrderRepo.findByStatus('active'); - const matchingOrders = activeOrders.filter( - (order) => - order.trigger.type === 'event' && - order.trigger.eventType === eventType, - ); - - for (const order of matchingOrders) { - this.logger.log( - `Event match for standing order "${order.name}" (${order.id}) on event ${eventType}`, - ); - await this.executeOrder(order, { - triggerType: 'event', - eventType, - eventId: event.id, - eventPayload: event.payload, - }); + const tenantId = event.tenantId; + if (!tenantId) { + this.logger.warn(`Event ${event.id} has no tenantId, skipping`); + return; } + + await TenantContextService.run( + { + tenantId, + tenantName: tenantId, + plan: 'pro', + schemaName: `it0_t_${tenantId}`, + }, + async () => { + const activeOrders = + await this.standingOrderRepo.findByStatus('active'); + const matchingOrders = activeOrders.filter( + (order) => + order.trigger.type === 'event' && + order.trigger.eventType === eventType, + ); + + for (const order of matchingOrders) { + this.logger.log( + `Event match for standing order "${order.name}" (${order.id}) on event ${eventType}`, + ); + await this.executeOrder(order, { + triggerType: 'event', + eventType, + eventId: event.id, + eventPayload: event.payload, + }); + } + }, + ); } /**