import { v } from "convex/values";
import { internal } from "./_generated/api";
import type { Doc } from "./_generated/dataModel";
import { internalMutation } from "./_generated/server";
import { normalizeEmailAddress } from "../lib/lead-discovery-google";

/** Zeroed counter set used as the starting point for every enrichment run. */
const RUN_COUNTER_TEMPLATE = {
  leadsFound: 0,
  leadsCreated: 0,
  auditsCreated: 0,
  outreachPrepared: 0,
  errors: 0,
};

/** Run statuses that count as "already in flight" for a lead; checked in this order. */
const ACTIVE_RUN_STATUSES = ["pending", "running"] as const;

/** The subset of a lead document handed back to the enrichment action. */
type WebsiteLead = Pick<
  Doc<"leads">,
  "_id" | "websiteUrl" | "contactStatus"
>;
type LeadContactStatus = Doc<"leads">["contactStatus"];

/**
 * Queues a website-enrichment run for a lead and schedules its processing.
 *
 * If a pending or running website-enrichment run already exists for the lead,
 * its id is returned and no new run is created.
 *
 * @returns The id of the existing or newly created run, or `null` when the
 * lead does not exist or has no `websiteUrl`.
 */
export const queueLeadEnrichment = internalMutation({
  args: {
    leadId: v.id("leads"),
    parentRunId: v.optional(v.id("agentRuns")),
  },
  returns: v.union(v.id("agentRuns"), v.null()),
  handler: async (ctx, args) => {
    const now = Date.now();
    const lead = await ctx.db.get(args.leadId);
    if (!lead || !lead.websiteUrl) {
      return null;
    }

    // Reuse an in-flight run instead of queueing a duplicate; a pending run
    // takes precedence over a running one.
    for (const status of ACTIVE_RUN_STATUSES) {
      const [activeRun] = await ctx.db
        .query("agentRuns")
        .withIndex("by_type_and_status_and_leadId", (q) =>
          q
            .eq("type", "website_enrichment")
            .eq("status", status)
            .eq("leadId", args.leadId),
        )
        .take(1);
      if (activeRun) {
        return activeRun._id;
      }
    }

    const runId = await ctx.db.insert("agentRuns", {
      type: "website_enrichment",
      leadId: args.leadId,
      status: "pending",
      counters: { ...RUN_COUNTER_TEMPLATE },
      currentStep: "website_enrichment",
      createdAt: now,
      updatedAt: now,
    });

    // The parent run is only recorded in the event details, not on the run.
    await ctx.db.insert("agentRunEvents", {
      runId,
      level: "info",
      message: "Website-Enrichment wurde in die Warteschlange gesetzt.",
      details: [
        { label: "Lead", value: args.leadId },
        ...(args.parentRunId
          ? [{ label: "Parent-Run", value: args.parentRunId }]
          : []),
      ],
      createdAt: now,
    });

    await ctx.scheduler.runAfter(
      0,
      internal.websiteEnrichmentAction.processLeadEnrichment,
      { runId },
    );

    return runId;
  },
});

/**
 * Moves a pending website-enrichment run to "running" and returns the lead
 * data needed to process it.
 *
 * The run is marked "failed" (with an error event) when it has no lead id,
 * when the lead no longer exists, or when the lead has no `websiteUrl`. In the
 * last case the lead's `contactStatusReason` is updated as well.
 *
 * @returns `{ lead }` on success; `null` when the run is missing, is not a
 * pending website-enrichment run, or could not be started.
 */
