fix: release QueryRunner connections to prevent pool exhaustion

TenantAwareRepository.getRepository() was calling createQueryRunner()
without ever releasing it, causing database connection pool exhaustion.
This caused ops-service (and eventually other services) to hang on
all API requests once the pool filled up.

Replaced getRepository() with withRepository() pattern that wraps
operations in try/finally to always release the QueryRunner.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-23 15:55:06 -08:00
parent a6cd3c20d9
commit 3cb9ebd407
19 changed files with 137 additions and 109 deletions

View File

@ -10,12 +10,14 @@ export class SessionRepository extends TenantAwareRepository<AgentSession> {
}
async findByTenant(tenantId: string): Promise<AgentSession[]> {
const repo = await this.getRepository();
return repo.find({ where: { tenantId } as any });
return this.withRepository((repo) =>
repo.find({ where: { tenantId } as any }),
);
}
async findByStatus(status: string): Promise<AgentSession[]> {
const repo = await this.getRepository();
return repo.find({ where: { status } as any });
return this.withRepository((repo) =>
repo.find({ where: { status } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class TaskRepository extends TenantAwareRepository<AgentTask> {
}
async findBySessionId(sessionId: string): Promise<AgentTask[]> {
const repo = await this.getRepository();
return repo.find({ where: { sessionId } as any });
return this.withRepository((repo) =>
repo.find({ where: { sessionId } as any }),
);
}
}

View File

@ -21,58 +21,60 @@ export class AuditLogRepository extends TenantAwareRepository<AuditLog> {
}
async queryLogs(filters: AuditLogFilters): Promise<{ data: AuditLog[]; total: number }> {
const repo = await this.getRepository();
const qb = repo.createQueryBuilder('log');
return this.withRepository(async (repo) => {
const qb = repo.createQueryBuilder('log');
if (filters.actionType) {
qb.andWhere('log.actionType = :actionType', { actionType: filters.actionType });
}
if (filters.actionType) {
qb.andWhere('log.actionType = :actionType', { actionType: filters.actionType });
}
if (filters.actorType) {
qb.andWhere('log.actorType = :actorType', { actorType: filters.actorType });
}
if (filters.actorType) {
qb.andWhere('log.actorType = :actorType', { actorType: filters.actorType });
}
if (filters.actorId) {
qb.andWhere('log.actorId = :actorId', { actorId: filters.actorId });
}
if (filters.actorId) {
qb.andWhere('log.actorId = :actorId', { actorId: filters.actorId });
}
if (filters.resourceType) {
qb.andWhere('log.resourceType = :resourceType', { resourceType: filters.resourceType });
}
if (filters.resourceType) {
qb.andWhere('log.resourceType = :resourceType', { resourceType: filters.resourceType });
}
if (filters.from) {
qb.andWhere('log.createdAt >= :from', { from: new Date(filters.from) });
}
if (filters.from) {
qb.andWhere('log.createdAt >= :from', { from: new Date(filters.from) });
}
if (filters.to) {
qb.andWhere('log.createdAt <= :to', { to: new Date(filters.to) });
}
if (filters.to) {
qb.andWhere('log.createdAt <= :to', { to: new Date(filters.to) });
}
qb.orderBy('log.createdAt', 'DESC');
qb.orderBy('log.createdAt', 'DESC');
const page = filters.page ?? 1;
const limit = filters.limit ?? 50;
qb.skip((page - 1) * limit).take(limit);
const page = filters.page ?? 1;
const limit = filters.limit ?? 50;
qb.skip((page - 1) * limit).take(limit);
const [data, total] = await qb.getManyAndCount();
return { data, total };
const [data, total] = await qb.getManyAndCount();
return { data, total };
});
}
async exportLogs(format: 'json' | 'csv'): Promise<AuditLog[] | string> {
const repo = await this.getRepository();
const logs = await repo.find({ order: { createdAt: 'DESC' } as any });
return this.withRepository(async (repo) => {
const logs = await repo.find({ order: { createdAt: 'DESC' } as any });
if (format === 'csv') {
const headers = ['id', 'tenantId', 'actionType', 'actorType', 'actorId', 'resourceType', 'resourceId', 'ipAddress', 'createdAt'];
const rows = logs.map(log =>
headers.map(h => {
const value = (log as any)[h];
return value !== undefined && value !== null ? String(value) : '';
}).join(','),
);
return [headers.join(','), ...rows].join('\n');
}
if (format === 'csv') {
const headers = ['id', 'tenantId', 'actionType', 'actorType', 'actorId', 'resourceType', 'resourceId', 'ipAddress', 'createdAt'];
const rows = logs.map(log =>
headers.map(h => {
const value = (log as any)[h];
return value !== undefined && value !== null ? String(value) : '';
}).join(','),
);
return [headers.join(','), ...rows].join('\n');
}
return logs;
return logs;
});
}
}

View File

@ -10,7 +10,8 @@ export class ContactRepository extends TenantAwareRepository<Contact> {
}
async findByUserId(userId: string): Promise<Contact[]> {
const repo = await this.getRepository();
return repo.find({ where: { userId } as any });
return this.withRepository((repo) =>
repo.find({ where: { userId } as any }),
);
}
}

View File

@ -10,17 +10,18 @@ export class EscalationPolicyRepository extends TenantAwareRepository<Escalation
}
async findBySeverity(severity: string): Promise<EscalationPolicy[]> {
const repo = await this.getRepository();
return repo.find({ where: { severity } as any });
return this.withRepository((repo) =>
repo.find({ where: { severity } as any }),
);
}
async findDefault(): Promise<EscalationPolicy | null> {
const repo = await this.getRepository();
return repo.findOne({ where: { isDefault: true } as any });
return this.withRepository((repo) =>
repo.findOne({ where: { isDefault: true } as any }),
);
}
async delete(id: string): Promise<void> {
const repo = await this.getRepository();
await repo.delete(id);
await this.withRepository((repo) => repo.delete(id));
}
}

