import { Injectable } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { Notification, NotificationType, TargetType, TagLogic } from '../../domain/entities/notification.entity';

/**
 * Input accepted by {@link NotificationRepository.create} and, as a partial,
 * by {@link NotificationRepository.update}.
 */
export interface CreateNotificationDto {
  title: string;
  content: string;
  type: NotificationType;
  priority: string;
  targetType: TargetType;
  // Targeting payloads
  tenantIds?: string[]; // SPECIFIC_TENANTS
  userIds?: string[]; // SPECIFIC_USERS
  targetTagIds?: string[]; // BY_TENANT_TAG
  targetTagLogic?: TagLogic; // BY_TENANT_TAG: ANY | ALL
  targetPlans?: string[]; // BY_PLAN: ['free','pro','enterprise']
  targetStatuses?: string[]; // BY_TENANT_STATUS: ['active','suspended']
  targetSegment?: string; // BY_SEGMENT: segment key
  channelKey?: string | null; // Phase 2 opt-out channel
  imageUrl?: string | null;
  linkUrl?: string | null;
  requiresForceRead: boolean;
  isEnabled: boolean;
  publishedAt?: Date | null;
  expiresAt?: Date | null;
  createdBy?: string;
}

/** A notification together with the per-user read flag computed by `findForUser`. */
export interface NotificationWithReadStatus extends Notification {
  isRead: boolean;
}

/** Raw database row as returned by the driver (snake_case column names). */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw driver rows are untyped
type DbRow = Record<string, any>;

/**
 * Minimal "something that can run SQL" contract. Declared structurally so the
 * same helpers can run against either the data source or the manager handed
 * to a `transaction` callback.
 */
interface SqlExecutor {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- mirrors the untyped driver API
  query(sql: string, parameters?: any[]): Promise<any>;
}

/** Returns the list unchanged when it has elements, otherwise `null` (the array columns are stored as NULL when empty). */
function nonEmptyOrNull<T>(list: readonly T[] | null | undefined): readonly T[] | null {
  return list?.length ? list : null;
}

/**
 * Raw-SQL repository for notifications, their tenant/user target rows and
 * per-user read receipts.
 */
@Injectable()
export class NotificationRepository {
  constructor(@InjectDataSource() private readonly ds: DataSource) {}

  // ── Write Operations ──────────────────────────────────────────────────────

  /**
   * Inserts a notification and, for SPECIFIC_TENANTS / SPECIFIC_USERS, its
   * target rows. All statements run in one transaction so a failure while
   * inserting targets does not leave a notification without its targets.
   *
   * @param dto Notification fields and targeting payload.
   * @returns The inserted notification mapped to the domain shape.
   */
  async create(dto: CreateNotificationDto): Promise<Notification> {
    return this.ds.transaction(async (tx) => {
      const inserted = await tx.query(
        `INSERT INTO public.notifications
           (title, content, type, priority, target_type,
            target_tag_ids, target_tag_logic, target_plans, target_statuses, target_segment,
            channel_key, image_url, link_url, requires_force_read, is_enabled,
            published_at, expires_at, created_by)
         VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18)
         RETURNING *`,
        [
          dto.title,
          dto.content,
          dto.type,
          dto.priority,
          dto.targetType,
          nonEmptyOrNull(dto.targetTagIds),
          dto.targetTagLogic ?? 'ANY',
          nonEmptyOrNull(dto.targetPlans),
          nonEmptyOrNull(dto.targetStatuses),
          dto.targetSegment ?? null,
          dto.channelKey ?? null,
          dto.imageUrl ?? null,
          dto.linkUrl ?? null,
          dto.requiresForceRead,
          dto.isEnabled,
          dto.publishedAt ?? null,
          dto.expiresAt ?? null,
          dto.createdBy ?? null,
        ],
      );
      const row = this.rowsOf(inserted)[0];
      if (!row) throw new Error('INSERT INTO public.notifications returned no row');

      if (dto.targetType === 'SPECIFIC_TENANTS' && dto.tenantIds?.length) {
        await this.insertTenantTargets(tx, row.id, dto.tenantIds);
      }
      if (dto.targetType === 'SPECIFIC_USERS' && dto.userIds?.length) {
        await this.insertUserTargets(tx, row.id, dto.userIds);
      }
      return this.mapRow(row);
    });
  }

