Files
pitchfast/convex/outreach.ts
2026-06-05 16:47:22 +02:00

540 lines
16 KiB
TypeScript

import { v } from "convex/values";
import { normalizeListLimit } from "./domain";
import { internalMutation, mutation, query } from "./_generated/server";
import type { Doc, Id } from "./_generated/dataModel";
import type { MutationCtx, QueryCtx } from "./_generated/server";
const strategy = v.union(
v.literal("call_first"),
v.literal("email_first"),
v.literal("defer"),
v.literal("do_not_contact"),
);
const REVIEW_JOIN_LIMIT = 4;
const requireOperator = async (ctx: QueryCtx | MutationCtx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) {
throw new Error("Nicht autorisiert.");
}
return identity;
};
const latestOutreachForLead = async (
ctx: QueryCtx,
leadId: Id<"leads">,
) => {
const rows = await ctx.db
.query("outreachRecords")
.withIndex("by_leadId", (q) => q.eq("leadId", leadId))
.order("desc")
.take(1);
return rows[0] ?? null;
};
const latestAuditForLead = async (ctx: QueryCtx, leadId: Id<"leads">) => {
const rows = await ctx.db
.query("audits")
.withIndex("by_leadId", (q) => q.eq("leadId", leadId))
.order("desc")
.take(1);
return rows[0] ?? null;
};
const loadReviewRow = async (
ctx: QueryCtx,
lead: Doc<"leads">,
reviewOutreach: Doc<"outreachRecords"> | null,
) => {
const latestOutreach = reviewOutreach ?? await latestOutreachForLead(ctx, lead._id);
const audit = latestOutreach?.auditId
? await ctx.db.get(latestOutreach.auditId)
: await latestAuditForLead(ctx, lead._id);
const auditGenerations = audit
? await ctx.db
.query("auditGenerations")
.withIndex("by_auditId", (q) => q.eq("auditId", audit._id))
.order("desc")
.take(REVIEW_JOIN_LIMIT)
: await ctx.db
.query("auditGenerations")
.withIndex("by_leadId", (q) => q.eq("leadId", lead._id))
.order("desc")
.take(REVIEW_JOIN_LIMIT);
const pageSpeedResults = audit
? await ctx.db
.query("pageSpeedResults")
.withIndex("by_auditId", (q) => q.eq("auditId", audit._id))
.order("desc")
.take(REVIEW_JOIN_LIMIT)
: await ctx.db
.query("pageSpeedResults")
.withIndex("by_leadId", (q) => q.eq("leadId", lead._id))
.order("desc")
.take(REVIEW_JOIN_LIMIT);
const crawlPages = await ctx.db
.query("websiteCrawlPages")
.withIndex("by_leadId_and_createdAt", (q) => q.eq("leadId", lead._id))
.order("desc")
.take(REVIEW_JOIN_LIMIT);
const emailCandidates = await ctx.db
.query("websiteEmailCandidates")
.withIndex("by_leadId", (q) => q.eq("leadId", lead._id))
.order("desc")
.take(REVIEW_JOIN_LIMIT);
return {
id: lead._id,
lead: {
id: lead._id,
companyName: lead.companyName,
niche: lead.niche ?? null,
address: lead.address ?? null,
city: lead.city ?? null,
postalCode: lead.postalCode ?? null,
websiteUrl: lead.websiteUrl ?? null,
websiteDomain: lead.websiteDomain ?? null,
email: lead.email ?? null,
normalizedEmail: lead.normalizedEmail ?? null,
phone: lead.phone ?? null,
normalizedPhone: lead.normalizedPhone ?? null,
contactPerson: lead.contactPerson ?? null,
priority: lead.priority,
priorityReason: lead.priorityReason ?? null,
contactStatus: lead.contactStatus,
contactStatusReason: lead.contactStatusReason ?? null,
duplicateStatus: lead.duplicateStatus,
duplicateReason: lead.duplicateReason ?? null,
blacklistStatus: lead.blacklistStatus,
blacklistReason: lead.blacklistReason ?? null,
notes: lead.notes ?? null,
googleMapsUrl: lead.googleMapsUrl ?? null,
googleRating: lead.googleRating ?? null,
googleUserRatingCount: lead.googleUserRatingCount ?? null,
updatedAt: lead.updatedAt,
},
latestOutreach: latestOutreach,
audit: audit,
auditGenerations: auditGenerations.map((generation) => ({
id: generation._id,
stage: generation.stage,
status: generation.status,
modelProfile: generation.modelProfile,
modelId: generation.modelId,
errorSummary: generation.errorSummary ?? null,
finishReason: generation.finishReason ?? null,
parsedJson: generation.parsedJson ?? null,
createdAt: generation.createdAt,
updatedAt: generation.updatedAt,
})),
usedSkills: audit?.usedSkills ?? [],
skillSummaries: audit?.skillSummaries ?? [],
sourceSummaries: {
pageSpeedResults: pageSpeedResults.map((result) => ({
id: result._id,
strategy: result.strategy,
status: result.status,
sourceUrl: result.sourceUrl,
finalUrl: result.finalUrl ?? null,
errorType: result.errorType ?? null,
errorSummary: result.errorSummary ?? null,
normalized: result.normalized ?? null,
fetchedAt: result.fetchedAt,
createdAt: result.createdAt,
})),
crawlPages: crawlPages.map((page) => ({
id: page._id,
sourceUrl: page.sourceUrl,
finalUrl: page.finalUrl,
pageKind: page.pageKind,
title: page.title ?? null,
metaDescription: page.metaDescription ?? null,
headings: page.headings.slice(0, REVIEW_JOIN_LIMIT),
visibleTextExcerpt: page.visibleTextExcerpt ?? null,
hasContactFormSignal: page.hasContactFormSignal,
hasContactCtaSignal: page.hasContactCtaSignal,
createdAt: page.createdAt,
})),
emailCandidates: emailCandidates.map((candidate) => ({
id: candidate._id,
email: candidate.email,
normalizedEmail: candidate.normalizedEmail,
emailSource: candidate.emailSource,
sourceUrl: candidate.sourceUrl,
contactPerson: candidate.contactPerson ?? null,
isBusinessContactAddress: candidate.isBusinessContactAddress,
isGeneric: candidate.isGeneric,
accepted: candidate.accepted,
createdAt: candidate.createdAt,
})),
},
sortAt: Math.max(
lead.updatedAt,
latestOutreach?.updatedAt ?? 0,
audit?.updatedAt ?? 0,
),
};
};
export const create = mutation({
args: {
leadId: v.id("leads"),
auditId: v.optional(v.id("audits")),
strategy,
phoneScript: v.optional(v.string()),
emailSubject: v.optional(v.string()),
emailBody: v.optional(v.string()),
followUpDraft: v.optional(v.string()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const lead = await ctx.db.get(args.leadId);
if (!lead) {
throw new Error("Lead wurde nicht gefunden.");
}
if (args.auditId) {
const audit = await ctx.db.get(args.auditId);
if (!audit) {
throw new Error("Audit wurde nicht gefunden.");
}
if (audit.leadId !== args.leadId) {
throw new Error("Audit gehoert nicht zu diesem Lead.");
}
}
const now = Date.now();
return await ctx.db.insert("outreachRecords", {
...args,
approvalStatus: "draft",
sendStatus: "not_sent",
responseStatus: "none",
salesStatus: "follow_up_planned",
createdAt: now,
updatedAt: now,
});
},
});
export const upsertFromAuditGeneration = internalMutation({
args: {
leadId: v.id("leads"),
auditId: v.optional(v.id("audits")),
strategy: strategy,
phoneScript: v.optional(v.string()),
emailSubject: v.optional(v.string()),
emailBody: v.optional(v.string()),
followUpDraft: v.optional(v.string()),
},
handler: async (ctx, args) => {
const now = Date.now();
const lead = await ctx.db.get(args.leadId);
if (!lead) {
throw new Error("Lead wurde nicht gefunden.");
}
if (args.auditId) {
const audit = await ctx.db.get(args.auditId);
if (!audit) {
throw new Error("Audit wurde nicht gefunden.");
}
if (audit.leadId !== args.leadId) {
throw new Error("Audit gehoert nicht zu diesem Lead.");
}
}
const existing = await ctx.db
.query("outreachRecords")
.withIndex("by_leadId", (q) => q.eq("leadId", args.leadId))
.order("desc")
.take(1);
if (existing.length > 0) {
const current = existing[0]!;
if (current.sendStatus === "sent") {
return await ctx.db.insert("outreachRecords", {
...args,
approvalStatus: "draft",
sendStatus: "not_sent",
responseStatus: "none",
salesStatus: "follow_up_planned",
createdAt: now,
updatedAt: now,
});
}
await ctx.db.patch(current._id, {
...(args.auditId !== undefined ? { auditId: args.auditId } : {}),
strategy: args.strategy,
...(args.phoneScript !== undefined ? { phoneScript: args.phoneScript } : {}),
...(args.emailSubject !== undefined
? { emailSubject: args.emailSubject }
: {}),
...(args.emailBody !== undefined ? { emailBody: args.emailBody } : {}),
...(args.followUpDraft !== undefined
? { followUpDraft: args.followUpDraft }
: {}),
approvalStatus: "draft",
updatedAt: now,
});
return current._id;
}
return await ctx.db.insert("outreachRecords", {
...args,
approvalStatus: "draft",
sendStatus: "not_sent",
responseStatus: "none",
salesStatus: "follow_up_planned",
createdAt: now,
updatedAt: now,
});
},
});
export const listReviewWorkspace = query({
args: {
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
const candidateLimit = Math.min(limit * 10, 300);
const outreachReadyLeads = await ctx.db
.query("leads")
.withIndex("by_contactStatus_and_updatedAt", (q) =>
q.eq("contactStatus", "outreach_ready"),
)
.order("desc")
.take(candidateLimit);
const draftNotSentOutreach = await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) =>
q.eq("approvalStatus", "draft").eq("sendStatus", "not_sent"),
)
.order("desc")
.take(candidateLimit);
const draftQueuedOutreach = await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) =>
q.eq("approvalStatus", "draft").eq("sendStatus", "queued"),
)
.order("desc")
.take(candidateLimit);
const draftFailedOutreach = await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) =>
q.eq("approvalStatus", "draft").eq("sendStatus", "failed"),
)
.order("desc")
.take(candidateLimit);
const approvedNotSentOutreach = await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) =>
q.eq("approvalStatus", "approved").eq("sendStatus", "not_sent"),
)
.order("desc")
.take(candidateLimit);
const approvedQueuedOutreach = await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) =>
q.eq("approvalStatus", "approved").eq("sendStatus", "queued"),
)
.order("desc")
.take(candidateLimit);
const approvedFailedOutreach = await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) =>
q.eq("approvalStatus", "approved").eq("sendStatus", "failed"),
)
.order("desc")
.take(candidateLimit);
const leadCandidates = new Map<
Id<"leads">,
{ lead: Doc<"leads">; outreach: Doc<"outreachRecords"> | null }
>();
for (const lead of outreachReadyLeads) {
leadCandidates.set(lead._id, { lead, outreach: null });
}
const reviewOutreach = [
...draftNotSentOutreach,
...draftQueuedOutreach,
...draftFailedOutreach,
...approvedNotSentOutreach,
...approvedQueuedOutreach,
...approvedFailedOutreach,
]
.filter((outreach) =>
(outreach.approvalStatus === "draft" ||
outreach.approvalStatus === "approved") &&
outreach.sendStatus !== "sent"
)
.sort((a, b) => b.updatedAt - a.updatedAt);
for (const outreach of reviewOutreach) {
const lead = await ctx.db.get(outreach.leadId);
if (!lead) {
continue;
}
const existing = leadCandidates.get(lead._id);
if (!existing || (existing.outreach?.updatedAt ?? 0) < outreach.updatedAt) {
leadCandidates.set(lead._id, { lead, outreach });
}
}
const rows = await Promise.all(
[...leadCandidates.values()].map(({ lead, outreach }) =>
loadReviewRow(ctx, lead, outreach),
),
);
return rows
.sort((a, b) => b.sortAt - a.sortAt)
.slice(0, limit)
.map(({ sortAt, ...row }) => (void sortAt, row));
},
});
export const saveReviewDraft = mutation({
args: {
id: v.id("outreachRecords"),
strategy: strategy,
phoneScript: v.optional(v.string()),
emailSubject: v.optional(v.string()),
emailBody: v.optional(v.string()),
followUpDraft: v.optional(v.string()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const outreach = await ctx.db.get(args.id);
if (!outreach) {
throw new Error("Outreach-Datensatz wurde nicht gefunden.");
}
if (outreach.sendStatus === "sent") {
throw new Error("Gesendete Outreach-Datensaetze koennen nicht bearbeitet werden.");
}
const now = Date.now();
await ctx.db.patch(args.id, {
strategy: args.strategy,
...(args.phoneScript !== undefined ? { phoneScript: args.phoneScript } : {}),
...(args.emailSubject !== undefined
? { emailSubject: args.emailSubject }
: {}),
...(args.emailBody !== undefined ? { emailBody: args.emailBody } : {}),
...(args.followUpDraft !== undefined
? { followUpDraft: args.followUpDraft }
: {}),
approvalStatus: "draft",
updatedAt: now,
});
return { id: args.id, approvalStatus: "draft", updatedAt: now };
},
});
export const approveEmailDraft = mutation({
args: {
id: v.id("outreachRecords"),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const outreach = await ctx.db.get(args.id);
if (!outreach) {
throw new Error("Outreach-Datensatz wurde nicht gefunden.");
}
if (outreach.sendStatus === "sent") {
throw new Error("Gesendete Outreach-Datensaetze koennen nicht freigegeben werden.");
}
const lead = await ctx.db.get(outreach.leadId);
if (!lead) {
throw new Error("Lead wurde nicht gefunden.");
}
const recipient = lead.email?.trim();
const subject = outreach.emailSubject?.trim();
const body = outreach.emailBody?.trim();
if (!recipient) {
throw new Error("Empfaenger-E-Mail fehlt.");
}
if (!subject) {
throw new Error("E-Mail-Betreff fehlt.");
}
if (!body) {
throw new Error("E-Mail-Text fehlt.");
}
const audit = outreach.auditId ? await ctx.db.get(outreach.auditId) : null;
const now = Date.now();
await ctx.db.patch(args.id, {
approvalStatus: "approved",
updatedAt: now,
});
return {
id: args.id,
recipient: recipient,
subject: subject,
auditSlug: audit?.slug ?? null,
approvalStatus: "approved",
updatedAt: now,
};
},
});
export const list = query({
args: {
leadId: v.optional(v.id("leads")),
approvalStatus: v.optional(
v.union(v.literal("draft"), v.literal("approved"), v.literal("rejected")),
),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
if (args.leadId) {
const leadId = args.leadId;
return await ctx.db
.query("outreachRecords")
.withIndex("by_leadId", (q) => q.eq("leadId", leadId))
.order("desc")
.take(limit);
}
if (args.approvalStatus) {
const approvalStatus = args.approvalStatus;
return await ctx.db
.query("outreachRecords")
.withIndex("by_approvalStatus", (q) =>
q.eq("approvalStatus", approvalStatus),
)
.order("desc")
.take(limit);
}
return await ctx.db.query("outreachRecords").order("desc").take(limit);
},
});