View File

@ -10,12 +10,14 @@ export class MessageRepository extends TenantAwareRepository<Message> {
}
async findByDirection(direction: string): Promise<Message[]> {
const repo = await this.getRepository();
return repo.find({ where: { direction } as any });
return this.withRepository((repo) =>
repo.find({ where: { direction } as any }),
);
}
async findByContactId(contactId: string): Promise<Message[]> {
const repo = await this.getRepository();
return repo.find({ where: { contactId } as any });
return this.withRepository((repo) =>
repo.find({ where: { contactId } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class ClusterRepository extends TenantAwareRepository<Cluster> {
}
async findByEnvironment(env: string): Promise<Cluster[]> {
const repo = await this.getRepository();
return repo.find({ where: { environment: env } as any });
return this.withRepository((repo) =>
repo.find({ where: { environment: env } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class CredentialRepository extends TenantAwareRepository<Credential> {
}
async findByType(type: string): Promise<Credential[]> {
const repo = await this.getRepository();
return repo.find({ where: { type } as any });
return this.withRepository((repo) =>
repo.find({ where: { type } as any }),
);
}
}

View File

@ -10,12 +10,14 @@ export class ServerRepository extends TenantAwareRepository<Server> {
}
async findByEnvironment(env: string): Promise<Server[]> {
const repo = await this.getRepository();
return repo.find({ where: { environment: env } as any });
return this.withRepository((repo) =>
repo.find({ where: { environment: env } as any }),
);
}
async findByClusterId(clusterId: string): Promise<Server[]> {
const repo = await this.getRepository();
return repo.find({ where: { clusterId } as any });
return this.withRepository((repo) =>
repo.find({ where: { clusterId } as any }),
);
}
}

View File

@ -10,12 +10,14 @@ export class AlertEventRepository extends TenantAwareRepository<AlertEvent> {
}
async findByStatus(status: string): Promise<AlertEvent[]> {
const repo = await this.getRepository();
return repo.find({ where: { status } as any });
return this.withRepository((repo) =>
repo.find({ where: { status } as any }),
);
}
async findByRuleId(ruleId: string): Promise<AlertEvent[]> {
const repo = await this.getRepository();
return repo.find({ where: { ruleId } as any });
return this.withRepository((repo) =>
repo.find({ where: { ruleId } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class AlertRuleRepository extends TenantAwareRepository<AlertRule> {
}
async findActive(): Promise<AlertRule[]> {
const repo = await this.getRepository();
return repo.find({ where: { isActive: true } as any });
return this.withRepository((repo) =>
repo.find({ where: { isActive: true } as any }),
);
}
}

View File

@ -10,12 +10,14 @@ export class HealthCheckResultRepository extends TenantAwareRepository<HealthChe
}
async findByServerId(serverId: string): Promise<HealthCheckResult[]> {
const repo = await this.getRepository();
return repo.find({ where: { serverId } as any, order: { checkedAt: 'DESC' } as any });
return this.withRepository((repo) =>
repo.find({ where: { serverId } as any, order: { checkedAt: 'DESC' } as any }),
);
}
async findRecent(limit = 50): Promise<HealthCheckResult[]> {
const repo = await this.getRepository();
return repo.find({ order: { checkedAt: 'DESC' } as any, take: limit });
return this.withRepository((repo) =>
repo.find({ order: { checkedAt: 'DESC' } as any, take: limit }),
);
}
}

View File

@ -10,17 +10,19 @@ export class MetricSnapshotRepository extends TenantAwareRepository<MetricSnapsh
}
async findByServerId(serverId: string): Promise<MetricSnapshot[]> {
const repo = await this.getRepository();
return repo.find({ where: { serverId } as any, order: { recordedAt: 'DESC' } as any, take: 100 });
return this.withRepository((repo) =>
repo.find({ where: { serverId } as any, order: { recordedAt: 'DESC' } as any, take: 100 }),
);
}
async findRecent(serverId: string, metricType: string, since: Date): Promise<MetricSnapshot[]> {
const repo = await this.getRepository();
const qb = repo.createQueryBuilder('ms');
qb.where('ms.serverId = :serverId', { serverId });
qb.andWhere('ms.metricType = :metricType', { metricType });
qb.andWhere('ms.recordedAt >= :since', { since });
qb.orderBy('ms.recordedAt', 'ASC');
return qb.getMany();
return this.withRepository((repo) => {
const qb = repo.createQueryBuilder('ms');
qb.where('ms.serverId = :serverId', { serverId });
qb.andWhere('ms.metricType = :metricType', { metricType });
qb.andWhere('ms.recordedAt >= :since', { since });
qb.orderBy('ms.recordedAt', 'ASC');
return qb.getMany();
});
}
}

View File

@ -10,12 +10,14 @@ export class ApprovalRepository extends TenantAwareRepository<ApprovalRequest> {
}
async findPending(): Promise<ApprovalRequest[]> {
const repo = await this.getRepository();
return repo.find({ where: { status: 'pending' } as any });
return this.withRepository((repo) =>
repo.find({ where: { status: 'pending' } as any }),
);
}
async findByTaskId(taskId: string): Promise<ApprovalRequest[]> {
const repo = await this.getRepository();
return repo.find({ where: { taskId } as any });
return this.withRepository((repo) =>
repo.find({ where: { taskId } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class RunbookRepository extends TenantAwareRepository<Runbook> {
}
async findActive(): Promise<Runbook[]> {
const repo = await this.getRepository();
return repo.find({ where: { isActive: true } as any });
return this.withRepository((repo) =>
repo.find({ where: { isActive: true } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class StandingOrderExecutionRepository extends TenantAwareRepository<Stan
}
async findByOrderId(orderId: string): Promise<StandingOrderExecution[]> {
const repo = await this.getRepository();
return repo.find({ where: { orderId } as any });
return this.withRepository((repo) =>
repo.find({ where: { orderId } as any }),
);
}
}

View File

@ -10,7 +10,8 @@ export class StandingOrderRepository extends TenantAwareRepository<StandingOrder
}
async findByStatus(status: string): Promise<StandingOrder[]> {
const repo = await this.getRepository();
return repo.find({ where: { status } as any });
return this.withRepository((repo) =>
repo.find({ where: { status } as any }),
);
}
}

View File

@ -10,12 +10,14 @@ export class TaskRepository extends TenantAwareRepository<OpsTask> {
}
async findByStatus(status: string): Promise<OpsTask[]> {
const repo = await this.getRepository();
return repo.find({ where: { status } as any });
return this.withRepository((repo) =>
repo.find({ where: { status } as any }),
);
}
async findByCreatedBy(createdBy: string): Promise<OpsTask[]> {
const repo = await this.getRepository();
return repo.find({ where: { createdBy } as any });
return this.withRepository((repo) =>
repo.find({ where: { createdBy } as any }),
);
}
}

View File

@ -7,30 +7,31 @@ export abstract class TenantAwareRepository<T extends ObjectLiteral> {
protected readonly entity: EntityTarget<T>,
) {}
protected async getRepository(): Promise<Repository<T>> {
protected async withRepository<R>(fn: (repo: Repository<T>) => Promise<R>): Promise<R> {
const schema = TenantContextService.getSchemaName();
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.query(`SET search_path TO "${schema}", public`);
return queryRunner.manager.getRepository(this.entity);
try {
await queryRunner.query(`SET search_path TO "${schema}", public`);
const repo = queryRunner.manager.getRepository(this.entity);
return await fn(repo);
} finally {
await queryRunner.release();
}
}
async findById(id: string): Promise<T | null> {
const repo = await this.getRepository();
return repo.findOneBy({ id } as any);
return this.withRepository((repo) => repo.findOneBy({ id } as any));
}
async save(entity: T): Promise<T> {
const repo = await this.getRepository();
return repo.save(entity);
return this.withRepository((repo) => repo.save(entity));
}
async findAll(): Promise<T[]> {
const repo = await this.getRepository();
return repo.find();
return this.withRepository((repo) => repo.find());
}
async remove(entity: T): Promise<T> {
const repo = await this.getRepository();
return repo.remove(entity);
return this.withRepository((repo) => repo.remove(entity));
}
}