113 lines
3.1 KiB
TypeScript
113 lines
3.1 KiB
TypeScript
import { Injectable, OnModuleDestroy } from '@nestjs/common';
|
||
import { ConfigService } from '@nestjs/config';
|
||
import Redis from 'ioredis';
|
||
|
||
@Injectable()
|
||
export class RedisService implements OnModuleDestroy {
|
||
private readonly client: Redis;
|
||
|
||
constructor(private readonly configService: ConfigService) {
|
||
this.client = new Redis({
|
||
host: this.configService.get<string>('REDIS_HOST', 'localhost'),
|
||
port: this.configService.get<number>('REDIS_PORT', 6379),
|
||
password: this.configService.get<string>('REDIS_PASSWORD') || undefined,
|
||
db: this.configService.get<number>('REDIS_DB', 0),
|
||
});
|
||
}
|
||
|
||
async get(key: string): Promise<string | null> {
|
||
return this.client.get(key);
|
||
}
|
||
|
||
async set(key: string, value: string, ttlSeconds?: number): Promise<void> {
|
||
if (ttlSeconds) {
|
||
await this.client.set(key, value, 'EX', ttlSeconds);
|
||
} else {
|
||
await this.client.set(key, value);
|
||
}
|
||
}
|
||
|
||
async delete(key: string): Promise<void> {
|
||
await this.client.del(key);
|
||
}
|
||
|
||
async exists(key: string): Promise<boolean> {
|
||
const result = await this.client.exists(key);
|
||
return result === 1;
|
||
}
|
||
|
||
async incr(key: string): Promise<number> {
|
||
return this.client.incr(key);
|
||
}
|
||
|
||
async expire(key: string, seconds: number): Promise<void> {
|
||
await this.client.expire(key, seconds);
|
||
}
|
||
|
||
/**
|
||
* 原子更新 keygen 状态
|
||
* 使用 Lua 脚本确保状态只能向前推进: pending < generating < deriving < completed
|
||
* failed 状态只有在当前不是 completed 时才能设置
|
||
*
|
||
* @returns true 如果更新成功, false 如果当前状态优先级更高
|
||
*/
|
||
async updateKeygenStatusAtomic(
|
||
key: string,
|
||
newStatusData: string,
|
||
newStatus: string,
|
||
ttlSeconds: number,
|
||
): Promise<boolean> {
|
||
// 状态优先级:completed > failed > deriving > generating > pending
|
||
// 数字越大优先级越高
|
||
const luaScript = `
|
||
local key = KEYS[1]
|
||
local newData = ARGV[1]
|
||
local newStatus = ARGV[2]
|
||
local ttl = tonumber(ARGV[3])
|
||
|
||
-- 定义状态优先级
|
||
local priority = {
|
||
pending = 1,
|
||
generating = 2,
|
||
deriving = 3,
|
||
failed = 4,
|
||
completed = 5
|
||
}
|
||
|
||
local newPriority = priority[newStatus] or 0
|
||
|
||
-- 获取当前状态
|
||
local currentData = redis.call('GET', key)
|
||
if currentData then
|
||
local current = cjson.decode(currentData)
|
||
local currentStatus = current.status or 'pending'
|
||
local currentPriority = priority[currentStatus] or 0
|
||
|
||
-- 只有新状态优先级更高时才更新
|
||
if newPriority <= currentPriority then
|
||
return 0 -- 不更新
|
||
end
|
||
end
|
||
|
||
-- 更新状态
|
||
redis.call('SET', key, newData, 'EX', ttl)
|
||
return 1 -- 更新成功
|
||
`;
|
||
|
||
const result = await this.client.eval(
|
||
luaScript,
|
||
1,
|
||
key,
|
||
newStatusData,
|
||
newStatus,
|
||
ttlSeconds.toString(),
|
||
);
|
||
|
||
return result === 1;
|
||
}
|
||
|
||
onModuleDestroy() {
|
||
this.client.disconnect();
|
||
}
|
||
}
|