// it0/packages/shared/events/src/event-bus.ts
//
// 70 lines
// 1.8 KiB
// TypeScript

import Redis from 'ioredis';
/**
 * Event bus built on Redis Streams and consumer groups.
 *
 * Events are appended with XADD as a single `data` field holding the JSON
 * encoding of the event, and consumed with XREADGROUP. An entry is XACKed
 * only after its handler resolves; an entry whose payload cannot be decoded
 * or whose handler rejects is logged and left unacknowledged (pending) in the
 * consumer group, so it is never silently dropped. This class does not
 * re-read pending entries itself.
 */
export class RedisEventBus {
  /** Name of the stream-entry field that carries the JSON payload. */
  private static readonly PAYLOAD_FIELD = 'data';

  private publisher: Redis;
  private subscriber: Redis;
  /** Dedicated connections, one per active subscription loop. */
  private readers = new Set<Redis>();
  /** Set by `disconnect()`; tells every subscription loop to stop. */
  private closed = false;

  /**
   * Opens two connections to the same Redis server: one for publishing and
   * one used for consumer-group setup and as the template for the
   * per-subscription reader connections.
   *
   * @param redisUrl Connection URL passed to the Redis client.
   */
  constructor(redisUrl: string) {
    this.publisher = new Redis(redisUrl);
    this.subscriber = new Redis(redisUrl);
  }

  /**
   * Appends an event to a stream.
   *
   * @param stream Stream key to append to.
   * @param event Event object; stored as JSON in the `data` field.
   * @returns The entry id Redis generated for the new entry.
   * @throws Error if Redis reports no entry id for the append.
   */
  async publish(stream: string, event: Record<string, any>): Promise<string> {
    const id = await this.publisher.xadd(
      stream,
      '*',
      RedisEventBus.PAYLOAD_FIELD,
      JSON.stringify(event),
    );
    if (id == null) {
      throw new Error(`XADD on stream ${stream} returned no entry id`);
    }
    return id;
  }

  /**
   * Starts consuming a stream as a member of a consumer group.
   *
   * The group is created (along with the stream, via MKSTREAM) if it does
   * not exist yet. The returned promise resolves once the background read
   * loop has been started; it does not wait for any messages.
   *
   * @param stream Stream key to read from.
   * @param group Consumer group name.
   * @param consumer Name of this consumer within the group.
   * @param handler Called with the decoded payload of each entry, one entry
   *   at a time.
   * @throws Error if the bus has been disconnected, or if creating the
   *   consumer group fails for any reason other than it already existing.
   */
  async subscribe(
    stream: string,
    group: string,
    consumer: string,
    handler: (event: any) => Promise<void>,
  ): Promise<void> {
    if (this.closed) {
      throw new Error('RedisEventBus has been disconnected');
    }

    try {
      await this.subscriber.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
    } catch (error) {
      // Redis answers BUSYGROUP when the group already exists; that is the
      // only failure that is safe to ignore here.
      const alreadyExists =
        error instanceof Error && error.message.includes('BUSYGROUP');
      if (!alreadyExists) {
        throw error;
      }
    }

    // disconnect() may have been called while the group was being created.
    if (this.closed) {
      throw new Error('RedisEventBus has been disconnected');
    }

    // A blocking XREADGROUP occupies its connection until it returns, so each
    // subscription reads on its own connection instead of queueing behind
    // the blocking reads of other subscriptions.
    const reader = this.subscriber.duplicate();
    this.readers.add(reader);

    // Runs in the background; consume() handles its own errors.
    void this.consume(reader, stream, group, consumer, handler);
  }

  /**
   * Stops all subscription loops and closes every connection.
   * Calling it more than once is a no-op.
   */
  async disconnect(): Promise<void> {
    if (this.closed) {
      return;
    }
    this.closed = true;

    // Readers may be parked in a blocking read; drop them immediately
    // rather than waiting for the block timeout to elapse.
    for (const reader of this.readers) {
      reader.disconnect();
    }
    this.readers.clear();

    // Issue both quits up front so a failure of one does not leave the
    // other connection open.
    await Promise.all([this.publisher.quit(), this.subscriber.quit()]);
  }

  /** Read loop for one subscription; runs until the bus is disconnected. */
  private async consume(
    reader: Redis,
    stream: string,
    group: string,
    consumer: string,
    handler: (event: any) => Promise<void>,
  ): Promise<void> {
    try {
      while (!this.closed) {
        try {
          const results = await reader.xreadgroup(
            'GROUP', group, consumer,
            'COUNT', '10',
            'BLOCK', '5000',
            'STREAMS', stream, '>',
          );
          if (!results) {
            continue; // block timed out with nothing new
          }
          for (const [, messages] of results as [string, [string, string[]][]][]) {
            for (const [id, fields] of messages) {
              if (this.closed) {
                return; // remaining entries stay pending in the group
              }
              await this.processEntry(reader, stream, group, id, fields, handler);
            }
          }
        } catch (error) {
          if (this.closed) {
            break; // read was aborted by disconnect()
          }
          console.error(`Event bus error on stream ${stream}:`, error);
          await new Promise(resolve => setTimeout(resolve, 1000));
        }
      }
    } finally {
      this.readers.delete(reader);
      reader.disconnect();
    }
  }

  /**
   * Decodes one entry, runs the handler and acknowledges the entry.
   * Failures are logged and contained so the rest of the batch (already
   * delivered to this consumer) is still processed.
   */
  private async processEntry(
    reader: Redis,
    stream: string,
    group: string,
    id: string,
    fields: string[],
    handler: (event: any) => Promise<void>,
  ): Promise<void> {
    try {
      const data = JSON.parse(RedisEventBus.readPayload(fields));
      await handler(data);
      await reader.xack(stream, group, id);
    } catch (error) {
      console.error(
        `Event bus failed to process message ${id} on stream ${stream}:`,
        error,
      );
    }
  }

  /** Finds the payload value in an entry's flat `[field, value, ...]` list. */
  private static readPayload(fields: string[]): string {
    for (let i = 0; i + 1 < fields.length; i += 2) {
      const value = fields[i + 1];
      if (fields[i] === RedisEventBus.PAYLOAD_FIELD && value !== undefined) {
        return value;
      }
    }
    throw new Error(
      `Stream entry has no '${RedisEventBus.PAYLOAD_FIELD}' field`,
    );
  }
}