Files
webdev-pipeline/convex/blacklist.ts

473 lines
11 KiB
TypeScript

import { v } from "convex/values";
import {
normalizeDomain,
normalizeEmailAddress,
normalizePhone,
normalizeText,
} from "../lib/lead-discovery-google";
import { normalizeListLimit } from "./domain";
import { internal } from "./_generated/api";
import type { Doc } from "./_generated/dataModel";
import { internalMutation, mutation, query, type MutationCtx } from "./_generated/server";
/**
 * Convex validator listing every kind of value a blacklist entry can hold.
 * The literals here must stay in sync with the `BlacklistType` union below.
 */
const blacklistType = v.union(
  v.literal("domain"),
  v.literal("email"),
  v.literal("phone"),
  v.literal("company"),
  v.literal("google_place_id"),
);

/** Compile-time counterpart of the `blacklistType` validator. */
type BlacklistType = "domain" | "email" | "phone" | "company" | "google_place_id";
/** Number of leads requested per page by the scheduled batch mutations in this file. */
const BLACKLIST_APPLY_BATCH_SIZE = 100;

/** Leading sentence of every reason text generated by `buildBlacklistReason`. */
const BLACKLIST_REVIEW_NOTE_PREFIX = "Lead automatisch durch Sperrlisteneintrag blockiert.";
/** Payload handed to the scheduled backfill/apply jobs for a single blacklist entry. */
type BlacklistReason = {
  /** Kind of lead attribute the entry is matched against. */
  type: BlacklistType;
  /** Entry value after type-specific normalization. */
  normalizedValue: string;
  /** Text copied into the reason fields of every lead that gets blocked. */
  reason: string;
};

/** Fields written onto a lead when a blacklist entry blocks it. */
type LeadIdAndBlacklistPatch = Pick<
  Doc<"leads">,
  | "blacklistStatus"
  | "priority"
  | "contactStatus"
  | "blacklistReason"
  | "priorityReason"
  | "contactStatusReason"
> & {
  updatedAt: number;
};

/**
 * Patch produced by the matching-field backfill: any subset of the normalized
 * lookup fields, plus the mandatory `updatedAt` timestamp.
 */
type LeadMatchingFieldsPatch = Partial<
  Pick<
    Doc<"leads">,
    | "normalizedEmail"
    | "normalizedPhone"
    | "normalizedCompanyName"
    | "normalizedAddress"
    | "normalizedGooglePlaceId"
  >
> & {
  updatedAt: number;
};
/** The only lead field the apply job reads from a matched row. */
type LeadIdRow = Pick<Doc<"leads">, "_id">;

/**
 * Minimal structural view of an index-filtered `leads` query: it only has to
 * support ordering followed by cursor-based pagination that yields `_id` rows.
 */
type LeadMatchQuery = {
  order: (direction: "asc" | "desc") => {
    paginate: (args: { numItems: number; cursor: string | null }) => Promise<{
      page: LeadIdRow[];
      isDone: boolean;
      continueCursor: string | null;
    }>;
  };
};
/**
 * Builds the reason text that is stored on leads blocked by a blacklist entry.
 *
 * @param entry - Type, display value and optional note of the blacklist entry.
 *   The note is trimmed and only appended when something is left after trimming.
 * @returns The fixed prefix followed by `type: value.` and, if present, the note.
 */
function buildBlacklistReason(entry: { type: BlacklistType; value: string; note?: string }) {
  const base = `${BLACKLIST_REVIEW_NOTE_PREFIX} ${entry.type}: ${entry.value}.`;
  const note = entry.note?.trim();
  if (!note) {
    return base;
  }
  return `${base} ${note}`;
}
/**
 * Creates the patch that marks a lead as blocked by the blacklist.
 *
 * @param reason - Text stored in all three reason fields of the lead.
 * @returns A patch setting blacklist status, priority and contact status to
 *   their blocking values and stamping `updatedAt` with the current time.
 */
function buildReasonPatch(reason: string): LeadIdAndBlacklistPatch {
  return {
    blacklistStatus: "blocked",
    priority: "blocked",
    contactStatus: "do_not_contact",
    blacklistReason: reason,
    priorityReason: reason,
    contactStatusReason: reason,
    updatedAt: Date.now(),
  };
}
/**
 * Selects the `leads` index lookup that corresponds to a blacklist entry type.
 *
 * The query itself is not created here; a factory is returned so the caller
 * decides when the query is actually built.
 *
 * @param ctx - Mutation context whose database is queried.
 * @param type - Kind of blacklist entry, which determines the index to use.
 * @param normalizedValue - Value to look up with an equality match on that index.
 * @returns A factory producing the index-filtered query, or `null` when the
 *   value is empty or the type is not one of the known kinds.
 */
