import { v } from "convex/values";
import {
  normalizeDomain,
  normalizeEmailAddress,
  normalizePhone,
  normalizeText,
} from "../lib/lead-discovery-google";
import { normalizeListLimit } from "./domain";
import { internal } from "./_generated/api";
import type { Doc } from "./_generated/dataModel";
import { internalMutation, mutation, query, type MutationCtx } from "./_generated/server";

/** Convex validator for the kinds of values a blacklist entry can hold. */
const blacklistType = v.union(
  v.literal("domain"),
  v.literal("email"),
  v.literal("phone"),
  v.literal("company"),
  v.literal("google_place_id"),
);

/** Static counterpart of the `blacklistType` validator. */
type BlacklistType =
  | "domain"
  | "email"
  | "phone"
  | "company"
  | "google_place_id";

/** Page size used by the batched lead mutations in this module. */
const BLACKLIST_APPLY_BATCH_SIZE = 100;

/** Fixed prefix of every reason text written onto a blocked lead (user-facing, German). */
const BLACKLIST_REVIEW_NOTE_PREFIX =
  "Lead automatisch durch Sperrlisteneintrag blockiert.";

/** Arguments carried through the scheduled backfill/apply jobs. */
type BlacklistReason = {
  type: BlacklistType;
  normalizedValue: string;
  reason: string;
};

/** Fields written onto a lead when it is blocked by a blacklist entry. */
type LeadIdAndBlacklistPatch = Pick<
  Doc<"leads">,
  | "blacklistStatus"
  | "priority"
  | "contactStatus"
  | "blacklistReason"
  | "priorityReason"
  | "contactStatusReason"
> & {
  updatedAt: number;
};

/** Normalized matching fields that may be backfilled on a lead. */
type LeadMatchingFieldsPatch = Partial<
  Pick<
    Doc<"leads">,
    | "normalizedEmail"
    | "normalizedPhone"
    | "normalizedCompanyName"
    | "normalizedAddress"
    | "normalizedGooglePlaceId"
  >
> & {
  updatedAt: number;
};

/** Minimal lead shape needed by the apply job: only the document id. */
type LeadIdRow = Pick<Doc<"leads">, "_id">;

/** Structural view of an index query that can be ordered and paginated. */
type LeadMatchQuery = {
  order: (direction: "asc" | "desc") => {
    paginate: (args: {
      numItems: number;
      cursor: string | null;
    }) => Promise<{
      page: LeadIdRow[];
      isDone: boolean;
      continueCursor: string | null;
    }>;
  };
};

/**
 * Builds the reason text stored on leads blocked by a blacklist entry.
 *
 * The text is the fixed prefix followed by `<type>: <value>.`; when the entry
 * has a non-blank note, the trimmed note is appended after a single space.
 */
function buildBlacklistReason(entry: {
  type: BlacklistType;
  value: string;
  note?: string;
}): string {
  const base = `${BLACKLIST_REVIEW_NOTE_PREFIX} ${entry.type}: ${entry.value}.`;
  const normalizedNote = entry.note?.trim();
  return normalizedNote ? `${base} ${normalizedNote}` : base;
}

/**
 * Builds the patch that marks a lead as blocked, using the same reason text
 * for the blacklist, priority and contact-status reason fields.
 */
function buildReasonPatch(reason: string): LeadIdAndBlacklistPatch {
  return {
    blacklistStatus: "blocked",
    priority: "blocked",
    contactStatus: "do_not_contact",
    blacklistReason: reason,
    priorityReason: reason,
    contactStatusReason: reason,
    updatedAt: Date.now(),
  };
}

/**
 * Returns a factory for the index query that finds leads matching a blacklist
 * value, or `null` when the value is empty or the type is not handled.
 *
 * A factory is returned (rather than the query itself) so the query is only
 * constructed when the caller invokes it.
 */