export const startLeadEnrichmentRun = internalMutation({
  args: { runId: v.id("agentRuns") },
  handler: async (ctx, args): Promise<{ lead: WebsiteLead } | null> => {
    const now = Date.now();
    const run = await ctx.db.get(args.runId);
    if (!run || run.type !== "website_enrichment" || run.status !== "pending") {
      return null;
    }

    // Marks the run as failed and records a matching error event.
    const failRun = async (failure: {
      errorSummary: string;
      message: string;
      leadIdDetail: string;
    }): Promise<void> => {
      await ctx.db.patch(args.runId, {
        status: "failed",
        currentStep: "website_enrichment",
        errorSummary: failure.errorSummary,
        updatedAt: now,
        finishedAt: now,
      });
      await ctx.db.insert("agentRunEvents", {
        runId: args.runId,
        level: "error",
        message: failure.message,
        details: [{ label: "Lead-ID", value: failure.leadIdDetail }],
        createdAt: now,
      });
    };

    if (!run.leadId) {
      await failRun({
        errorSummary: "Der Lauf hat keine Lead-ID.",
        message:
          "Website-Enrichment konnte nicht gestartet werden: Keine Lead-ID.",
        leadIdDetail: run.leadId ?? "unbekannt",
      });
      return null;
    }

    const lead = await ctx.db.get(run.leadId);
    if (!lead) {
      await failRun({
        errorSummary: "Lead fehlt oder besitzt keine Website.",
        message:
          "Website-Enrichment konnte nicht gestartet werden: Kein Lead mit Website-URL.",
        leadIdDetail: run.leadId,
      });
      return null;
    }

    if (!lead.websiteUrl) {
      await failRun({
        errorSummary: "Lead fehlt oder besitzt keine Website.",
        message:
          "Website-Enrichment konnte nicht gestartet werden: Kein Lead mit Website-URL.",
        leadIdDetail: lead._id,
      });
      // Surface the reason on the lead itself, not only on the run.
      await ctx.db.patch(lead._id, {
        contactStatusReason: "Website-URL fehlt für das Website-Enrichment.",
        updatedAt: now,
      });
      return null;
    }

    await ctx.db.patch(args.runId, {
      status: "running",
      currentStep: "website_enrichment",
      startedAt: now,
      updatedAt: now,
    });
    await ctx.db.insert("agentRunEvents", {
      runId: args.runId,
      level: "info",
      message: "Website-Enrichment gestartet.",
      details: [{ label: "Lead", value: lead._id }],
      createdAt: now,
    });

    return {
      lead: {
        _id: lead._id,
        websiteUrl: lead.websiteUrl,
        contactStatus: lead.contactStatus,
      },
    };
  },
});

/**
 * Stores the artifacts collected by a website-enrichment crawl.
 *
 * Each page, link, email candidate, screenshot and technical check is inserted
 * into its own table, tagged with the lead id, the run id and one shared
 * `createdAt` timestamp. Does not modify the run or the lead.
 */
export const persistLeadEnrichmentResult = internalMutation({
  args: {
    runId: v.id("agentRuns"),
    leadId: v.id("leads"),
    pages: v.array(
      v.object({
        sourceUrl: v.string(),
        finalUrl: v.string(),
        pageKind: v.union(
          v.literal("homepage"),
          v.literal("contact"),
          v.literal("impressum"),
          v.literal("services"),
          v.literal("about"),
          v.literal("team"),
          v.literal("other"),
        ),
        title: v.optional(v.string()),
        metaDescription: v.optional(v.string()),
        headings: v.array(v.string()),
        visibleTextExcerpt: v.optional(v.string()),
        hasContactFormSignal: v.boolean(),
        hasContactCtaSignal: v.boolean(),
      }),
    ),
    links: v.array(
      v.object({
        pageUrl: v.string(),
        href: v.string(),
        text: v.optional(v.string()),
        isInternal: v.boolean(),
        isBroken: v.optional(v.boolean()),
      }),
    ),
    emailCandidates: v.array(
      v.object({
        email: v.string(),
        normalizedEmail: v.string(),
        emailSource: v.string(),
        sourceUrl: v.string(),
        contactPerson: v.optional(v.string()),
        isBusinessContactAddress: v.boolean(),
        isGeneric: v.boolean(),
        accepted: v.boolean(),
      }),
    ),
    screenshots: v.array(
      v.object({
        storageId: v.id("_storage"),
        viewport: v.union(v.literal("desktop"), v.literal("mobile")),
        sourceUrl: v.string(),
        capturedAt: v.number(),
        width: v.number(),
        height: v.number(),
        mimeType: v.string(),
      }),
    ),
    technicalChecks: v.array(
      v.object({
        sourceUrl: v.string(),
        finalUrl: v.optional(v.string()),
        usesHttps: v.boolean(),
        missingTitle: v.boolean(),
        missingMetaDescription: v.boolean(),
        hasVisibleContactPath: v.boolean(),
        brokenInternalLinkCount: v.number(),
      }),
    ),
  },
  handler: async (ctx, args) => {
    // Fields attached to every stored row, regardless of table.
    const provenance = {
      leadId: args.leadId,
      runId: args.runId,
      createdAt: Date.now(),
    };

    for (const page of args.pages) {
      await ctx.db.insert("websiteCrawlPages", { ...page, ...provenance });
    }
    for (const link of args.links) {
      await ctx.db.insert("websiteCrawlLinks", { ...link, ...provenance });
    }
    for (const candidate of args.emailCandidates) {
      await ctx.db.insert("websiteEmailCandidates", {
        ...candidate,
        ...provenance,
      });
    }
    for (const screenshot of args.screenshots) {
      await ctx.db.insert("websiteCrawlScreenshots", {
        ...screenshot,
        ...provenance,
      });
    }
    for (const checks of args.technicalChecks) {
      await ctx.db.insert("websiteTechnicalChecks", {
        ...checks,
        ...provenance,
      });
    }
  },
});