function getLeadMatchQuery(
  ctx: MutationCtx,
  type: BlacklistType,
  normalizedValue: string,
): (() => LeadMatchQuery) | null {
  // Nothing can match an empty value, so do not even offer a query.
  if (!normalizedValue) {
    return null;
  }

  if (type === "domain") {
    return () =>
      ctx.db
        .query("leads")
        .withIndex("by_websiteDomain", (q) => q.eq("websiteDomain", normalizedValue));
  }

  if (type === "email") {
    return () =>
      ctx.db
        .query("leads")
        .withIndex("by_normalizedEmail", (q) => q.eq("normalizedEmail", normalizedValue));
  }

  if (type === "phone") {
    return () =>
      ctx.db
        .query("leads")
        .withIndex("by_normalizedPhone", (q) => q.eq("normalizedPhone", normalizedValue));
  }

  if (type === "company") {
    return () =>
      ctx.db
        .query("leads")
        .withIndex("by_normalizedCompanyName", (q) =>
          q.eq("normalizedCompanyName", normalizedValue),
        );
  }

  if (type === "google_place_id") {
    return () =>
      ctx.db
        .query("leads")
        .withIndex("by_normalizedGooglePlaceId", (q) =>
          q.eq("normalizedGooglePlaceId", normalizedValue),
        );
  }

  // Unknown type at runtime: treat it like "no query available".
  return null;
}
/**
 * Computes which normalized matching fields are still missing on a lead.
 *
 * A field is only filled in when the lead has no value for it yet and the
 * corresponding normalizer produced a truthy result; existing values are
 * never overwritten.
 *
 * NOTE(review): the Google place id is normalized with `normalizeDomain`, the
 * same helper `normalizeBlacklistValue` uses for that type — confirm this is
 * the intended normalization for place ids.
 *
 * @param lead - Lead document to inspect.
 * @returns A patch with the missing fields plus `updatedAt`, or `null` when
 *   nothing needs to be written.
 */
function buildLeadMatchingFieldsPatch(lead: Doc<"leads">) {
  const patch: LeadMatchingFieldsPatch = { updatedAt: Date.now() };

  const candidates = {
    normalizedEmail: normalizeEmailAddress(lead.email),
    normalizedPhone: normalizePhone(lead.phone),
    normalizedCompanyName: normalizeText(lead.companyName),
    normalizedAddress: normalizeText(lead.address),
    normalizedGooglePlaceId: normalizeDomain(lead.googlePlaceId),
  };

  // Tracks whether anything besides `updatedAt` ended up in the patch.
  let hasMissingField = false;

  if (candidates.normalizedEmail && !lead.normalizedEmail) {
    patch.normalizedEmail = candidates.normalizedEmail;
    hasMissingField = true;
  }
  if (candidates.normalizedPhone && !lead.normalizedPhone) {
    patch.normalizedPhone = candidates.normalizedPhone;
    hasMissingField = true;
  }
  if (candidates.normalizedCompanyName && !lead.normalizedCompanyName) {
    patch.normalizedCompanyName = candidates.normalizedCompanyName;
    hasMissingField = true;
  }
  if (candidates.normalizedAddress && !lead.normalizedAddress) {
    patch.normalizedAddress = candidates.normalizedAddress;
    hasMissingField = true;
  }
  if (candidates.normalizedGooglePlaceId && !lead.normalizedGooglePlaceId) {
    patch.normalizedGooglePlaceId = candidates.normalizedGooglePlaceId;
    hasMissingField = true;
  }

  return hasMissingField ? patch : null;
}
/**
 * Queues the first page of the matching-field backfill for a blacklist entry.
 * That job reschedules itself page by page and finally schedules the job that
 * applies the blacklist to matching leads.
 *
 * @param ctx - Mutation context providing the scheduler.
 * @param reason - Entry type, normalized value and reason text to carry along.
 */
async function scheduleBackfillThenBlacklistApply(
  ctx: MutationCtx,
  reason: BlacklistReason,
) {
  // A null cursor makes the backfill start from the first page.
  const firstPageArgs = { ...reason, cursor: null };
  await ctx.scheduler.runAfter(
    0,
    internal.blacklist.backfillLeadMatchingFieldsForBlacklist,
    firstPageArgs,
  );
}
/**
 * Normalizes a raw blacklist value with the helper that belongs to its type.
 *
 * @param type - Kind of blacklist entry.
 * @param value - Raw user input; surrounding whitespace is removed first.
 * @returns The result of the type-specific normalizer, or `null` when the
 *   input is blank or the type is not one of the known kinds.
 */