  /**
   * Partially updates a notification. Only properties that are not
   * `undefined` are written; `null` is written as SQL NULL.
   *
   * Target rows are re-synced when `targetType`, `tenantIds` or `userIds` is
   * supplied:
   * - rows of a target kind that no longer matches the stored `target_type`
   *   are removed;
   * - when the kind matches and an id list is supplied, the rows are replaced
   *   by that list;
   * - when the kind matches and no id list is supplied, existing rows are
   *   kept (an omitted list means "unchanged", like every other field).
   *
   * The column update and the target re-sync share one transaction.
   *
   * @param id  Notification id.
   * @param dto Fields to change.
   * @returns The updated notification, or `null` when no row has that id.
   */
  async update(id: string, dto: Partial<CreateNotificationDto>): Promise<Notification | null> {
    const candidates: Array<[column: string, value: unknown]> = [
      ['title', dto.title],
      ['content', dto.content],
      ['type', dto.type],
      ['priority', dto.priority],
      ['target_type', dto.targetType],
      ['target_tag_ids', dto.targetTagIds === undefined ? undefined : nonEmptyOrNull(dto.targetTagIds)],
      ['target_tag_logic', dto.targetTagLogic],
      ['target_plans', dto.targetPlans === undefined ? undefined : nonEmptyOrNull(dto.targetPlans)],
      ['target_statuses', dto.targetStatuses === undefined ? undefined : nonEmptyOrNull(dto.targetStatuses)],
      ['target_segment', dto.targetSegment],
      ['channel_key', dto.channelKey],
      ['image_url', dto.imageUrl],
      ['link_url', dto.linkUrl],
      ['requires_force_read', dto.requiresForceRead],
      ['is_enabled', dto.isEnabled],
      ['published_at', dto.publishedAt],
      ['expires_at', dto.expiresAt],
    ];

    const assignments: string[] = [];
    const values: unknown[] = [];
    for (const [column, value] of candidates) {
      if (value === undefined) continue;
      values.push(value);
      assignments.push(`${column} = $${values.length}`);
    }

    const tenantTargetsTouched = dto.targetType !== undefined || dto.tenantIds !== undefined;
    const userTargetsTouched = dto.targetType !== undefined || dto.userIds !== undefined;

    // Nothing to write at all: behave like a plain read and leave updated_at alone.
    if (assignments.length === 0 && !tenantTargetsTouched && !userTargetsTouched) {
      return this.findById(id);
    }

    assignments.push(`updated_at = NOW()`);
    values.push(id);

    return this.ds.transaction(async (tx) => {
      const result = await tx.query(
        `UPDATE public.notifications SET ${assignments.join(', ')} WHERE id = $${values.length} RETURNING *`,
        values,
      );
      const row = this.rowsOf(result)[0];
      if (!row) return null;

      // RETURNING * already carries the effective target type; no extra lookup needed.
      const effectiveType = row.target_type;

      // Re-sync SPECIFIC_TENANTS targets
      if (tenantTargetsTouched) {
        const isTenantTargeted = effectiveType === 'SPECIFIC_TENANTS';
        const keepExisting = isTenantTargeted && dto.tenantIds === undefined;
        if (!keepExisting) {
          await tx.query(`DELETE FROM public.notification_tenant_targets WHERE notification_id = $1`, [id]);
          if (isTenantTargeted && dto.tenantIds?.length) {
            await this.insertTenantTargets(tx, id, dto.tenantIds);
          }
        }
      }

      // Re-sync SPECIFIC_USERS targets
      if (userTargetsTouched) {
        const isUserTargeted = effectiveType === 'SPECIFIC_USERS';
        const keepExisting = isUserTargeted && dto.userIds === undefined;
        if (!keepExisting) {
          await tx.query(`DELETE FROM public.notification_user_targets WHERE notification_id = $1`, [id]);
          if (isUserTargeted && dto.userIds?.length) {
            await this.insertUserTargets(tx, id, dto.userIds);
          }
        }
      }

      return this.mapRow(row);
    });
  }

  /**
   * Deletes a notification together with its tenant targets, user targets and
   * read receipts, in one transaction.
   *
   * @param id Notification id.
   */
  async delete(id: string): Promise<void> {
    await this.ds.transaction(async (tx) => {
      await tx.query(`DELETE FROM public.notification_tenant_targets WHERE notification_id = $1`, [id]);
      await tx.query(`DELETE FROM public.notification_user_targets WHERE notification_id = $1`, [id]);
      await tx.query(`DELETE FROM public.notification_reads WHERE notification_id = $1`, [id]);
      await tx.query(`DELETE FROM public.notifications WHERE id = $1`, [id]);
    });
  }