function getLeadMatchQuery(
  ctx: MutationCtx,
  type: BlacklistType,
  normalizedValue: string,
): (() => LeadMatchQuery) | null {
  if (!normalizedValue) {
    return null;
  }

  switch (type) {
    case "domain":
      return () =>
        ctx.db
          .query("leads")
          .withIndex("by_websiteDomain", (q) =>
            q.eq("websiteDomain", normalizedValue),
          );
    case "email":
      return () =>
        ctx.db
          .query("leads")
          .withIndex("by_normalizedEmail", (q) =>
            q.eq("normalizedEmail", normalizedValue),
          );
    case "phone":
      return () =>
        ctx.db
          .query("leads")
          .withIndex("by_normalizedPhone", (q) =>
            q.eq("normalizedPhone", normalizedValue),
          );
    case "company":
      return () =>
        ctx.db
          .query("leads")
          .withIndex("by_normalizedCompanyName", (q) =>
            q.eq("normalizedCompanyName", normalizedValue),
          );
    case "google_place_id":
      return () =>
        ctx.db
          .query("leads")
          .withIndex("by_normalizedGooglePlaceId", (q) =>
            q.eq("normalizedGooglePlaceId", normalizedValue),
          );
    default:
      return null;
  }
}

/**
 * Computes a patch filling in normalized matching fields that are still empty
 * on a lead. Already-populated fields are never overwritten.
 *
 * @returns The patch (always including `updatedAt`), or `null` when there is
 * nothing to fill in.
 */
function buildLeadMatchingFieldsPatch(
  lead: Doc<"leads">,
): LeadMatchingFieldsPatch | null {
  const patch: LeadMatchingFieldsPatch = {
    updatedAt: Date.now(),
  };

  const normalizedEmail = normalizeEmailAddress(lead.email);
  const normalizedPhone = normalizePhone(lead.phone);
  const normalizedCompanyName = normalizeText(lead.companyName);
  const normalizedAddress = normalizeText(lead.address);
  // NOTE(review): place ids go through normalizeDomain, mirroring
  // normalizeBlacklistValue so both sides compare equal — confirm that helper
  // is appropriate for place ids.
  const normalizedGooglePlaceId = normalizeDomain(lead.googlePlaceId);

  if (!lead.normalizedEmail && normalizedEmail) {
    patch.normalizedEmail = normalizedEmail;
  }
  if (!lead.normalizedPhone && normalizedPhone) {
    patch.normalizedPhone = normalizedPhone;
  }
  if (!lead.normalizedCompanyName && normalizedCompanyName) {
    patch.normalizedCompanyName = normalizedCompanyName;
  }
  if (!lead.normalizedAddress && normalizedAddress) {
    patch.normalizedAddress = normalizedAddress;
  }
  if (!lead.normalizedGooglePlaceId && normalizedGooglePlaceId) {
    patch.normalizedGooglePlaceId = normalizedGooglePlaceId;
  }

  // `updatedAt` is always present, so more than one key means real changes.
  return Object.keys(patch).length > 1 ? patch : null;
}

/**
 * Schedules the backfill job (starting from the first page) with the given
 * blacklist arguments; the backfill job in turn schedules the apply job.
 */
async function scheduleBackfillThenBlacklistApply(
  ctx: MutationCtx,
  reason: BlacklistReason,
): Promise<void> {
  await ctx.scheduler.runAfter(
    0,
    internal.blacklist.backfillLeadMatchingFieldsForBlacklist,
    {
      ...reason,
      cursor: null,
    },
  );
}

/**
 * Normalizes a raw blacklist value with the helper matching its type.
 *
 * @returns The normalizer's result, or `null` when the trimmed value is empty
 * or the type is not handled.
 */
function normalizeBlacklistValue(type: BlacklistType, value: string) {
  const trimmed = value.trim();
  if (!trimmed) {
    return null;
  }

  switch (type) {
    case "email":
      return normalizeEmailAddress(trimmed);
    case "phone":
      return normalizePhone(trimmed);
    case "domain":
    case "google_place_id":
      return normalizeDomain(trimmed);
    case "company":
      return normalizeText(trimmed);
    default:
      return null;
  }
}

/**
 * Creates a blacklist entry and schedules blocking of matching leads.
 *
 * If an entry with the same type and normalized value already exists, no new
 * entry is inserted; the jobs are re-scheduled using the existing entry's
 * value and note, and the existing id is returned.
 *
 * @throws Error when the value normalizes to an empty/falsy result.
 */
export const create = mutation({
  args: {
    type: blacklistType,
    value: v.string(),
    note: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const type: BlacklistType = args.type;
    const trimmedValue = args.value.trim();
    const normalizedValue = normalizeBlacklistValue(type, trimmedValue);

    if (!normalizedValue) {
      throw new Error("Blacklist-Wert ist ungültig.");
    }

    const [existing] = await ctx.db
      .query("blacklistEntries")
      .withIndex("by_type_and_normalizedValue", (q) =>
        q.eq("type", type).eq("normalizedValue", normalizedValue),
      )
      .take(1);

    if (existing) {
      await scheduleBackfillThenBlacklistApply(ctx, {
        type,
        normalizedValue,
        reason: buildBlacklistReason({
          type,
          value: existing.value,
          note: existing.note,
        }),
      });
      return existing._id;
    }

    const created = await ctx.db.insert("blacklistEntries", {
      type,
      value: trimmedValue,
      normalizedValue,
      note: args.note,
      createdAt: Date.now(),
    });

    await scheduleBackfillThenBlacklistApply(ctx, {
      type,
      normalizedValue,
      reason: buildBlacklistReason({
        type,
        value: trimmedValue,
        note: args.note,
      }),
    });

    return created;
  },
});
/**
 * Updates a blacklist entry's type, value and/or note, then schedules
 * blocking of leads matching the resulting entry.
 *
 * The effective value (the supplied one, otherwise the stored one) is
 * normalized under the effective type, and the result is always persisted so
 * the stored `normalizedValue` stays in sync with the type — including when
 * only the type changes.
 *
 * If the type or value is being changed and a different entry already uses
 * the resulting type/normalized value, nothing is written and that entry's id
 * is returned instead.
 *
 * @throws Error when the entry does not exist or the effective value
 * normalizes to an empty/falsy result.
 */
export const update = mutation({
  args: {
    id: v.id("blacklistEntries"),
    type: v.optional(blacklistType),
    value: v.optional(v.string()),
    note: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const current = await ctx.db.get(args.id);
    if (!current) {
      throw new Error("Blacklist-Eintrag nicht gefunden.");
    }

    const nextType = (args.type ?? current.type) as BlacklistType;
    const nextValue =
      args.value !== undefined ? args.value.trim() : current.value;

    // Validate only the value that will actually be stored; the old value is
    // irrelevant when a replacement is supplied.
    const nextNormalizedValue = normalizeBlacklistValue(nextType, nextValue);
    if (!nextNormalizedValue) {
      throw new Error("Blacklist-Wert ist ungültig.");
    }

    // The lookup key can change through a new value or a new type, so the
    // duplicate check has to run in both cases.
    if (args.value !== undefined || nextType !== current.type) {
      const [existing] = await ctx.db
        .query("blacklistEntries")
        .withIndex("by_type_and_normalizedValue", (q) =>
          q.eq("type", nextType).eq("normalizedValue", nextNormalizedValue),
        )
        .take(1);

      if (existing && existing._id !== args.id) {
        return existing._id;
      }
    }

    const patch: {
      type: BlacklistType;
      value?: string;
      normalizedValue?: string;
      note?: string;
    } = {
      type: nextType,
      normalizedValue: nextNormalizedValue,
    };

    if (args.value !== undefined) {
      patch.value = nextValue;
    }
    if (args.note !== undefined) {
      patch.note = args.note;
    }

    await ctx.db.patch(args.id, patch);

    await scheduleBackfillThenBlacklistApply(ctx, {
      type: nextType,
      normalizedValue: nextNormalizedValue,
      reason: buildBlacklistReason({
        type: nextType,
        value: nextValue,
        note: args.note ?? current.note,
      }),
    });

    return args.id;
  },
});

/**
 * Scheduled job: walks the `leads` table one page at a time and fills in
 * missing normalized matching fields. It re-schedules itself with the next
 * cursor until the table is exhausted, then schedules the apply job with the
 * same blacklist arguments.
 */
export const backfillLeadMatchingFieldsForBlacklist = internalMutation({
  args: {
    type: blacklistType,
    normalizedValue: v.string(),
    reason: v.string(),
    cursor: v.union(v.string(), v.null()),
  },
  handler: async (ctx, args) => {
    const { type, normalizedValue, reason } = args;

    const result = await ctx.db.query("leads").order("asc").paginate({
      numItems: BLACKLIST_APPLY_BATCH_SIZE,
      cursor: args.cursor,
    });

    for (const lead of result.page) {
      const fieldsPatch = buildLeadMatchingFieldsPatch(lead);
      if (fieldsPatch) {
        await ctx.db.patch(lead._id, fieldsPatch);
      }
    }

    if (!result.isDone) {
      // More leads remain: continue the backfill from the next cursor.
      await ctx.scheduler.runAfter(
        0,
        internal.blacklist.backfillLeadMatchingFieldsForBlacklist,
        { type, normalizedValue, reason, cursor: result.continueCursor },
      );
      return null;
    }

    // Backfill finished: start applying the blacklist from the first page.
    await ctx.scheduler.runAfter(
      0,
      internal.blacklist.applyBlacklistToMatchingLeadsBatch,
      { type, normalizedValue, reason, cursor: null },
    );
    return null;
  },
});

/**
 * Scheduled job: marks one page of leads matching the blacklist value as
 * blocked and re-schedules itself with the next cursor until done. Does
 * nothing when no match query exists for the given type/value.
 */
export const applyBlacklistToMatchingLeadsBatch = internalMutation({
  args: {
    type: blacklistType,
    normalizedValue: v.string(),
    reason: v.string(),
    cursor: v.union(v.string(), v.null()),
  },
  handler: async (ctx, args) => {
    const { type, normalizedValue, reason } = args;

    const buildQuery = getLeadMatchQuery(ctx, type, normalizedValue);
    if (!buildQuery) {
      return null;
    }

    const result = await buildQuery().order("asc").paginate({
      numItems: BLACKLIST_APPLY_BATCH_SIZE,
      cursor: args.cursor,
    });

    // One patch object is shared by every lead in this page.
    const blockPatch = buildReasonPatch(reason);
    for (const lead of result.page) {
      await ctx.db.patch(lead._id, blockPatch);
    }

    if (!result.isDone) {
      await ctx.scheduler.runAfter(
        0,
        internal.blacklist.applyBlacklistToMatchingLeadsBatch,
        { type, normalizedValue, reason, cursor: result.continueCursor },
      );
    }

    return null;
  },
});

/**
 * Deletes a blacklist entry and returns its id. Only the entry itself is
 * removed; this handler does not modify any leads.
 */
export const remove = mutation({
  args: { id: v.id("blacklistEntries") },
  handler: async (ctx, args) => {
    await ctx.db.delete(args.id);
    return args.id;
  },
});

/**
 * Lists blacklist entries, capped at the limit returned by
 * `normalizeListLimit`.
 *
 * With `type`, entries are read through the `by_type_and_normalizedValue`
 * index with no explicit ordering; without it, all entries are read with
 * `order("desc")`.
 */
export const list = query({
  args: {
    type: v.optional(blacklistType),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const limit = normalizeListLimit(args.limit);

    if (args.type) {
      const type = args.type;
      return await ctx.db
        .query("blacklistEntries")
        .withIndex("by_type_and_normalizedValue", (q) => q.eq("type", type))
        .take(limit);
    }

    return await ctx.db.query("blacklistEntries").order("desc").take(limit);
  },
});