iconsulting/packages/services/knowledge-service/src/infrastructure/database/neo4j/neo4j.service.ts

423 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import neo4j, { Driver, Session, Transaction } from 'neo4j-driver';
/**
 * Neo4j knowledge-graph service.
 *
 * Owns the Neo4j driver for the lifetime of the module and exposes helpers
 * for storing and querying user timelines, knowledge entities, the relations
 * between entities, and the entities referenced by conversations.
 */
@Injectable()
export class Neo4jService implements OnModuleInit, OnModuleDestroy {
  /** Relationship types accepted by {@link linkConversationToEntity}. */
  private static readonly MENTION_TYPES: readonly string[] = [
    'MENTIONED',
    'DISCUSSED',
    'ASKED_ABOUT',
  ];

  /**
   * Shape of a bare (unquoted) identifier: a letter or underscore followed by
   * letters, digits or underscores. Unicode letters are allowed so that
   * non-English relationship types keep working. Relationship types cannot be
   * passed as query parameters, so anything interpolated into query text is
   * checked against this pattern first.
   */
  private static readonly IDENTIFIER = /^[\p{L}_][\p{L}\p{N}_]*$/u;

  /** Set by `onModuleInit`; undefined until then. */
  private driver?: Driver;

  constructor(private readonly configService: ConfigService) {}

  /**
   * Creates the driver from `NEO4J_URI` / `NEO4J_USER` / `NEO4J_PASSWORD`
   * (falling back to local defaults), verifies connectivity and initializes
   * the schema. A connection failure is logged and not rethrown.
   */
  async onModuleInit(): Promise<void> {
    const uri = this.configService.get<string>('NEO4J_URI') || 'bolt://localhost:7687';
    const user = this.configService.get<string>('NEO4J_USER') || 'neo4j';
    // NOTE(review): a hard-coded fallback password is only appropriate for
    // local development; confirm production always sets NEO4J_PASSWORD.
    const password = this.configService.get<string>('NEO4J_PASSWORD') || 'password';

    const driver = neo4j.driver(uri, neo4j.auth.basic(user, password), {
      maxConnectionPoolSize: 50,
      connectionAcquisitionTimeout: 30000,
    });
    this.driver = driver;

    // Verify the connection, then set up constraints and indexes.
    try {
      await driver.verifyConnectivity();
      console.log('[Neo4j] Connected successfully');
      await this.initializeSchema();
    } catch (error) {
      console.error('[Neo4j] Connection failed:', error);
    }
  }

  /** Closes the driver, if one was created. */
  async onModuleDestroy(): Promise<void> {
    await this.driver?.close();
  }

  /**
   * Opens a new session. The caller is responsible for closing it.
   *
   * @throws Error if the driver has not been created yet.
   */
  getSession(): Session {
    if (!this.driver) {
      throw new Error('[Neo4j] Driver is not initialized (onModuleInit has not run)');
    }
    return this.driver.session();
  }

  /**
   * Runs a read query in a managed read transaction and returns each record
   * converted to a plain object.
   *
   * @param query  Cypher text.
   * @param params Query parameters.
   */
  async read<T>(query: string, params?: Record<string, unknown>): Promise<T[]> {
    const session = this.getSession();
    try {
      const result = await session.executeRead(tx => tx.run(query, params));
      return result.records.map(record => record.toObject() as T);
    } finally {
      await session.close();
    }
  }

  /**
   * Runs a write query in a managed write transaction and returns each record
   * converted to a plain object.
   *
   * @param query  Cypher text.
   * @param params Query parameters.
   */
  async write<T>(query: string, params?: Record<string, unknown>): Promise<T[]> {
    const session = this.getSession();
    try {
      const result = await session.executeWrite(tx => tx.run(query, params));
      return result.records.map(record => record.toObject() as T);
    } finally {
      await session.close();
    }
  }

  /**
   * Creates the uniqueness constraints and indexes used by this service.
   * Every statement uses `IF NOT EXISTS`. Failures are logged together with
   * the underlying error and are not rethrown, so startup continues.
   */
  private async initializeSchema(): Promise<void> {
    const statements = [
      // Uniqueness constraints on node ids.
      'CREATE CONSTRAINT user_id IF NOT EXISTS FOR (u:User) REQUIRE u.id IS UNIQUE',
      'CREATE CONSTRAINT conversation_id IF NOT EXISTS FOR (c:Conversation) REQUIRE c.id IS UNIQUE',
      'CREATE CONSTRAINT entity_id IF NOT EXISTS FOR (e:Entity) REQUIRE e.id IS UNIQUE',
      'CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE',
      // Index on event time.
      'CREATE INDEX event_timestamp IF NOT EXISTS FOR (e:Event) ON (e.timestamp)',
      // Index on entity category.
      'CREATE INDEX entity_category IF NOT EXISTS FOR (e:Entity) ON (e.category)',
    ];

    const session = this.getSession();
    try {
      // Run sequentially: a session executes one query at a time.
      for (const statement of statements) {
        await session.run(statement);
      }
      console.log('[Neo4j] Schema initialized');
    } catch (error) {
      // Best effort: keep the service usable, but surface the real cause.
      console.warn('[Neo4j] Schema initialization failed:', error);
    } finally {
      await session.close();
    }
  }

