feat(wallet-service): add Redis caching for wallet queries

- Add ioredis dependency for Redis connectivity
- Create Redis service and module with DB 1 configuration
- Implement WalletCacheService for wallet data caching (60s TTL)
- Integrate cache-aside pattern in getMyWallet query
- Add cache invalidation on all wallet mutations:
  - handleDeposit, deductForPlanting, addRewards
  - claimRewards, settleRewards

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-06 18:37:13 -08:00
parent c459387c42
commit ba91a89b16
9 changed files with 299 additions and 4 deletions

View File

@ -44,7 +44,8 @@
"passport-jwt": "^4.0.1",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1",
"uuid": "^9.0.0"
"uuid": "^9.0.0",
"ioredis": "^5.3.2"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",

View File

@ -1,4 +1,4 @@
import { Injectable, Inject } from '@nestjs/common';
import { Injectable, Inject, Logger } from '@nestjs/common';
import {
IWalletAccountRepository, WALLET_ACCOUNT_REPOSITORY,
ILedgerEntryRepository, LEDGER_ENTRY_REPOSITORY,
@ -15,6 +15,7 @@ import {
} from '@/application/commands';
import { GetMyWalletQuery, GetMyLedgerQuery } from '@/application/queries';
import { DuplicateTransactionError, WalletNotFoundError } from '@/shared/exceptions/domain.exception';
import { WalletCacheService } from '@/infrastructure/redis';
export interface WalletDTO {
walletId: string;
@ -63,6 +64,8 @@ export interface PaginatedLedgerDTO {
@Injectable()
export class WalletApplicationService {
private readonly logger = new Logger(WalletApplicationService.name);
constructor(
@Inject(WALLET_ACCOUNT_REPOSITORY)
private readonly walletRepo: IWalletAccountRepository,
@ -72,6 +75,7 @@ export class WalletApplicationService {
private readonly depositRepo: IDepositOrderRepository,
@Inject(SETTLEMENT_ORDER_REPOSITORY)
private readonly settlementRepo: ISettlementOrderRepository,
private readonly walletCacheService: WalletCacheService,
) {}
// =============== Commands ===============
@ -117,6 +121,9 @@ export class WalletApplicationService {
memo: `Deposit from ${command.chainType}`,
});
await this.ledgerRepo.save(ledgerEntry);
// Invalidate wallet cache after deposit
await this.walletCacheService.invalidateWallet(userId);
}
async deductForPlanting(command: DeductForPlantingCommand): Promise<void> {
@ -142,6 +149,9 @@ export class WalletApplicationService {
memo: 'Plant payment',
});
await this.ledgerRepo.save(ledgerEntry);
// Invalidate wallet cache after deduction
await this.walletCacheService.invalidateWallet(userId);
}
async addRewards(command: AddRewardsCommand): Promise<void> {
@ -180,6 +190,9 @@ export class WalletApplicationService {
});
await this.ledgerRepo.save(hpEntry);
}
// Invalidate wallet cache after adding rewards
await this.walletCacheService.invalidateWallet(userId);
}
async claimRewards(command: ClaimRewardsCommand): Promise<void> {
@ -217,6 +230,9 @@ export class WalletApplicationService {
});
await this.ledgerRepo.save(hpEntry);
}
// Invalidate wallet cache after claiming rewards
await this.walletCacheService.invalidateWallet(userId);
}
async settleRewards(command: SettleRewardsCommand): Promise<string> {
@ -266,6 +282,9 @@ export class WalletApplicationService {
});
await this.ledgerRepo.save(ledgerEntry);
// Invalidate wallet cache after settlement
await this.walletCacheService.invalidateWallet(userId);
return savedOrder.id.toString();
}
@ -273,9 +292,25 @@ export class WalletApplicationService {
async getMyWallet(query: GetMyWalletQuery): Promise<WalletDTO> {
const userId = BigInt(query.userId);
// Try to get from cache first
const cached = await this.walletCacheService.getWallet(userId);
if (cached) {
this.logger.debug(`Returning cached wallet for user: ${userId}`);
return {
walletId: cached.walletId,
userId: cached.userId,
balances: cached.balances,
hashpower: cached.hashpower,
rewards: cached.rewards,
status: cached.status,
};
}
// Cache miss - fetch from database
const wallet = await this.walletRepo.getOrCreate(userId);
return {
const walletDTO: WalletDTO = {
walletId: wallet.walletId.toString(),
userId: wallet.userId.toString(),
balances: {
@ -314,6 +349,13 @@ export class WalletApplicationService {
},
status: wallet.status,
};
// Store in cache (fire and forget, don't block response)
this.walletCacheService.setWallet(userId, walletDTO).catch((err) => {
this.logger.warn(`Failed to cache wallet for user ${userId}: ${err.message}`);
});
return walletDTO;
}
async getMyLedger(query: GetMyLedgerQuery): Promise<PaginatedLedgerDTO> {

View File

@ -0,0 +1 @@
export * from './redis.config';

View File

@ -0,0 +1,15 @@
import { registerAs } from '@nestjs/config';
export const redisConfig = registerAs('redis', () => ({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD || undefined,
db: parseInt(process.env.REDIS_DB || '1', 10), // wallet-service uses DB 1
}));
export interface RedisConfig {
host: string;
port: number;
password?: string;
db: number;
}

View File

@ -12,6 +12,7 @@ import {
DEPOSIT_ORDER_REPOSITORY,
SETTLEMENT_ORDER_REPOSITORY,
} from '@/domain/repositories';
import { RedisModule } from './redis';
const repositories = [
{
@ -34,7 +35,8 @@ const repositories = [
@Global()
@Module({
imports: [RedisModule],
providers: [PrismaService, ...repositories],
exports: [PrismaService, ...repositories],
exports: [PrismaService, RedisModule, ...repositories],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,3 @@
export * from './redis.service';
export * from './redis.module';
export * from './wallet-cache.service';

View File

@ -0,0 +1,12 @@
import { Module, Global } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { RedisService } from './redis.service';
import { WalletCacheService } from './wallet-cache.service';
@Global()
@Module({
imports: [ConfigModule],
providers: [RedisService, WalletCacheService],
exports: [RedisService, WalletCacheService],
})
export class RedisModule {}

View File

@ -0,0 +1,90 @@
import { Injectable, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
@Injectable()
export class RedisService implements OnModuleDestroy {
private readonly logger = new Logger(RedisService.name);
private readonly client: Redis;
constructor(private readonly configService: ConfigService) {
const host = this.configService.get<string>('REDIS_HOST') || 'localhost';
const port = this.configService.get<number>('REDIS_PORT') || 6379;
const password = this.configService.get<string>('REDIS_PASSWORD');
const db = this.configService.get<number>('REDIS_DB') || 1;
this.client = new Redis({
host,
port,
password: password || undefined,
db,
});
this.client.on('connect', () => {
this.logger.log(`Connected to Redis at ${host}:${port}, DB: ${db}`);
});
this.client.on('error', (err) => {
this.logger.error('Redis connection error:', err);
});
}
async onModuleDestroy() {
await this.client.quit();
this.logger.log('Redis connection closed');
}
getClient(): Redis {
return this.client;
}
async get(key: string): Promise<string | null> {
return this.client.get(key);
}
async set(key: string, value: string, ttlSeconds?: number): Promise<void> {
if (ttlSeconds) {
await this.client.setex(key, ttlSeconds, value);
} else {
await this.client.set(key, value);
}
}
async del(key: string): Promise<void> {
await this.client.del(key);
}
async delByPattern(pattern: string): Promise<number> {
const keys = await this.client.keys(pattern);
if (keys.length === 0) {
return 0;
}
return this.client.del(...keys);
}
async exists(key: string): Promise<boolean> {
const result = await this.client.exists(key);
return result === 1;
}
async setJson<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
await this.set(key, JSON.stringify(value), ttlSeconds);
}
async getJson<T>(key: string): Promise<T | null> {
const value = await this.get(key);
if (!value) {
return null;
}
try {
return JSON.parse(value) as T;
} catch {
this.logger.warn(`Failed to parse JSON for key: ${key}`);
return null;
}
}
async ttl(key: string): Promise<number> {
return this.client.ttl(key);
}
}

View File

@ -0,0 +1,129 @@
import { Injectable, Logger } from '@nestjs/common';
import { RedisService } from './redis.service';
/**
*
*/
export interface CachedWalletDTO {
walletId: string;
userId: string;
balances: {
usdt: { available: number; frozen: number };
dst: { available: number; frozen: number };
bnb: { available: number; frozen: number };
og: { available: number; frozen: number };
rwad: { available: number; frozen: number };
};
hashpower: number;
rewards: {
pendingUsdt: number;
pendingHashpower: number;
pendingExpireAt: string | null;
settleableUsdt: number;
settleableHashpower: number;
settledTotalUsdt: number;
settledTotalHashpower: number;
expiredTotalUsdt: number;
expiredTotalHashpower: number;
};
status: string;
cachedAt: string;
}
@Injectable()
export class WalletCacheService {
private readonly logger = new Logger(WalletCacheService.name);
// 缓存前缀
private readonly CACHE_PREFIX = 'wallet:user:';
// 默认缓存时间60秒钱包数据变动时会主动失效
private readonly DEFAULT_TTL_SECONDS = 60;
constructor(private readonly redisService: RedisService) {}
/**
* key
*/
private getCacheKey(userId: string | bigint): string {
return `${this.CACHE_PREFIX}${userId.toString()}`;
}
/**
*
*/
async getWallet(userId: string | bigint): Promise<CachedWalletDTO | null> {
const key = this.getCacheKey(userId);
const cached = await this.redisService.getJson<CachedWalletDTO>(key);
if (cached) {
this.logger.debug(`Cache HIT for wallet: ${userId}`);
} else {
this.logger.debug(`Cache MISS for wallet: ${userId}`);
}
return cached;
}
/**
*
*/
async setWallet(
userId: string | bigint,
wallet: Omit<CachedWalletDTO, 'cachedAt'>,
ttlSeconds: number = this.DEFAULT_TTL_SECONDS,
): Promise<void> {
const key = this.getCacheKey(userId);
const cachedData: CachedWalletDTO = {
...wallet,
cachedAt: new Date().toISOString(),
};
await this.redisService.setJson(key, cachedData, ttlSeconds);
this.logger.debug(`Cached wallet for user: ${userId}, TTL: ${ttlSeconds}s`);
}
/**
* 使
*/
async invalidateWallet(userId: string | bigint): Promise<void> {
const key = this.getCacheKey(userId);
await this.redisService.del(key);
this.logger.debug(`Invalidated wallet cache for user: ${userId}`);
}
/**
* 使
*/
async invalidateWallets(userIds: (string | bigint)[]): Promise<void> {
for (const userId of userIds) {
await this.invalidateWallet(userId);
}
}
/**
* 使
*/
async invalidateAllWallets(): Promise<number> {
const pattern = `${this.CACHE_PREFIX}*`;
const count = await this.redisService.delByPattern(pattern);
this.logger.warn(`Invalidated all wallet caches, count: ${count}`);
return count;
}
/**
*
*/
async hasCache(userId: string | bigint): Promise<boolean> {
const key = this.getCacheKey(userId);
return this.redisService.exists(key);
}
/**
* TTL
*/
async getCacheTTL(userId: string | bigint): Promise<number> {
const key = this.getCacheKey(userId);
return this.redisService.ttl(key);
}
}