function normalizeBlacklistValue(type: BlacklistType, value: string) {
  const candidate = value.trim();
  if (candidate === "") {
    return null;
  }

  if (type === "email") {
    return normalizeEmailAddress(candidate);
  }
  if (type === "phone") {
    return normalizePhone(candidate);
  }
  // Domains and Google place ids share the same normalizer.
  if (type === "domain" || type === "google_place_id") {
    return normalizeDomain(candidate);
  }
  if (type === "company") {
    return normalizeText(candidate);
  }
  return null;
}
/**
 * Creates a blacklist entry, or reuses the existing one with the same type
 * and normalized value, and schedules the jobs that block matching leads.
 *
 * Returns the id of the new or already existing entry. Throws when the value
 * cannot be normalized for the given type.
 */
export const create = mutation({
  args: {
    type: blacklistType,
    value: v.string(),
    note: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const type = args.type as BlacklistType;
    const normalizedValue = normalizeBlacklistValue(type, args.value);
    if (!normalizedValue) {
      throw new Error("Blacklist-Wert ist ungültig.");
    }

    const [duplicate] = await ctx.db
      .query("blacklistEntries")
      .withIndex("by_type_and_normalizedValue", (q) =>
        q.eq("type", type).eq("normalizedValue", normalizedValue),
      )
      .take(1);

    const trimmedValue = args.value.trim();

    // Reuse the stored entry when there is one; otherwise insert a fresh row.
    const entryId = duplicate
      ? duplicate._id
      : await ctx.db.insert("blacklistEntries", {
          type,
          value: trimmedValue,
          normalizedValue,
          note: args.note,
          createdAt: Date.now(),
        });

    // For a duplicate the reason is built from the stored entry, not from the
    // arguments of this call.
    const reason = buildBlacklistReason(
      duplicate
        ? { type, value: duplicate.value, note: duplicate.note }
        : { type, value: trimmedValue, note: args.note },
    );

    await scheduleBackfillThenBlacklistApply(ctx, { type, normalizedValue, reason });
    return entryId;
  },
});
/**
 * Updates the type, value and/or note of a blacklist entry and schedules the
 * jobs that block leads matching the resulting entry.
 *
 * Behavior:
 * - Throws "Blacklist-Eintrag nicht gefunden." when the entry does not exist.
 * - Throws "Blacklist-Wert ist ungültig." when the value that would be stored
 *   (the new value if given, otherwise the current one) cannot be normalized
 *   for the resulting type.
 * - When the type or value changes and another entry already has the same
 *   type and normalized value, nothing is modified and that entry's id is
 *   returned.
 * - Otherwise the entry is patched and its own id is returned.
 */
export const update = mutation({
  args: {
    id: v.id("blacklistEntries"),
    type: v.optional(blacklistType),
    value: v.optional(v.string()),
    note: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const current = await ctx.db.get(args.id);
    if (!current) {
      throw new Error("Blacklist-Eintrag nicht gefunden.");
    }

    const nextType = (args.type ?? current.type) as BlacklistType;
    const typeChanged = nextType !== current.type;

    // Validate the value that will actually be stored. The current value is
    // only relevant when the caller did not supply a replacement; otherwise a
    // combined type + value change would be rejected because of the old value.
    const nextValue = args.value !== undefined ? args.value.trim() : current.value;
    const nextNormalizedValue = normalizeBlacklistValue(nextType, nextValue);
    if (!nextNormalizedValue) {
      throw new Error("Blacklist-Wert ist ungültig.");
    }

    const patch: {
      type: BlacklistType;
      value?: string;
      normalizedValue?: string;
      note?: string;
    } = {
      type: nextType,
    };

    // The (type, normalizedValue) pair is the lookup key of an entry. Whenever
    // either part may change, check for a colliding entry and keep the stored
    // normalized value in sync with the (possibly new) type.
    if (args.value !== undefined || typeChanged) {
      const existing = await ctx.db
        .query("blacklistEntries")
        .withIndex("by_type_and_normalizedValue", (q) =>
          q.eq("type", nextType).eq("normalizedValue", nextNormalizedValue),
        )
        .take(1);
      if (existing[0] && existing[0]._id !== args.id) {
        return existing[0]._id;
      }
      if (args.value !== undefined) {
        patch.value = nextValue;
      }
      patch.normalizedValue = nextNormalizedValue;
    }

    if (args.note !== undefined) {
      patch.note = args.note;
    }

    await ctx.db.patch(args.id, patch);
    await scheduleBackfillThenBlacklistApply(ctx, {
      type: nextType,
      normalizedValue: nextNormalizedValue,
      reason: buildBlacklistReason({
        type: nextType,
        value: nextValue,
        note: args.note ?? current.note,
      }),
    });
    return args.id;
  },
});
/**
 * Scheduled job: walks the whole `leads` table one page at a time and fills
 * in missing normalized matching fields. `type`, `normalizedValue` and
 * `reason` are not used for the backfill itself; they are only passed along.
 *
 * While pages remain, the job reschedules itself with the next cursor. After
 * the last page it schedules `applyBlacklistToMatchingLeadsBatch` starting
 * from a null cursor.
 */
