fix: Scheduler 缺少 tenant 上下文导致 ops-service 卡死
根因: @Cron 定时任务在 HTTP 请求上下文之外运行, TenantAwareRepository 需要 AsyncLocalStorage 中的 tenant 信息, 每分钟抛 "Tenant context not initialized" 错误。 修复: - scanCronOrders: 查 it0_shared.tenants 获取所有活跃租户, 在 TenantContextService.run() 上下文中逐一执行 - handleEventTrigger: 从 Redis event 中提取 tenantId, 同样包裹在 TenantContextService.run() 中 - 每个 tenant 循环加 try/catch 防止单个租户出错影响其他 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
666b173906
commit
840318f449
|
|
@ -3,8 +3,9 @@ import { ConfigService } from '@nestjs/config';
|
||||||
import { HttpService } from '@nestjs/axios';
|
import { HttpService } from '@nestjs/axios';
|
||||||
import { Cron } from '@nestjs/schedule';
|
import { Cron } from '@nestjs/schedule';
|
||||||
import { firstValueFrom } from 'rxjs';
|
import { firstValueFrom } from 'rxjs';
|
||||||
|
import { DataSource } from 'typeorm';
|
||||||
import { RedisEventBus } from '@it0/events';
|
import { RedisEventBus } from '@it0/events';
|
||||||
import { EventPatterns } from '@it0/common';
|
import { EventPatterns, TenantContextService } from '@it0/common';
|
||||||
import { StandingOrderRepository } from '../../infrastructure/repositories/standing-order.repository';
|
import { StandingOrderRepository } from '../../infrastructure/repositories/standing-order.repository';
|
||||||
import { StandingOrderExecutionRepository } from '../../infrastructure/repositories/standing-order-execution.repository';
|
import { StandingOrderExecutionRepository } from '../../infrastructure/repositories/standing-order-execution.repository';
|
||||||
import { StandingOrder } from '../entities/standing-order.entity';
|
import { StandingOrder } from '../entities/standing-order.entity';
|
||||||
|
|
@ -21,6 +22,7 @@ export class StandingOrderExecutorService implements OnModuleInit {
|
||||||
private readonly eventBus: RedisEventBus,
|
private readonly eventBus: RedisEventBus,
|
||||||
private readonly configService: ConfigService,
|
private readonly configService: ConfigService,
|
||||||
private readonly httpService: HttpService,
|
private readonly httpService: HttpService,
|
||||||
|
private readonly dataSource: DataSource,
|
||||||
) {
|
) {
|
||||||
this.agentServiceUrl = this.configService.get<string>(
|
this.agentServiceUrl = this.configService.get<string>(
|
||||||
'AGENT_SERVICE_URL',
|
'AGENT_SERVICE_URL',
|
||||||
|
|
@ -54,52 +56,111 @@ export class StandingOrderExecutorService implements OnModuleInit {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cron scanning: runs every minute to evaluate cron-based standing orders.
|
* Cron scanning: runs every minute to evaluate cron-based standing orders.
|
||||||
|
* Must iterate all tenants since cron jobs run outside HTTP request context.
|
||||||
*/
|
*/
|
||||||
@Cron('*/1 * * * *')
|
@Cron('*/1 * * * *')
|
||||||
async scanCronOrders(): Promise<void> {
|
async scanCronOrders(): Promise<void> {
|
||||||
const activeOrders = await this.standingOrderRepo.findByStatus('active');
|
let tenants: { id: string; name: string }[];
|
||||||
const cronOrders = activeOrders.filter(
|
try {
|
||||||
(order) => order.trigger.type === 'cron' && order.trigger.cronExpression,
|
const result = await this.dataSource.query(
|
||||||
);
|
`SELECT id, name FROM it0_shared.tenants WHERE status = 'active'`,
|
||||||
|
);
|
||||||
|
tenants = result;
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(`Failed to query tenants: ${err}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tenants.length === 0) return;
|
||||||
|
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
for (const order of cronOrders) {
|
for (const tenant of tenants) {
|
||||||
if (this.cronMatchesCurrentMinute(order.trigger.cronExpression!, now)) {
|
try {
|
||||||
this.logger.log(`Cron match for standing order "${order.name}" (${order.id})`);
|
await TenantContextService.run(
|
||||||
await this.executeOrder(order, {
|
{
|
||||||
triggerType: 'cron',
|
tenantId: tenant.id,
|
||||||
cronExpression: order.trigger.cronExpression,
|
tenantName: tenant.name,
|
||||||
firedAt: now.toISOString(),
|
plan: 'pro',
|
||||||
});
|
schemaName: `it0_t_${tenant.id}`,
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
const activeOrders =
|
||||||
|
await this.standingOrderRepo.findByStatus('active');
|
||||||
|
const cronOrders = activeOrders.filter(
|
||||||
|
(order) =>
|
||||||
|
order.trigger.type === 'cron' && order.trigger.cronExpression,
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const order of cronOrders) {
|
||||||
|
if (
|
||||||
|
this.cronMatchesCurrentMinute(
|
||||||
|
order.trigger.cronExpression!,
|
||||||
|
now,
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
this.logger.log(
|
||||||
|
`Cron match for standing order "${order.name}" (${order.id}) tenant=${tenant.id}`,
|
||||||
|
);
|
||||||
|
await this.executeOrder(order, {
|
||||||
|
triggerType: 'cron',
|
||||||
|
cronExpression: order.trigger.cronExpression,
|
||||||
|
firedAt: now.toISOString(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Scheduler error for tenant ${tenant.id}: ${err}`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles an incoming event and matches it against event-triggered standing orders.
|
* Handles an incoming event and matches it against event-triggered standing orders.
|
||||||
|
* Events from Redis carry tenantId — wrap in tenant context.
|
||||||
*/
|
*/
|
||||||
private async handleEventTrigger(
|
private async handleEventTrigger(
|
||||||
eventType: string,
|
eventType: string,
|
||||||
event: Record<string, any>,
|
event: Record<string, any>,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const activeOrders = await this.standingOrderRepo.findByStatus('active');
|
const tenantId = event.tenantId;
|
||||||
const matchingOrders = activeOrders.filter(
|
if (!tenantId) {
|
||||||
(order) =>
|
this.logger.warn(`Event ${event.id} has no tenantId, skipping`);
|
||||||
order.trigger.type === 'event' &&
|
return;
|
||||||
order.trigger.eventType === eventType,
|
|
||||||
);
|
|
||||||
|
|
||||||
for (const order of matchingOrders) {
|
|
||||||
this.logger.log(
|
|
||||||
`Event match for standing order "${order.name}" (${order.id}) on event ${eventType}`,
|
|
||||||
);
|
|
||||||
await this.executeOrder(order, {
|
|
||||||
triggerType: 'event',
|
|
||||||
eventType,
|
|
||||||
eventId: event.id,
|
|
||||||
eventPayload: event.payload,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await TenantContextService.run(
|
||||||
|
{
|
||||||
|
tenantId,
|
||||||
|
tenantName: tenantId,
|
||||||
|
plan: 'pro',
|
||||||
|
schemaName: `it0_t_${tenantId}`,
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
const activeOrders =
|
||||||
|
await this.standingOrderRepo.findByStatus('active');
|
||||||
|
const matchingOrders = activeOrders.filter(
|
||||||
|
(order) =>
|
||||||
|
order.trigger.type === 'event' &&
|
||||||
|
order.trigger.eventType === eventType,
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const order of matchingOrders) {
|
||||||
|
this.logger.log(
|
||||||
|
`Event match for standing order "${order.name}" (${order.id}) on event ${eventType}`,
|
||||||
|
);
|
||||||
|
await this.executeOrder(order, {
|
||||||
|
triggerType: 'event',
|
||||||
|
eventType,
|
||||||
|
eventId: event.id,
|
||||||
|
eventPayload: event.payload,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue