// 423 lines · 11 KiB · TypeScript
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||
import { ConfigService } from '@nestjs/config';
|
||
import neo4j, { Driver, Session, Transaction } from 'neo4j-driver';
|
||
|
||
/**
 * Neo4j knowledge-graph service.
 *
 * Stores and queries user timelines (User -> Event), knowledge entities
 * (Entity), the relations between entities, and the links between
 * conversations and entities.
 */
@Injectable()
export class Neo4jService implements OnModuleInit, OnModuleDestroy {
  /**
   * Shape a relationship type must have before it may be spliced into Cypher
   * text. Relationship types cannot be sent as query parameters, so they are
   * validated (Unicode letters, digits and `_`, not starting with a digit)
   * instead of being trusted.
   */
  private static readonly RELATION_TYPE_PATTERN = /^[\p{L}_][\p{L}\p{N}_]*$/u;

  /** Set by onModuleInit; undefined until the module has been initialised. */
  private driver?: Driver;

  constructor(private readonly configService: ConfigService) {}

  /**
   * Creates the driver from NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD (falling
   * back to local defaults), verifies connectivity and initialises the schema.
   *
   * A failed connectivity check is logged, not rethrown, so the application
   * still starts when the database is unreachable.
   */
  async onModuleInit(): Promise<void> {
    // `||` (not `??`) on purpose: an empty value also falls back to the default.
    const uri = this.configService.get<string>('NEO4J_URI') || 'bolt://localhost:7687';
    const user = this.configService.get<string>('NEO4J_USER') || 'neo4j';
    const password = this.configService.get<string>('NEO4J_PASSWORD') || 'password';

    this.driver = neo4j.driver(uri, neo4j.auth.basic(user, password), {
      maxConnectionPoolSize: 50,
      connectionAcquisitionTimeout: 30000,
    });

    try {
      await this.driver.verifyConnectivity();
      console.log('[Neo4j] Connected successfully');

      await this.initializeSchema();
    } catch (error) {
      console.error('[Neo4j] Connection failed:', error);
    }
  }

  /** Closes the driver (if one was created) when the module shuts down. */
  async onModuleDestroy(): Promise<void> {
    await this.driver?.close();
  }

  /**
   * Opens a new session. The caller is responsible for closing it.
   *
   * @throws Error when called before onModuleInit has created the driver.
   */
  getSession(): Session {
    if (!this.driver) {
      throw new Error('[Neo4j] Driver is not initialized');
    }
    return this.driver.session();
  }

  /**
   * Runs a read query in a managed read transaction and returns every record
   * converted to a plain object.
   *
   * @param query  Cypher text.
   * @param params Query parameters.
   */
  async read<T>(query: string, params?: Record<string, unknown>): Promise<T[]> {
    const session = this.getSession();
    try {
      const result = await session.executeRead(tx => tx.run(query, params));
      return result.records.map(record => record.toObject() as T);
    } finally {
      await session.close();
    }
  }

  /**
   * Runs a write query in a managed write transaction and returns every
   * record converted to a plain object.
   *
   * @param query  Cypher text.
   * @param params Query parameters.
   */
  async write<T>(query: string, params?: Record<string, unknown>): Promise<T[]> {
    const session = this.getSession();
    try {
      const result = await session.executeWrite(tx => tx.run(query, params));
      return result.records.map(record => record.toObject() as T);
    } finally {
      await session.close();
    }
  }

  /**
   * Creates the uniqueness constraints and indexes the service relies on.
   *
   * Every statement uses IF NOT EXISTS, so an error here is a real failure
   * rather than "already exists". Each statement runs on its own so that one
   * failure does not prevent the others; failures are logged and never thrown.
   */
  private async initializeSchema(): Promise<void> {
    const statements = [
      // Uniqueness constraints: user, conversation, entity, event.
      'CREATE CONSTRAINT user_id IF NOT EXISTS FOR (u:User) REQUIRE u.id IS UNIQUE',
      'CREATE CONSTRAINT conversation_id IF NOT EXISTS FOR (c:Conversation) REQUIRE c.id IS UNIQUE',
      'CREATE CONSTRAINT entity_id IF NOT EXISTS FOR (e:Entity) REQUIRE e.id IS UNIQUE',
      'CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE',
      // Lookup indexes: event timestamp and entity category.
      'CREATE INDEX event_timestamp IF NOT EXISTS FOR (e:Event) ON (e.timestamp)',
      'CREATE INDEX entity_category IF NOT EXISTS FOR (e:Entity) ON (e.category)',
    ];

    let failures = 0;
    const session = this.getSession();
    try {
      for (const statement of statements) {
        try {
          await session.run(statement);
        } catch (error) {
          failures += 1;
          console.error(`[Neo4j] Schema statement failed: ${statement}`, error);
        }
      }
    } finally {
      await session.close();
    }

    if (failures === 0) {
      console.log('[Neo4j] Schema initialized');
    } else {
      console.warn(
        `[Neo4j] Schema initialization finished with ${failures} failed statement(s)`,
      );
    }
  }

  /**
   * Validates a relationship type and returns it backtick-quoted, ready to be
   * embedded in Cypher text.
   *
   * @throws Error when the type does not match RELATION_TYPE_PATTERN.
   */
  private static quoteRelationType(relationType: string): string {
    if (
      typeof relationType !== 'string' ||
      !Neo4jService.RELATION_TYPE_PATTERN.test(relationType)
    ) {
      throw new Error(`[Neo4j] Invalid relationship type: ${JSON.stringify(relationType)}`);
    }
    return '`' + relationType + '`';
  }

  /** Returns `value` floored when it is a finite number >= 1, else `fallback`. */
  private static toPositiveInt(value: number | undefined, fallback: number): number {
    return typeof value === 'number' && Number.isFinite(value) && value >= 1
      ? Math.floor(value)
      : fallback;
  }

  /** Converts a driver Integer (or anything number-like) to a JS number. */
  private static toJsNumber(value: unknown): number {
    return neo4j.isInt(value) ? value.toNumber() : Number(value);
  }

  // ========== User timeline ==========

  /**
   * Creates the user node if it does not exist and merges `properties` into it.
   *
   * @param userId     Value of the node's `id` property (the merge key).
   * @param properties Extra properties merged onto the node.
   */
  async createUserNode(userId: string, properties?: Record<string, unknown>): Promise<void> {
    await this.write(
      `
      MERGE (u:User {id: $userId})
      SET u += $properties, u.id = $userId, u.updatedAt = datetime()
      `,
      // u.id is re-asserted after `+=` so an `id` inside properties cannot
      // overwrite the merge key.
      { userId, properties: properties ?? {} },
    );
  }

  /**
   * Appends an event to a user's timeline.
   *
   * The event is attached with HAS_EVENT and, when the user already has an
   * event with a strictly earlier timestamp, the latest such event is linked
   * to the new one with FOLLOWED_BY. Existing FOLLOWED_BY links are not
   * rewired when an event is inserted out of chronological order.
   *
   * `metadata` is stored as a JSON string. If no User node with `userId`
   * exists nothing is written and a warning is logged.
   *
   * @param params.timestamp Defaults to the current time.
   */
  async recordUserEvent(params: {
    userId: string;
    eventId: string;
    eventType: string;
    content: string;
    timestamp?: Date;
    metadata?: Record<string, unknown>;
  }): Promise<void> {
    const { userId, eventId, eventType, content, metadata } = params;
    const timestamp = params.timestamp ?? new Date();

    const created = await this.write<{ id: string }>(
      `
      MATCH (u:User {id: $userId})
      CREATE (e:Event {
        id: $eventId,
        type: $eventType,
        content: $content,
        timestamp: datetime($timestamp),
        metadata: $metadata
      })
      CREATE (u)-[:HAS_EVENT]->(e)

      WITH u, e
      OPTIONAL MATCH (u)-[:HAS_EVENT]->(prev:Event)
      WHERE prev.timestamp < e.timestamp AND prev.id <> e.id
      WITH e, prev
      ORDER BY prev.timestamp DESC
      LIMIT 1
      FOREACH (_ IN CASE WHEN prev IS NOT NULL THEN [1] ELSE [] END |
        CREATE (prev)-[:FOLLOWED_BY]->(e)
      )
      RETURN e.id AS id
      `,
      {
        userId,
        eventId,
        eventType,
        content,
        timestamp: timestamp.toISOString(),
        metadata: JSON.stringify(metadata ?? {}),
      },
    );

    // No row means the initial MATCH found no user, so nothing was created.
    if (created.length === 0) {
      console.warn(
        `[Neo4j] recordUserEvent: user ${JSON.stringify(userId)} not found, event ${JSON.stringify(eventId)} was not recorded`,
      );
    }
  }

  /**
   * Returns a user's events, newest first.
   *
   * @param options.limit      Maximum number of events (default 20).
   * @param options.beforeDate Only events strictly before this instant.
   * @param options.eventTypes Only events whose type is in this list.
   */
  async getUserTimeline(
    userId: string,
    options?: { limit?: number; beforeDate?: Date; eventTypes?: string[] },
  ): Promise<Array<{
    id: string;
    type: string;
    content: string;
    timestamp: Date;
    metadata: Record<string, unknown>;
  }>> {
    const conditions: string[] = [];
    const params: Record<string, unknown> = {
      userId,
      // Plain JS numbers are sent as floats; LIMIT needs an integer.
      limit: neo4j.int(Neo4jService.toPositiveInt(options?.limit, 20)),
    };

    if (options?.beforeDate) {
      conditions.push('e.timestamp < datetime($beforeDate)');
      params.beforeDate = options.beforeDate.toISOString();
    }

    if (options?.eventTypes?.length) {
      conditions.push('e.type IN $eventTypes');
      params.eventTypes = options.eventTypes;
    }

    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';

    const query = `
      MATCH (u:User {id: $userId})-[:HAS_EVENT]->(e:Event)
      ${whereClause}
      RETURN e.id as id, e.type as type, e.content as content,
             e.timestamp as timestamp, e.metadata as metadata
      ORDER BY e.timestamp DESC
      LIMIT $limit
    `;

    const results = await this.read<{
      id: string;
      type: string;
      content: string;
      timestamp: { toString: () => string };
      metadata: string;
    }>(query, params);

    return results.map(r => ({
      id: r.id,
      type: r.type,
      content: r.content,
      timestamp: new Date(r.timestamp.toString()),
      // metadata is stored as a JSON string by recordUserEvent.
      metadata: JSON.parse(r.metadata || '{}'),
    }));
  }

  // ========== Knowledge graph ==========

  /**
   * Creates or updates a knowledge entity keyed by `id`.
   *
   * `properties` is merged after name/type/category, so keys it shares with
   * those fields take precedence; `category` defaults to an empty string.
   */
  async createEntity(params: {
    id: string;
    name: string;
    type: string;
    category?: string;
    properties?: Record<string, unknown>;
  }): Promise<void> {
    await this.write(
      `
      MERGE (e:Entity {id: $id})
      SET e.name = $name,
          e.type = $type,
          e.category = $category,
          e += $properties,
          e.id = $id,
          e.updatedAt = datetime()
      `,
      // e.id is re-asserted after `+=` so an `id` inside properties cannot
      // overwrite the merge key.
      {
        id: params.id,
        name: params.name,
        type: params.type,
        category: params.category || '',
        properties: params.properties ?? {},
      },
    );
  }

  /**
   * Creates or updates a relation of the given type between two existing
   * entities. Nothing is written when either entity is missing.
   *
   * @throws Error when `relationType` is not a valid identifier (it has to be
   *         embedded in the query text and is therefore validated).
   */
  async createRelation(params: {
    fromId: string;
    toId: string;
    relationType: string;
    properties?: Record<string, unknown>;
  }): Promise<void> {
    const relationType = Neo4jService.quoteRelationType(params.relationType);

    const relationQuery = `
      MATCH (from:Entity {id: $fromId})
      MATCH (to:Entity {id: $toId})
      MERGE (from)-[r:${relationType}]->(to)
      SET r += $properties, r.updatedAt = datetime()
    `;

    await this.write(relationQuery, {
      fromId: params.fromId,
      toId: params.toId,
      properties: params.properties ?? {},
    });
  }