export const backfillLeadMatchingFieldsForBlacklist = internalMutation({
  args: {
    type: blacklistType,
    normalizedValue: v.string(),
    reason: v.string(),
    cursor: v.union(v.string(), v.null()),
  },
  handler: async (ctx, args) => {
    const { type, normalizedValue, reason } = args;

    const batch = await ctx.db.query("leads").order("asc").paginate({
      numItems: BLACKLIST_APPLY_BATCH_SIZE,
      cursor: args.cursor,
    });

    for (const lead of batch.page) {
      const missingFields = buildLeadMatchingFieldsPatch(lead);
      if (missingFields) {
        await ctx.db.patch(lead._id, missingFields);
      }
    }

    if (batch.isDone) {
      // Backfill finished: hand over to the job that blocks matching leads.
      await ctx.scheduler.runAfter(
        0,
        internal.blacklist.applyBlacklistToMatchingLeadsBatch,
        { type, normalizedValue, reason, cursor: null },
      );
    } else {
      // More leads to go: continue with the next page in a fresh invocation.
      await ctx.scheduler.runAfter(
        0,
        internal.blacklist.backfillLeadMatchingFieldsForBlacklist,
        { type, normalizedValue, reason, cursor: batch.continueCursor },
      );
    }
    return null;
  },
});
/**
 * Scheduled job: marks one page of leads matching a blacklist entry as
 * blocked, then reschedules itself with the next cursor until no page is left.
 *
 * Does nothing when no match query exists for the given type and value.
 */
export const applyBlacklistToMatchingLeadsBatch = internalMutation({
  args: {
    type: blacklistType,
    normalizedValue: v.string(),
    reason: v.string(),
    cursor: v.union(v.string(), v.null()),
  },
  handler: async (ctx, args) => {
    const buildQuery = getLeadMatchQuery(
      ctx,
      args.type as BlacklistType,
      args.normalizedValue,
    );
    if (buildQuery === null) {
      return null;
    }

    const { page: matches, isDone, continueCursor } = await buildQuery()
      .order("asc")
      .paginate({
        numItems: BLACKLIST_APPLY_BATCH_SIZE,
        cursor: args.cursor,
      });

    // One patch object per batch, so all leads of a page share the same timestamp.
    const blockedPatch = buildReasonPatch(args.reason);
    for (const { _id } of matches) {
      await ctx.db.patch(_id, blockedPatch);
    }

    if (isDone) {
      return null;
    }

    await ctx.scheduler.runAfter(
      0,
      internal.blacklist.applyBlacklistToMatchingLeadsBatch,
      {
        type: args.type,
        normalizedValue: args.normalizedValue,
        reason: args.reason,
        cursor: continueCursor,
      },
    );
    return null;
  },
});
/**
 * Deletes a blacklist entry and returns its id.
 *
 * Only the entry itself is removed; this mutation does not modify any leads.
 */
export const remove = mutation({
  args: { id: v.id("blacklistEntries") },
  handler: async (ctx, { id }) => {
    await ctx.db.delete(id);
    return id;
  },
});
/**
 * Lists blacklist entries, capped at the limit derived by `normalizeListLimit`.
 *
 * Without a type filter the table is read in descending order. With a type
 * filter the entries are read through the `by_type_and_normalizedValue` index
 * without an explicit order.
 */
export const list = query({
  args: {
    type: v.optional(blacklistType),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const limit = normalizeListLimit(args.limit);
    const type = args.type;

    if (!type) {
      return await ctx.db.query("blacklistEntries").order("desc").take(limit);
    }

    return await ctx.db
      .query("blacklistEntries")
      .withIndex("by_type_and_normalizedValue", (q) => q.eq("type", type))
      .take(limit);
  },
});