feat: add website enrichment crawler
This commit is contained in:
408
convex/websiteEnrichment.ts
Normal file
408
convex/websiteEnrichment.ts
Normal file
@@ -0,0 +1,408 @@
|
||||
import { v } from "convex/values";
|
||||
import { internal } from "./_generated/api";
|
||||
import type { Doc } from "./_generated/dataModel";
|
||||
import { internalMutation } from "./_generated/server";
|
||||
import { normalizeEmailAddress } from "../lib/lead-discovery-google";
|
||||
|
||||
const RUN_COUNTER_TEMPLATE = {
|
||||
leadsFound: 0,
|
||||
leadsCreated: 0,
|
||||
auditsCreated: 0,
|
||||
outreachPrepared: 0,
|
||||
errors: 0,
|
||||
};
|
||||
|
||||
type WebsiteLead = Pick<Doc<"leads">, "_id" | "websiteUrl" | "contactStatus">;
|
||||
type LeadContactStatus = Doc<"leads">["contactStatus"];
|
||||
|
||||
export const queueLeadEnrichment = internalMutation({
|
||||
args: {
|
||||
leadId: v.id("leads"),
|
||||
parentRunId: v.optional(v.id("agentRuns")),
|
||||
},
|
||||
returns: v.union(v.id("agentRuns"), v.null()),
|
||||
handler: async (ctx, args) => {
|
||||
const now = Date.now();
|
||||
const lead = await ctx.db.get(args.leadId);
|
||||
|
||||
if (!lead || !lead.websiteUrl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const activePending = await ctx.db
|
||||
.query("agentRuns")
|
||||
.withIndex("by_type_and_status_and_leadId", (q) =>
|
||||
q
|
||||
.eq("type", "website_enrichment")
|
||||
.eq("status", "pending")
|
||||
.eq("leadId", args.leadId),
|
||||
)
|
||||
.take(1);
|
||||
|
||||
const activeRunning = await ctx.db
|
||||
.query("agentRuns")
|
||||
.withIndex("by_type_and_status_and_leadId", (q) =>
|
||||
q
|
||||
.eq("type", "website_enrichment")
|
||||
.eq("status", "running")
|
||||
.eq("leadId", args.leadId),
|
||||
)
|
||||
.take(1);
|
||||
|
||||
if (activePending.length > 0) {
|
||||
return activePending[0]._id;
|
||||
}
|
||||
if (activeRunning.length > 0) {
|
||||
return activeRunning[0]._id;
|
||||
}
|
||||
|
||||
const runId = await ctx.db.insert("agentRuns", {
|
||||
type: "website_enrichment",
|
||||
leadId: args.leadId,
|
||||
status: "pending",
|
||||
counters: RUN_COUNTER_TEMPLATE,
|
||||
currentStep: "website_enrichment",
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
|
||||
await ctx.db.insert("agentRunEvents", {
|
||||
runId,
|
||||
level: "info",
|
||||
message: "Website-Enrichment wurde in die Warteschlange gesetzt.",
|
||||
details: [
|
||||
{ label: "Lead", value: args.leadId },
|
||||
...(args.parentRunId
|
||||
? [{ label: "Parent-Run", value: args.parentRunId }]
|
||||
: []),
|
||||
],
|
||||
createdAt: now,
|
||||
});
|
||||
|
||||
await ctx.scheduler.runAfter(
|
||||
0,
|
||||
internal.websiteEnrichmentAction.processLeadEnrichment,
|
||||
{
|
||||
runId,
|
||||
},
|
||||
);
|
||||
|
||||
return runId;
|
||||
},
|
||||
});
|
||||
|
||||
export const startLeadEnrichmentRun = internalMutation({
|
||||
args: { runId: v.id("agentRuns") },
|
||||
handler: async (ctx, args): Promise<
|
||||
{ lead: WebsiteLead } | null
|
||||
> => {
|
||||
const now = Date.now();
|
||||
const run = await ctx.db.get(args.runId);
|
||||
|
||||
if (!run || run.type !== "website_enrichment" || run.status !== "pending") {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!run.leadId) {
|
||||
await ctx.db.patch(args.runId, {
|
||||
status: "failed",
|
||||
currentStep: "website_enrichment",
|
||||
errorSummary: "Der Lauf hat keine Lead-ID.",
|
||||
updatedAt: now,
|
||||
finishedAt: now,
|
||||
});
|
||||
await ctx.db.insert("agentRunEvents", {
|
||||
runId: args.runId,
|
||||
level: "error",
|
||||
message:
|
||||
"Website-Enrichment konnte nicht gestartet werden: Keine Lead-ID.",
|
||||
details: [{ label: "Lead-ID", value: run.leadId ?? "unbekannt" }],
|
||||
createdAt: now,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
const lead = await ctx.db.get(run.leadId);
|
||||
|
||||
if (!lead) {
|
||||
await ctx.db.patch(args.runId, {
|
||||
status: "failed",
|
||||
currentStep: "website_enrichment",
|
||||
errorSummary: "Lead fehlt oder besitzt keine Website.",
|
||||
updatedAt: now,
|
||||
finishedAt: now,
|
||||
});
|
||||
await ctx.db.insert("agentRunEvents", {
|
||||
runId: args.runId,
|
||||
level: "error",
|
||||
message:
|
||||
"Website-Enrichment konnte nicht gestartet werden: Kein Lead mit Website-URL.",
|
||||
details: [{ label: "Lead-ID", value: run.leadId }],
|
||||
createdAt: now,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!lead.websiteUrl) {
|
||||
await ctx.db.patch(args.runId, {
|
||||
status: "failed",
|
||||
currentStep: "website_enrichment",
|
||||
errorSummary: "Lead fehlt oder besitzt keine Website.",
|
||||
updatedAt: now,
|
||||
finishedAt: now,
|
||||
});
|
||||
await ctx.db.insert("agentRunEvents", {
|
||||
runId: args.runId,
|
||||
level: "error",
|
||||
message:
|
||||
"Website-Enrichment konnte nicht gestartet werden: Kein Lead mit Website-URL.",
|
||||
details: [{ label: "Lead-ID", value: lead._id }],
|
||||
createdAt: now,
|
||||
});
|
||||
await ctx.db.patch(lead._id, {
|
||||
contactStatusReason:
|
||||
"Website-URL fehlt für das Website-Enrichment.",
|
||||
updatedAt: now,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
await ctx.db.patch(args.runId, {
|
||||
status: "running",
|
||||
currentStep: "website_enrichment",
|
||||
startedAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
|
||||
await ctx.db.insert("agentRunEvents", {
|
||||
runId: args.runId,
|
||||
level: "info",
|
||||
message: "Website-Enrichment gestartet.",
|
||||
details: [{ label: "Lead", value: lead._id }],
|
||||
createdAt: now,
|
||||
});
|
||||
|
||||
return {
|
||||
lead: {
|
||||
_id: lead._id,
|
||||
websiteUrl: lead.websiteUrl,
|
||||
contactStatus: lead.contactStatus,
|
||||
},
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
export const persistLeadEnrichmentResult = internalMutation({
|
||||
args: {
|
||||
runId: v.id("agentRuns"),
|
||||
leadId: v.id("leads"),
|
||||
pages: v.array(
|
||||
v.object({
|
||||
sourceUrl: v.string(),
|
||||
finalUrl: v.string(),
|
||||
pageKind: v.union(
|
||||
v.literal("homepage"),
|
||||
v.literal("contact"),
|
||||
v.literal("impressum"),
|
||||
v.literal("services"),
|
||||
v.literal("about"),
|
||||
v.literal("team"),
|
||||
v.literal("other"),
|
||||
),
|
||||
title: v.optional(v.string()),
|
||||
metaDescription: v.optional(v.string()),
|
||||
headings: v.array(v.string()),
|
||||
visibleTextExcerpt: v.optional(v.string()),
|
||||
hasContactFormSignal: v.boolean(),
|
||||
hasContactCtaSignal: v.boolean(),
|
||||
}),
|
||||
),
|
||||
links: v.array(
|
||||
v.object({
|
||||
pageUrl: v.string(),
|
||||
href: v.string(),
|
||||
text: v.optional(v.string()),
|
||||
isInternal: v.boolean(),
|
||||
isBroken: v.optional(v.boolean()),
|
||||
}),
|
||||
),
|
||||
emailCandidates: v.array(
|
||||
v.object({
|
||||
email: v.string(),
|
||||
normalizedEmail: v.string(),
|
||||
emailSource: v.string(),
|
||||
sourceUrl: v.string(),
|
||||
contactPerson: v.optional(v.string()),
|
||||
isBusinessContactAddress: v.boolean(),
|
||||
isGeneric: v.boolean(),
|
||||
accepted: v.boolean(),
|
||||
}),
|
||||
),
|
||||
screenshots: v.array(
|
||||
v.object({
|
||||
storageId: v.id("_storage"),
|
||||
viewport: v.union(v.literal("desktop"), v.literal("mobile")),
|
||||
sourceUrl: v.string(),
|
||||
capturedAt: v.number(),
|
||||
width: v.number(),
|
||||
height: v.number(),
|
||||
mimeType: v.string(),
|
||||
}),
|
||||
),
|
||||
technicalChecks: v.array(
|
||||
v.object({
|
||||
sourceUrl: v.string(),
|
||||
finalUrl: v.optional(v.string()),
|
||||
usesHttps: v.boolean(),
|
||||
missingTitle: v.boolean(),
|
||||
missingMetaDescription: v.boolean(),
|
||||
hasVisibleContactPath: v.boolean(),
|
||||
brokenInternalLinkCount: v.number(),
|
||||
}),
|
||||
),
|
||||
},
|
||||
handler: async (ctx, args) => {
|
||||
const createdAt = Date.now();
|
||||
|
||||
for (const page of args.pages) {
|
||||
await ctx.db.insert("websiteCrawlPages", {
|
||||
...page,
|
||||
leadId: args.leadId,
|
||||
runId: args.runId,
|
||||
createdAt,
|
||||
});
|
||||
}
|
||||
|
||||
for (const link of args.links) {
|
||||
await ctx.db.insert("websiteCrawlLinks", {
|
||||
...link,
|
||||
leadId: args.leadId,
|
||||
runId: args.runId,
|
||||
createdAt,
|
||||
});
|
||||
}
|
||||
|
||||
for (const candidate of args.emailCandidates) {
|
||||
await ctx.db.insert("websiteEmailCandidates", {
|
||||
...candidate,
|
||||
leadId: args.leadId,
|
||||
runId: args.runId,
|
||||
createdAt,
|
||||
});
|
||||
}
|
||||
|
||||
for (const screenshot of args.screenshots) {
|
||||
await ctx.db.insert("websiteCrawlScreenshots", {
|
||||
...screenshot,
|
||||
leadId: args.leadId,
|
||||
runId: args.runId,
|
||||
createdAt,
|
||||
});
|
||||
}
|
||||
|
||||
for (const checks of args.technicalChecks) {
|
||||
await ctx.db.insert("websiteTechnicalChecks", {
|
||||
...checks,
|
||||
leadId: args.leadId,
|
||||
runId: args.runId,
|
||||
createdAt,
|
||||
});
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
export const finishLeadEnrichmentRun = internalMutation({
|
||||
args: {
|
||||
runId: v.id("agentRuns"),
|
||||
status: v.union(
|
||||
v.literal("succeeded"),
|
||||
v.literal("failed"),
|
||||
v.literal("canceled"),
|
||||
),
|
||||
currentStep: v.optional(v.string()),
|
||||
errorSummary: v.optional(v.string()),
|
||||
errors: v.optional(v.number()),
|
||||
},
|
||||
handler: async (ctx, args) => {
|
||||
const now = Date.now();
|
||||
|
||||
await ctx.db.patch(args.runId, {
|
||||
status: args.status,
|
||||
updatedAt: now,
|
||||
finishedAt: now,
|
||||
currentStep: args.currentStep ?? "website_enrichment",
|
||||
errorSummary: args.errorSummary,
|
||||
counters: {
|
||||
leadsFound: 1,
|
||||
leadsCreated: 0,
|
||||
auditsCreated: 0,
|
||||
outreachPrepared: 0,
|
||||
errors: args.errors ?? 0,
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
export const patchLeadFromWebsiteEnrichment = internalMutation({
|
||||
args: {
|
||||
leadId: v.id("leads"),
|
||||
email: v.optional(v.string()),
|
||||
emailSource: v.optional(v.string()),
|
||||
contactPerson: v.optional(v.string()),
|
||||
currentContactStatus: v.union(
|
||||
v.literal("new"),
|
||||
v.literal("missing_contact"),
|
||||
v.literal("audit_ready"),
|
||||
v.literal("outreach_ready"),
|
||||
v.literal("contacted"),
|
||||
v.literal("replied"),
|
||||
v.literal("do_not_contact"),
|
||||
),
|
||||
contactStatusReason: v.optional(v.string()),
|
||||
},
|
||||
handler: async (ctx, args) => {
|
||||
const lead = await ctx.db.get(args.leadId);
|
||||
if (!lead) {
|
||||
return null;
|
||||
}
|
||||
|
||||
type LeadPatch = {
|
||||
email?: string;
|
||||
normalizedEmail?: string;
|
||||
emailSource?: string;
|
||||
contactPerson?: string;
|
||||
contactStatus?: LeadContactStatus;
|
||||
contactStatusReason?: string;
|
||||
updatedAt: number;
|
||||
};
|
||||
|
||||
const patch: LeadPatch = {
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
|
||||
if (args.email && args.emailSource) {
|
||||
const normalized = normalizeEmailAddress(args.email);
|
||||
if (normalized) {
|
||||
patch.email = normalized;
|
||||
patch.normalizedEmail = normalized;
|
||||
patch.emailSource = args.emailSource;
|
||||
}
|
||||
}
|
||||
|
||||
if (args.contactPerson) {
|
||||
patch.contactPerson = args.contactPerson;
|
||||
}
|
||||
|
||||
if (args.contactStatusReason !== undefined) {
|
||||
patch.contactStatusReason = args.contactStatusReason;
|
||||
} else if (args.email && args.currentContactStatus === "missing_contact") {
|
||||
patch.contactStatus = "new";
|
||||
}
|
||||
|
||||
if (Object.keys(patch).length > 1) {
|
||||
await ctx.db.patch(args.leadId, patch);
|
||||
}
|
||||
|
||||
return args.leadId;
|
||||
},
|
||||
});
|
||||
Reference in New Issue
Block a user