/**
 * Finalizes a website-enrichment run with a terminal status.
 *
 * Sets `finishedAt`/`updatedAt`, the current step (defaulting to
 * "website_enrichment"), the error summary, and replaces the counters with
 * `leadsFound: 1` plus the supplied error count (default 0).
 */
export const finishLeadEnrichmentRun = internalMutation({
  args: {
    runId: v.id("agentRuns"),
    status: v.union(
      v.literal("succeeded"),
      v.literal("failed"),
      v.literal("canceled"),
    ),
    currentStep: v.optional(v.string()),
    errorSummary: v.optional(v.string()),
    errors: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const now = Date.now();
    await ctx.db.patch(args.runId, {
      status: args.status,
      updatedAt: now,
      finishedAt: now,
      currentStep: args.currentStep ?? "website_enrichment",
      errorSummary: args.errorSummary,
      counters: {
        ...RUN_COUNTER_TEMPLATE,
        leadsFound: 1,
        errors: args.errors ?? 0,
      },
    });
  },
});

/**
 * Applies the outcome of a website enrichment to the lead document.
 *
 * - The email is stored only when both `email` and `emailSource` are given and
 *   `normalizeEmailAddress` yields a truthy value.
 * - `contactPerson` is stored when provided.
 * - When `contactStatusReason` is provided it is stored and the status is left
 *   untouched; otherwise a lead whose `currentContactStatus` is
 *   "missing_contact" is promoted to "new", but only if an email was actually
 *   stored by this call.
 *
 * @returns The lead id, or `null` when the lead does not exist.
 */
export const patchLeadFromWebsiteEnrichment = internalMutation({
  args: {
    leadId: v.id("leads"),
    email: v.optional(v.string()),
    emailSource: v.optional(v.string()),
    contactPerson: v.optional(v.string()),
    currentContactStatus: v.union(
      v.literal("new"),
      v.literal("missing_contact"),
      v.literal("audit_ready"),
      v.literal("outreach_ready"),
      v.literal("contacted"),
      v.literal("replied"),
      v.literal("do_not_contact"),
    ),
    contactStatusReason: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const lead = await ctx.db.get(args.leadId);
    if (!lead) {
      return null;
    }

    type LeadPatch = {
      email?: string;
      normalizedEmail?: string;
      emailSource?: string;
      contactPerson?: string;
      contactStatus?: LeadContactStatus;
      contactStatusReason?: string;
      updatedAt: number;
    };

    const patch: LeadPatch = { updatedAt: Date.now() };

    // Tracks whether a usable email really made it into the patch, so the
    // status promotion below cannot fire for an email that was discarded.
    let emailStored = false;
    if (args.email && args.emailSource) {
      const normalized = normalizeEmailAddress(args.email);
      if (normalized) {
        patch.email = normalized;
        patch.normalizedEmail = normalized;
        patch.emailSource = args.emailSource;
        emailStored = true;
      }
    }

    if (args.contactPerson) {
      patch.contactPerson = args.contactPerson;
    }

    if (args.contactStatusReason !== undefined) {
      patch.contactStatusReason = args.contactStatusReason;
    } else if (emailStored && args.currentContactStatus === "missing_contact") {
      patch.contactStatus = "new";
    }

    // `updatedAt` is always present; only write when something else changed.
    if (Object.keys(patch).length > 1) {
      await ctx.db.patch(args.leadId, patch);
    }

    return args.leadId;
  },
});