  /**
   * Loads one notification by id.
   *
   * @returns The notification, or `null` when no row matches.
   */
  async findById(id: string): Promise<Notification | null> {
    const rows = await this.ds.query(`SELECT * FROM public.notifications WHERE id = $1`, [id]);
    return rows.length ? this.mapRow(rows[0]) : null;
  }

  /**
   * Admin listing: newest first, optionally filtered by `type` and
   * `targetType`, each item carrying its tenant and user target ids.
   *
   * Target ids for the whole page are fetched with two batched queries rather
   * than two queries per notification.
   *
   * @returns The page of items plus the total number of rows matching the filters.
   */
  async findAllAdmin(params: {
    type?: string;
    targetType?: string;
    limit: number;
    offset: number;
  }): Promise<{ items: (Notification & { tenantIds: string[]; userIds: string[] })[]; total: number }> {
    const filters: string[] = [];
    const filterArgs: unknown[] = [];
    if (params.type) {
      filterArgs.push(params.type);
      filters.push(`type = $${filterArgs.length}`);
    }
    if (params.targetType) {
      filterArgs.push(params.targetType);
      filters.push(`target_type = $${filterArgs.length}`);
    }
    const where = filters.length ? `WHERE ${filters.join(' AND ')}` : '';
    const limitIdx = filterArgs.length + 1;

    const [rows, count] = await Promise.all([
      this.ds.query(
        `SELECT * FROM public.notifications ${where} ORDER BY created_at DESC LIMIT $${limitIdx} OFFSET $${limitIdx + 1}`,
        [...filterArgs, params.limit, params.offset],
      ),
      this.ds.query(`SELECT COUNT(*)::int AS total FROM public.notifications ${where}`, filterArgs),
    ]);

    const pageRows: DbRow[] = rows;
    const ids = pageRows.map((r) => r.id);

    let tenantRows: DbRow[] = [];
    let userRows: DbRow[] = [];
    if (ids.length) {
      [tenantRows, userRows] = await Promise.all([
        this.ds.query(
          `SELECT notification_id, tenant_id FROM public.notification_tenant_targets WHERE notification_id = ANY($1)`,
          [ids],
        ),
        this.ds.query(
          `SELECT notification_id, user_id FROM public.notification_user_targets WHERE notification_id = ANY($1)`,
          [ids],
        ),
      ]);
    }

    const tenantsByNotification = this.groupByNotification(tenantRows, 'tenant_id');
    const usersByNotification = this.groupByNotification(userRows, 'user_id');

    const items = pageRows.map((r) => ({
      ...this.mapRow(r),
      tenantIds: tenantsByNotification.get(String(r.id)) ?? [],
      userIds: usersByNotification.get(String(r.id)) ?? [],
    }));

    return { items, total: count[0]?.total ?? 0 };
  }

  // ── User-facing delivery ──────────────────────────────────────────────────

  /**
   * Lists the notifications currently visible to a user (enabled, published,
   * not expired, matching the targeting rules and not filtered out by the
   * channel preference check), each with its read flag.
   *
   * Ordering: force-read first, then `priority` descending, then most recent
   * publication (falling back to creation time).
   *
   * @returns The page of items plus the total number of visible notifications.
   */
  async findForUser(params: {
    tenantId: string;
    userId: string;
    tenantPlan?: string;
    tenantStatus?: string;
    limit: number;
    offset: number;
  }): Promise<{ items: NotificationWithReadStatus[]; total: number }> {
    const { tenantId, userId, tenantPlan, tenantStatus, limit, offset } = params;
    const visible = this._visibleToUserSQL();
    // Positional parameters $1..$4 as expected by the targeting / channel SQL fragments.
    const audienceArgs = [tenantId, userId, tenantPlan ?? null, tenantStatus ?? null];

    const [rows, count] = await Promise.all([
      this.ds.query(
        `SELECT n.*,
                CASE WHEN nr.id IS NOT NULL THEN true ELSE false END AS is_read
           FROM public.notifications n
           LEFT JOIN public.notification_reads nr
             ON nr.notification_id = n.id AND nr.user_id = $2
          WHERE ${visible}
          ORDER BY n.requires_force_read DESC, n.priority DESC, COALESCE(n.published_at, n.created_at) DESC
          LIMIT $5 OFFSET $6`,
        [...audienceArgs, limit, offset],
      ),
      this.ds.query(`SELECT COUNT(*)::int AS total FROM public.notifications n WHERE ${visible}`, audienceArgs),
    ]);

    return {
      items: rows.map((r: DbRow) => ({ ...this.mapRow(r), isRead: r.is_read })),
      total: count[0]?.total ?? 0,
    };
  }

  /**
   * Counts visible notifications the user has no read receipt for. Uses the
   * same visibility predicate as `findForUser` (including the channel
   * preference check) so the count matches what the list can show.
   */
  async getUnreadCount(tenantId: string, userId: string, tenantPlan?: string, tenantStatus?: string): Promise<number> {
    const visible = this._visibleToUserSQL();
    const result = await this.ds.query(
      `SELECT COUNT(*)::int AS cnt
         FROM public.notifications n
        WHERE ${visible}
          AND NOT EXISTS (
            SELECT 1 FROM public.notification_reads nr
             WHERE nr.notification_id = n.id AND nr.user_id = $2
          )`,
      [tenantId, userId, tenantPlan ?? null, tenantStatus ?? null],
    );
    return result[0]?.cnt ?? 0;
  }

  /** Records a read receipt for one notification; a repeated call for the same notification and user is a no-op. */
  async markRead(notificationId: string, userId: string, tenantId: string): Promise<void> {
    await this.ds.query(
      `INSERT INTO public.notification_reads (notification_id, user_id, tenant_id)
       VALUES ($1, $2, $3)
       ON CONFLICT (notification_id, user_id) DO NOTHING`,
      [notificationId, userId, tenantId],
    );
  }

  /**
   * Records read receipts for every notification currently visible to the
   * user, using the same visibility predicate as `findForUser`.
   *
   * @param tenantPlan   Optional; when omitted, BY_PLAN notifications do not match.
   * @param tenantStatus Optional; when omitted, BY_TENANT_STATUS notifications do not match.
   */
  async markAllRead(tenantId: string, userId: string, tenantPlan?: string, tenantStatus?: string): Promise<void> {
    const visible = this._visibleToUserSQL();
    await this.ds.query(
      `INSERT INTO public.notification_reads (notification_id, user_id, tenant_id)
       SELECT n.id, $2, $1
         FROM public.notifications n
        WHERE ${visible}
       ON CONFLICT (notification_id, user_id) DO NOTHING`,
      [tenantId, userId, tenantPlan ?? null, tenantStatus ?? null],
    );
  }

  // ── Analytics ─────────────────────────────────────────────────────────────

  /** Returns how many read receipts exist for the notification. */
  async getDeliveryStats(notificationId: string): Promise<{ readCount: number }> {
    const result = await this.ds.query(
      `SELECT COUNT(*)::int AS cnt FROM public.notification_reads WHERE notification_id = $1`,
      [notificationId],
    );
    return { readCount: result[0]?.cnt ?? 0 };
  }

  // ── SQL helpers ───────────────────────────────────────────────────────────

  /**
   * Complete "visible to this user right now" predicate over alias `n`:
   * enabled, published, not expired, targeted at the caller and not filtered
   * out by the channel check. Expects $1 = tenant id, $2 = user id,
   * $3 = tenant plan (nullable), $4 = tenant status (nullable).
   */
  private _visibleToUserSQL(): string {
    return `
      n.is_enabled = true
      AND (n.published_at IS NULL OR n.published_at <= NOW())
      AND (n.expires_at IS NULL OR n.expires_at > NOW())
      AND ${this._targetingSQL()}
      AND ${this._channelFilterSQL()}`;
  }