  /**
   * Finds entities connected to `entityId` in either direction.
   *
   * One row is returned per matching path, nearest first, so an entity that
   * is reachable over several paths can appear more than once. `relation` is
   * the type of the last relationship on the path.
   *
   * @param options.relationTypes Only traverse these relationship types.
   * @param options.maxDepth      Maximum path length (default 2).
   * @param options.limit         Maximum number of rows (default 20).
   * @throws Error when an entry of `relationTypes` is not a valid identifier.
   */
  async findRelatedEntities(
    entityId: string,
    options?: { relationTypes?: string[]; maxDepth?: number; limit?: number },
  ): Promise<Array<{
    entity: { id: string; name: string; type: string };
    relation: string;
    depth: number;
  }>> {
    // maxDepth is embedded in the query text, so force it to a plain integer.
    const maxDepth = Neo4jService.toPositiveInt(options?.maxDepth, 2);
    const limit = Neo4jService.toPositiveInt(options?.limit, 20);

    const quotedTypes = (options?.relationTypes ?? []).map(t =>
      Neo4jService.quoteRelationType(t),
    );
    // `[*1..n]` without a type filter, `[:A|B*1..n]` with one.
    const typeFilter = quotedTypes.length > 0 ? `:${quotedTypes.join('|')}` : '';

    const query = `
      MATCH path = (start:Entity {id: $entityId})-[${typeFilter}*1..${maxDepth}]-(related:Entity)
      WHERE start <> related
      RETURN related.id as id, related.name as name, related.type as type,
             type(last(relationships(path))) as relation, length(path) as depth
      ORDER BY depth ASC
      LIMIT $limit
    `;

    const results = await this.read<{
      id: string;
      name: string;
      type: string;
      relation: string;
      depth: unknown;
    }>(query, { entityId, limit: neo4j.int(limit) });

    return results.map(r => ({
      entity: { id: r.id, name: r.name, type: r.type },
      relation: r.relation,
      // length(path) comes back as a driver Integer.
      depth: Neo4jService.toJsNumber(r.depth),
    }));
  }

  /**
   * Searches entities whose name contains `keyword`.
   *
   * Names starting with the keyword score 1.0, other matches 0.5; results are
   * ordered by score, highest first.
   *
   * @param options.category Only entities with this category.
   * @param options.type     Only entities with this type.
   * @param options.limit    Maximum number of results (default 10).
   */
  async searchEntities(
    keyword: string,
    options?: { category?: string; type?: string; limit?: number },
  ): Promise<Array<{ id: string; name: string; type: string; score: number }>> {
    const conditions = ['e.name CONTAINS $keyword'];
    const params: Record<string, unknown> = {
      keyword,
      // Plain JS numbers are sent as floats; LIMIT needs an integer.
      limit: neo4j.int(Neo4jService.toPositiveInt(options?.limit, 10)),
    };

    if (options?.category) {
      conditions.push('e.category = $category');
      params.category = options.category;
    }

    if (options?.type) {
      conditions.push('e.type = $type');
      params.type = options.type;
    }

    const query = `
      MATCH (e:Entity)
      WHERE ${conditions.join(' AND ')}
      RETURN e.id as id, e.name as name, e.type as type,
             CASE WHEN e.name STARTS WITH $keyword THEN 1.0 ELSE 0.5 END as score
      ORDER BY score DESC
      LIMIT $limit
    `;

    return this.read(query, params);
  }

  // ========== Conversation links ==========

  /**
   * Links a conversation to an existing entity, creating the conversation
   * node when needed and stamping the relation with the current time.
   * Nothing is linked when the entity does not exist.
   *
   * @throws Error when `mentionType` is not a valid identifier at runtime.
   */
  async linkConversationToEntity(
    conversationId: string,
    entityId: string,
    mentionType: 'MENTIONED' | 'DISCUSSED' | 'ASKED_ABOUT',
  ): Promise<void> {
    // The union type is erased at runtime, so validate before embedding.
    const relationType = Neo4jService.quoteRelationType(mentionType);

    await this.write(
      `
      MERGE (c:Conversation {id: $conversationId})
      WITH c
      MATCH (e:Entity {id: $entityId})
      MERGE (c)-[r:${relationType}]->(e)
      SET r.timestamp = datetime()
      `,
      { conversationId, entityId },
    );
  }

  /**
   * Returns the entities a conversation points to, together with the type of
   * the relationship that links them.
   */
  async getConversationEntities(conversationId: string): Promise<Array<{
    entity: { id: string; name: string; type: string };
    mentionType: string;
  }>> {
    const results = await this.read<{
      id: string;
      name: string;
      type: string;
      mentionType: string;
    }>(
      `
      MATCH (c:Conversation {id: $conversationId})-[r]->(e:Entity)
      RETURN e.id as id, e.name as name, e.type as type, type(r) as mentionType
      `,
      { conversationId },
    );

    return results.map(r => ({
      entity: { id: r.id, name: r.name, type: r.type },
      mentionType: r.mentionType,
    }));
  }
}
|