  /**
   * Returns `value` unchanged if it is a bare identifier, otherwise throws.
   * Used for every name that has to be interpolated into query text.
   */
  private static assertIdentifier(value: string, what: string): string {
    if (typeof value !== 'string' || !Neo4jService.IDENTIFIER.test(value)) {
      throw new Error(`[Neo4j] Invalid ${what}: ${JSON.stringify(value)}`);
    }
    return value;
  }

  /**
   * Builds the value for a `LIMIT $limit` parameter. Plain JavaScript numbers
   * are sent by the driver as floats, which `LIMIT` rejects, so the value is
   * wrapped with `neo4j.int`. Missing, non-finite or sub-1 values use
   * `fallback`.
   */
  private static toLimit(value: number | undefined, fallback: number): unknown {
    const usable = typeof value === 'number' && Number.isFinite(value) && value >= 1;
    return neo4j.int(usable ? Math.floor(value) : fallback);
  }

  /** Converts a driver integer (or any numeric value) to a JavaScript number. */
  private static toNumber(value: unknown): number {
    return neo4j.isInt(value) ? value.toNumber() : Number(value);
  }

  /**
   * Parses the JSON string stored in `Event.metadata`. Returns an empty
   * object for missing, malformed or non-object values so that one bad row
   * does not fail a whole timeline query.
   */
  private static parseMetadata(raw: unknown): Record<string, unknown> {
    if (typeof raw !== 'string' || raw === '') {
      return {};
    }
    try {
      const parsed: unknown = JSON.parse(raw);
      const isPlainObject = parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed);
      return isPlainObject ? (parsed as Record<string, unknown>) : {};
    } catch {
      return {};
    }
  }

  // ========== User timeline operations ==========

  /**
   * Creates the user node if it is missing and merges `properties` into it.
   *
   * @param userId     Value of the node's `id` property.
   * @param properties Properties merged into the node with `+=`.
   */
  async createUserNode(userId: string, properties?: Record<string, unknown>): Promise<void> {
    await this.write(
      `
      MERGE (u:User {id: $userId})
      SET u += $properties, u.updatedAt = datetime()
      `,
      { userId, properties: properties || {} },
    );
  }

  /**
   * Records an event on a user's timeline.
   *
   * Creates an `Event` node, attaches it to the user with `HAS_EVENT`, and
   * links the user's most recent earlier event to it with `FOLLOWED_BY`.
   * The query starts with `MATCH` on the user, so nothing is created when
   * no `User` node with this id exists.
   *
   * @param params.timestamp Event time; defaults to now.
   * @param params.metadata  Stored as a JSON string on the event node.
   */
  async recordUserEvent(params: {
    userId: string;
    eventId: string;
    eventType: string;
    content: string;
    timestamp?: Date;
    metadata?: Record<string, unknown>;
  }): Promise<void> {
    const { userId, eventId, eventType, content, metadata } = params;
    const timestamp = params.timestamp || new Date();

    await this.write(
      `
      MATCH (u:User {id: $userId})
      CREATE (e:Event {
        id: $eventId,
        type: $eventType,
        content: $content,
        timestamp: datetime($timestamp),
        metadata: $metadata
      })
      CREATE (u)-[:HAS_EVENT]->(e)
      WITH u, e
      OPTIONAL MATCH (u)-[:HAS_EVENT]->(prev:Event)
      WHERE prev.timestamp < e.timestamp AND prev.id <> e.id
      WITH e, prev
      ORDER BY prev.timestamp DESC
      LIMIT 1
      FOREACH (_ IN CASE WHEN prev IS NOT NULL THEN [1] ELSE [] END |
        CREATE (prev)-[:FOLLOWED_BY]->(e)
      )
      `,
      {
        userId,
        eventId,
        eventType,
        content,
        timestamp: timestamp.toISOString(),
        metadata: JSON.stringify(metadata || {}),
      },
    );
  }

  /**
   * Returns a user's events, newest first.
   *
   * @param userId             Id of the user whose events are read.
   * @param options.limit      Maximum number of events (default 20).
   * @param options.beforeDate Only events strictly earlier than this date.
   * @param options.eventTypes Only events whose type is in this list.
   */
  async getUserTimeline(
    userId: string,
    options?: { limit?: number; beforeDate?: Date; eventTypes?: string[] },
  ): Promise<Array<{
    id: string;
    type: string;
    content: string;
    timestamp: Date;
    metadata: Record<string, unknown>;
  }>> {
    const params: Record<string, unknown> = {
      userId,
      limit: Neo4jService.toLimit(options?.limit, 20),
    };

    // Collect the optional filters and join them into one WHERE clause.
    const conditions: string[] = [];
    if (options?.beforeDate) {
      conditions.push('e.timestamp < datetime($beforeDate)');
      params.beforeDate = options.beforeDate.toISOString();
    }
    if (options?.eventTypes?.length) {
      conditions.push('e.type IN $eventTypes');
      params.eventTypes = options.eventTypes;
    }
    const whereClause = conditions.length ? `WHERE ${conditions.join(' AND ')}` : '';

    const query = `
      MATCH (u:User {id: $userId})-[:HAS_EVENT]->(e:Event)
      ${whereClause}
      RETURN e.id AS id, e.type AS type, e.content AS content,
             e.timestamp AS timestamp, e.metadata AS metadata
      ORDER BY e.timestamp DESC
      LIMIT $limit
    `;

    const rows = await this.read<{
      id: string;
      type: string;
      content: string;
      timestamp: { toString: () => string };
      metadata: string;
    }>(query, params);

    return rows.map(row => ({
      id: row.id,
      type: row.type,
      content: row.content,
      // The driver returns a temporal object; go through its string form.
      timestamp: new Date(row.timestamp.toString()),
      metadata: Neo4jService.parseMetadata(row.metadata),
    }));
  }

  // ========== Knowledge graph operations ==========

  /**
   * Creates or updates a knowledge entity keyed by `id`.
   *
   * `properties` is merged after `name`, `type` and `category` are set, so a
   * key of the same name in `properties` overrides them. A missing
   * `category` is stored as an empty string.
   */
  async createEntity(params: {
    id: string;
    name: string;
    type: string;
    category?: string;
    properties?: Record<string, unknown>;
  }): Promise<void> {
    await this.write(
      `
      MERGE (e:Entity {id: $id})
      SET e.name = $name,
          e.type = $type,
          e.category = $category,
          e += $properties,
          e.updatedAt = datetime()
      `,
      {
        id: params.id,
        name: params.name,
        type: params.type,
        category: params.category || '',
        properties: params.properties || {},
      },
    );
  }

  /**
   * Creates or updates a directed relation between two existing entities.
   * Nothing is written when either entity is missing.
   *
   * @param params.relationType Relationship type; must be a bare identifier
   *   because it is interpolated into the query text.
   * @throws Error if `relationType` is not a valid identifier.
   */
  async createRelation(params: {
    fromId: string;
    toId: string;
    relationType: string;
    properties?: Record<string, unknown>;
  }): Promise<void> {
    // The relationship type is dynamic, so validate it before interpolating.
    const relationType = Neo4jService.assertIdentifier(params.relationType, 'relation type');

    const relationQuery = `
      MATCH (from:Entity {id: $fromId})
      MATCH (to:Entity {id: $toId})
      MERGE (from)-[r:${relationType}]->(to)
      SET r += $properties, r.updatedAt = datetime()
    `;

    await this.write(relationQuery, {
      fromId: params.fromId,
      toId: params.toId,
      properties: params.properties || {},
    });
  }

  /**
   * Finds entities reachable from `entityId` in either direction within
   * `maxDepth` hops, nearest first. One row is returned per matching path,
   * so an entity reachable by several paths can appear more than once.
   *
   * @param entityId              Id of the starting entity.
   * @param options.relationTypes Restrict traversal to these relationship types.
   * @param options.maxDepth      Maximum path length (default 2).
   * @param options.limit         Maximum number of rows (default 20).
   * @returns For each row: the related entity, the type of the last
   *   relationship on the path, and the path length.
   * @throws Error if `maxDepth` is not a positive integer or a relation type
   *   is not a valid identifier.
   */
  async findRelatedEntities(
    entityId: string,
    options?: { relationTypes?: string[]; maxDepth?: number; limit?: number },
  ): Promise<Array<{
    entity: { id: string; name: string; type: string };
    relation: string;
    depth: number;
  }>> {
    const maxDepth = options?.maxDepth || 2;
    // maxDepth ends up in the query text, so only a positive integer is allowed.
    if (!Number.isInteger(maxDepth) || maxDepth < 1) {
      throw new Error(`[Neo4j] Invalid maxDepth: ${JSON.stringify(maxDepth)}`);
    }

    const relationTypes = (options?.relationTypes ?? []).map(relationType =>
      Neo4jService.assertIdentifier(relationType, 'relation type'),
    );
    // Emit the ':' only when there is at least one type; '[:*1..2]' is not valid Cypher.
    const typeFilter = relationTypes.length ? `:${relationTypes.join('|')}` : '';

    const query = `
      MATCH path = (start:Entity {id: $entityId})-[${typeFilter}*1..${maxDepth}]-(related:Entity)
      WHERE start <> related
      RETURN related.id AS id, related.name AS name, related.type AS type,
             type(last(relationships(path))) AS relation, length(path) AS depth
      ORDER BY depth ASC
      LIMIT $limit
    `;

    const rows = await this.read<{
      id: string;
      name: string;
      type: string;
      relation: string;
      depth: unknown;
    }>(query, { entityId, limit: Neo4jService.toLimit(options?.limit, 20) });

    return rows.map(row => ({
      entity: { id: row.id, name: row.name, type: row.type },
      relation: row.relation,
      // length(path) arrives as a driver integer, not a JavaScript number.
      depth: Neo4jService.toNumber(row.depth),
    }));
  }

  /**
   * Searches entities whose name contains `keyword` (case-sensitive
   * `CONTAINS`). Names that start with the keyword score 1.0, all other
   * matches 0.5; results are ordered by score, highest first.
   *
   * @param keyword          Substring to look for in entity names.
   * @param options.category Only entities with this category.
   * @param options.type     Only entities with this type.
   * @param options.limit    Maximum number of results (default 10).
   */
  async searchEntities(
    keyword: string,
    options?: { category?: string; type?: string; limit?: number },
  ): Promise<Array<{ id: string; name: string; type: string; score: number }>> {
    const params: Record<string, unknown> = {
      keyword,
      limit: Neo4jService.toLimit(options?.limit, 10),
    };

    const conditions = ['e.name CONTAINS $keyword'];
    if (options?.category) {
      conditions.push('e.category = $category');
      params.category = options.category;
    }
    if (options?.type) {
      conditions.push('e.type = $type');
      params.type = options.type;
    }

    const query = `
      MATCH (e:Entity)
      WHERE ${conditions.join(' AND ')}
      RETURN e.id AS id, e.name AS name, e.type AS type,
             CASE WHEN e.name STARTS WITH $keyword THEN 1.0 ELSE 0.5 END AS score
      ORDER BY score DESC
      LIMIT $limit
    `;

    return this.read<{ id: string; name: string; type: string; score: number }>(query, params);
  }

  // ========== Conversation links ==========

  /**
   * Links a conversation to an existing entity. The conversation node is
   * created if missing; nothing is linked when the entity does not exist.
   *
   * @param conversationId Id of the conversation node.
   * @param entityId       Id of the entity to link to.
   * @param mentionType    Relationship type to create or refresh.
   * @throws Error if `mentionType` is not one of the allowed values.
   */
  async linkConversationToEntity(
    conversationId: string,
    entityId: string,
    mentionType: 'MENTIONED' | 'DISCUSSED' | 'ASKED_ABOUT',
  ): Promise<void> {
    // The union type is erased at runtime; re-check before interpolating.
    if (!Neo4jService.MENTION_TYPES.includes(mentionType)) {
      throw new Error(`[Neo4j] Invalid mention type: ${JSON.stringify(mentionType)}`);
    }

    await this.write(
      `
      MERGE (c:Conversation {id: $conversationId})
      WITH c
      MATCH (e:Entity {id: $entityId})
      MERGE (c)-[r:${mentionType}]->(e)
      SET r.timestamp = datetime()
      `,
      { conversationId, entityId },
    );
  }

  /**
   * Returns every entity a conversation points to, together with the type of
   * the relationship that links them.
   *
   * @param conversationId Id of the conversation node.
   */
  async getConversationEntities(conversationId: string): Promise<Array<{
    entity: { id: string; name: string; type: string };
    mentionType: string;
  }>> {
    const rows = await this.read<{
      id: string;
      name: string;
      type: string;
      mentionType: string;
    }>(
      `
      MATCH (c:Conversation {id: $conversationId})-[r]->(e:Entity)
      RETURN e.id AS id, e.name AS name, e.type AS type, type(r) AS mentionType
      `,
      { conversationId },
    );

    return rows.map(row => ({
      entity: { id: row.id, name: row.name, type: row.type },
      mentionType: row.mentionType,
    }));
  }
}