  /**
   * Targeting predicate over alias `n`, one OR-branch per target type.
   * Expects $1 = tenant id, $2 = user id, $3 = tenant plan, $4 = tenant status.
   * The BY_PLAN / BY_TENANT_STATUS branches never match when $3 / $4 is NULL.
   */
  private _targetingSQL(): string {
    return `(
      n.target_type = 'ALL'
      OR (n.target_type = 'SPECIFIC_TENANTS' AND EXISTS (
            SELECT 1 FROM public.notification_tenant_targets t
             WHERE t.notification_id = n.id AND t.tenant_id = $1))
      OR (n.target_type = 'SPECIFIC_USERS' AND EXISTS (
            SELECT 1 FROM public.notification_user_targets u
             WHERE u.notification_id = n.id AND u.user_id = $2))
      OR (n.target_type = 'BY_TENANT_TAG' AND n.target_tag_ids IS NOT NULL AND (
            CASE WHEN n.target_tag_logic = 'ALL'
              THEN (SELECT COUNT(*) FROM public.tenant_tag_assignments ta
                     WHERE ta.tenant_id = $1 AND ta.tag_id = ANY(n.target_tag_ids))
                   = array_length(n.target_tag_ids, 1)
              ELSE EXISTS (SELECT 1 FROM public.tenant_tag_assignments ta
                            WHERE ta.tenant_id = $1 AND ta.tag_id = ANY(n.target_tag_ids))
            END))
      OR (n.target_type = 'BY_PLAN' AND $3::text IS NOT NULL
          AND n.target_plans IS NOT NULL AND $3 = ANY(n.target_plans))
      OR (n.target_type = 'BY_TENANT_STATUS' AND $4::text IS NOT NULL
          AND n.target_statuses IS NOT NULL AND $4 = ANY(n.target_statuses))
      OR (n.target_type = 'BY_SEGMENT' AND EXISTS (
            SELECT 1 FROM public.notification_segment_members sm
             WHERE sm.segment_key = n.target_segment AND sm.tenant_id = $1))
    )`;
  }

  /**
   * Channel predicate over alias `n`: passes when the notification has no
   * channel, when its channel is flagged mandatory, or when the user ($2) has
   * no preference row disabling that channel.
   */
  private _channelFilterSQL(): string {
    return `(
      n.channel_key IS NULL
      OR n.channel_key IN (
           SELECT c.channel_key FROM public.notification_channels c WHERE c.is_mandatory = true)
      OR NOT EXISTS (
           SELECT 1 FROM public.user_notification_preferences pref
            WHERE pref.user_id = $2 AND pref.channel_key = n.channel_key AND pref.enabled = false)
    )`;
  }

  /** Inserts one tenant-target row per id; ids already present are skipped. */
  private async insertTenantTargets(
    db: SqlExecutor,
    notificationId: string,
    tenantIds: readonly string[],
  ): Promise<void> {
    for (const tenantId of tenantIds) {
      await db.query(
        `INSERT INTO public.notification_tenant_targets (notification_id, tenant_id)
         VALUES ($1, $2) ON CONFLICT DO NOTHING`,
        [notificationId, tenantId],
      );
    }
  }

  /** Inserts one user-target row per id; ids already present are skipped. */
  private async insertUserTargets(db: SqlExecutor, notificationId: string, userIds: readonly string[]): Promise<void> {
    for (const userId of userIds) {
      await db.query(
        `INSERT INTO public.notification_user_targets (notification_id, user_id)
         VALUES ($1, $2) ON CONFLICT DO NOTHING`,
        [notificationId, userId],
      );
    }
  }

  /** Groups target rows into `notification_id -> ids`, keyed by the stringified notification id. */
  private groupByNotification(rows: readonly DbRow[], idColumn: string): Map<string, string[]> {
    const grouped = new Map<string, string[]>();
    for (const row of rows) {
      const key = String(row.notification_id);
      const bucket = grouped.get(key);
      if (bucket) bucket.push(row[idColumn]);
      else grouped.set(key, [row[idColumn]]);
    }
    return grouped;
  }

  /**
   * Normalises a raw `query()` result to a plain row array. Accepts both a
   * bare row array and the `[rows, affectedCount]` pair that the driver may
   * return for UPDATE / DELETE statements.
   */
  private rowsOf(result: unknown): DbRow[] {
    if (!Array.isArray(result)) return [];
    return Array.isArray(result[0]) ? result[0] : result;
  }

  /**
   * Maps a snake_case `public.notifications` row to the camelCase domain
   * shape. `target_tag_logic` falls back to 'ANY' when NULL.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- raw driver rows are untyped
  mapRow(r: any): Notification {
    return {
      id: r.id,
      title: r.title,
      content: r.content,
      type: r.type,
      priority: r.priority,
      targetType: r.target_type,
      targetTagIds: r.target_tag_ids,
      targetTagLogic: r.target_tag_logic ?? 'ANY',
      targetPlans: r.target_plans,
      targetStatuses: r.target_statuses,
      targetSegment: r.target_segment,
      channelKey: r.channel_key,
      imageUrl: r.image_url,
      linkUrl: r.link_url,
      requiresForceRead: r.requires_force_read,
      isEnabled: r.is_enabled,
      publishedAt: r.published_at,
      expiresAt: r.expires_at,
      createdBy: r.created_by,
      createdAt: r.created_at,
      updatedAt: r.updated_at,
    } as Notification;
  }
}