import { v } from "convex/values"; import { normalizeListLimit } from "./domain"; import { internalMutation, mutation, query } from "./_generated/server"; import type { Doc, Id } from "./_generated/dataModel"; import type { MutationCtx, QueryCtx } from "./_generated/server"; const strategy = v.union( v.literal("call_first"), v.literal("email_first"), v.literal("defer"), v.literal("do_not_contact"), ); const REVIEW_JOIN_LIMIT = 4; const requireOperator = async (ctx: QueryCtx | MutationCtx) => { const identity = await ctx.auth.getUserIdentity(); if (!identity) { throw new Error("Nicht autorisiert."); } return identity; }; const latestOutreachForLead = async ( ctx: QueryCtx, leadId: Id<"leads">, ) => { const rows = await ctx.db .query("outreachRecords") .withIndex("by_leadId", (q) => q.eq("leadId", leadId)) .order("desc") .take(1); return rows[0] ?? null; }; const latestAuditForLead = async (ctx: QueryCtx, leadId: Id<"leads">) => { const rows = await ctx.db .query("audits") .withIndex("by_leadId", (q) => q.eq("leadId", leadId)) .order("desc") .take(1); return rows[0] ?? null; }; const loadReviewRow = async ( ctx: QueryCtx, lead: Doc<"leads">, reviewOutreach: Doc<"outreachRecords"> | null, ) => { const latestOutreach = reviewOutreach ?? await latestOutreachForLead(ctx, lead._id); const audit = latestOutreach?.auditId ? await ctx.db.get(latestOutreach.auditId) : await latestAuditForLead(ctx, lead._id); const auditGenerations = audit ? await ctx.db .query("auditGenerations") .withIndex("by_auditId", (q) => q.eq("auditId", audit._id)) .order("desc") .take(REVIEW_JOIN_LIMIT) : await ctx.db .query("auditGenerations") .withIndex("by_leadId", (q) => q.eq("leadId", lead._id)) .order("desc") .take(REVIEW_JOIN_LIMIT); const pageSpeedResults = audit ? await ctx.db .query("pageSpeedResults") .withIndex("by_auditId", (q) => q.eq("auditId", audit._id)) .order("desc") .take(REVIEW_JOIN_LIMIT) : await ctx.db .query("pageSpeedResults") .withIndex("by_leadId", (q) => q.eq("leadId", lead._id)) .order("desc") .take(REVIEW_JOIN_LIMIT); const crawlPages = await ctx.db .query("websiteCrawlPages") .withIndex("by_leadId_and_createdAt", (q) => q.eq("leadId", lead._id)) .order("desc") .take(REVIEW_JOIN_LIMIT); const emailCandidates = await ctx.db .query("websiteEmailCandidates") .withIndex("by_leadId", (q) => q.eq("leadId", lead._id)) .order("desc") .take(REVIEW_JOIN_LIMIT); return { id: lead._id, lead: { id: lead._id, companyName: lead.companyName, niche: lead.niche ?? null, address: lead.address ?? null, city: lead.city ?? null, postalCode: lead.postalCode ?? null, websiteUrl: lead.websiteUrl ?? null, websiteDomain: lead.websiteDomain ?? null, email: lead.email ?? null, normalizedEmail: lead.normalizedEmail ?? null, phone: lead.phone ?? null, normalizedPhone: lead.normalizedPhone ?? null, contactPerson: lead.contactPerson ?? null, priority: lead.priority, priorityReason: lead.priorityReason ?? null, contactStatus: lead.contactStatus, contactStatusReason: lead.contactStatusReason ?? null, duplicateStatus: lead.duplicateStatus, duplicateReason: lead.duplicateReason ?? null, blacklistStatus: lead.blacklistStatus, blacklistReason: lead.blacklistReason ?? null, notes: lead.notes ?? null, googleMapsUrl: lead.googleMapsUrl ?? null, googleRating: lead.googleRating ?? null, googleUserRatingCount: lead.googleUserRatingCount ?? null, updatedAt: lead.updatedAt, }, latestOutreach: latestOutreach, audit: audit, auditGenerations: auditGenerations.map((generation) => ({ id: generation._id, stage: generation.stage, status: generation.status, modelProfile: generation.modelProfile, modelId: generation.modelId, errorSummary: generation.errorSummary ?? null, finishReason: generation.finishReason ?? null, parsedJson: generation.parsedJson ?? null, createdAt: generation.createdAt, updatedAt: generation.updatedAt, })), usedSkills: audit?.usedSkills ?? [], skillSummaries: audit?.skillSummaries ?? [], sourceSummaries: { pageSpeedResults: pageSpeedResults.map((result) => ({ id: result._id, strategy: result.strategy, status: result.status, sourceUrl: result.sourceUrl, finalUrl: result.finalUrl ?? null, errorType: result.errorType ?? null, errorSummary: result.errorSummary ?? null, normalized: result.normalized ?? null, fetchedAt: result.fetchedAt, createdAt: result.createdAt, })), crawlPages: crawlPages.map((page) => ({ id: page._id, sourceUrl: page.sourceUrl, finalUrl: page.finalUrl, pageKind: page.pageKind, title: page.title ?? null, metaDescription: page.metaDescription ?? null, headings: page.headings.slice(0, REVIEW_JOIN_LIMIT), visibleTextExcerpt: page.visibleTextExcerpt ?? null, hasContactFormSignal: page.hasContactFormSignal, hasContactCtaSignal: page.hasContactCtaSignal, createdAt: page.createdAt, })), emailCandidates: emailCandidates.map((candidate) => ({ id: candidate._id, email: candidate.email, normalizedEmail: candidate.normalizedEmail, emailSource: candidate.emailSource, sourceUrl: candidate.sourceUrl, contactPerson: candidate.contactPerson ?? null, isBusinessContactAddress: candidate.isBusinessContactAddress, isGeneric: candidate.isGeneric, accepted: candidate.accepted, createdAt: candidate.createdAt, })), }, sortAt: Math.max( lead.updatedAt, latestOutreach?.updatedAt ?? 0, audit?.updatedAt ?? 0, ), }; }; export const create = mutation({ args: { leadId: v.id("leads"), auditId: v.optional(v.id("audits")), strategy, phoneScript: v.optional(v.string()), emailSubject: v.optional(v.string()), emailBody: v.optional(v.string()), followUpDraft: v.optional(v.string()), }, handler: async (ctx, args) => { await requireOperator(ctx); const lead = await ctx.db.get(args.leadId); if (!lead) { throw new Error("Lead wurde nicht gefunden."); } if (args.auditId) { const audit = await ctx.db.get(args.auditId); if (!audit) { throw new Error("Audit wurde nicht gefunden."); } if (audit.leadId !== args.leadId) { throw new Error("Audit gehoert nicht zu diesem Lead."); } } const now = Date.now(); return await ctx.db.insert("outreachRecords", { ...args, approvalStatus: "draft", sendStatus: "not_sent", responseStatus: "none", salesStatus: "follow_up_planned", createdAt: now, updatedAt: now, }); }, }); export const upsertFromAuditGeneration = internalMutation({ args: { leadId: v.id("leads"), auditId: v.optional(v.id("audits")), strategy: strategy, phoneScript: v.optional(v.string()), emailSubject: v.optional(v.string()), emailBody: v.optional(v.string()), followUpDraft: v.optional(v.string()), }, handler: async (ctx, args) => { const now = Date.now(); const lead = await ctx.db.get(args.leadId); if (!lead) { throw new Error("Lead wurde nicht gefunden."); } if (args.auditId) { const audit = await ctx.db.get(args.auditId); if (!audit) { throw new Error("Audit wurde nicht gefunden."); } if (audit.leadId !== args.leadId) { throw new Error("Audit gehoert nicht zu diesem Lead."); } } const existing = await ctx.db .query("outreachRecords") .withIndex("by_leadId", (q) => q.eq("leadId", args.leadId)) .order("desc") .take(1); if (existing.length > 0) { const current = existing[0]!; if (current.sendStatus === "sent") { return await ctx.db.insert("outreachRecords", { ...args, approvalStatus: "draft", sendStatus: "not_sent", responseStatus: "none", salesStatus: "follow_up_planned", createdAt: now, updatedAt: now, }); } await ctx.db.patch(current._id, { ...(args.auditId !== undefined ? { auditId: args.auditId } : {}), strategy: args.strategy, ...(args.phoneScript !== undefined ? { phoneScript: args.phoneScript } : {}), ...(args.emailSubject !== undefined ? { emailSubject: args.emailSubject } : {}), ...(args.emailBody !== undefined ? { emailBody: args.emailBody } : {}), ...(args.followUpDraft !== undefined ? { followUpDraft: args.followUpDraft } : {}), approvalStatus: "draft", updatedAt: now, }); return current._id; } return await ctx.db.insert("outreachRecords", { ...args, approvalStatus: "draft", sendStatus: "not_sent", responseStatus: "none", salesStatus: "follow_up_planned", createdAt: now, updatedAt: now, }); }, }); export const listReviewWorkspace = query({ args: { limit: v.optional(v.number()), }, handler: async (ctx, args) => { await requireOperator(ctx); const limit = normalizeListLimit(args.limit); const candidateLimit = Math.min(limit * 10, 300); const outreachReadyLeads = await ctx.db .query("leads") .withIndex("by_contactStatus_and_updatedAt", (q) => q.eq("contactStatus", "outreach_ready"), ) .order("desc") .take(candidateLimit); const draftNotSentOutreach = await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) => q.eq("approvalStatus", "draft").eq("sendStatus", "not_sent"), ) .order("desc") .take(candidateLimit); const draftQueuedOutreach = await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) => q.eq("approvalStatus", "draft").eq("sendStatus", "queued"), ) .order("desc") .take(candidateLimit); const draftFailedOutreach = await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) => q.eq("approvalStatus", "draft").eq("sendStatus", "failed"), ) .order("desc") .take(candidateLimit); const approvedNotSentOutreach = await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) => q.eq("approvalStatus", "approved").eq("sendStatus", "not_sent"), ) .order("desc") .take(candidateLimit); const approvedQueuedOutreach = await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) => q.eq("approvalStatus", "approved").eq("sendStatus", "queued"), ) .order("desc") .take(candidateLimit); const approvedFailedOutreach = await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus_and_sendStatus_and_updatedAt", (q) => q.eq("approvalStatus", "approved").eq("sendStatus", "failed"), ) .order("desc") .take(candidateLimit); const leadCandidates = new Map< Id<"leads">, { lead: Doc<"leads">; outreach: Doc<"outreachRecords"> | null } >(); for (const lead of outreachReadyLeads) { leadCandidates.set(lead._id, { lead, outreach: null }); } const reviewOutreach = [ ...draftNotSentOutreach, ...draftQueuedOutreach, ...draftFailedOutreach, ...approvedNotSentOutreach, ...approvedQueuedOutreach, ...approvedFailedOutreach, ] .filter((outreach) => (outreach.approvalStatus === "draft" || outreach.approvalStatus === "approved") && outreach.sendStatus !== "sent" ) .sort((a, b) => b.updatedAt - a.updatedAt); for (const outreach of reviewOutreach) { const lead = await ctx.db.get(outreach.leadId); if (!lead) { continue; } const existing = leadCandidates.get(lead._id); if (!existing || (existing.outreach?.updatedAt ?? 0) < outreach.updatedAt) { leadCandidates.set(lead._id, { lead, outreach }); } } const rows = await Promise.all( [...leadCandidates.values()].map(({ lead, outreach }) => loadReviewRow(ctx, lead, outreach), ), ); return rows .sort((a, b) => b.sortAt - a.sortAt) .slice(0, limit) .map(({ sortAt, ...row }) => (void sortAt, row)); }, }); export const saveReviewDraft = mutation({ args: { id: v.id("outreachRecords"), strategy: strategy, phoneScript: v.optional(v.string()), emailSubject: v.optional(v.string()), emailBody: v.optional(v.string()), followUpDraft: v.optional(v.string()), }, handler: async (ctx, args) => { await requireOperator(ctx); const outreach = await ctx.db.get(args.id); if (!outreach) { throw new Error("Outreach-Datensatz wurde nicht gefunden."); } if (outreach.sendStatus === "sent") { throw new Error("Gesendete Outreach-Datensaetze koennen nicht bearbeitet werden."); } const now = Date.now(); await ctx.db.patch(args.id, { strategy: args.strategy, ...(args.phoneScript !== undefined ? { phoneScript: args.phoneScript } : {}), ...(args.emailSubject !== undefined ? { emailSubject: args.emailSubject } : {}), ...(args.emailBody !== undefined ? { emailBody: args.emailBody } : {}), ...(args.followUpDraft !== undefined ? { followUpDraft: args.followUpDraft } : {}), approvalStatus: "draft", updatedAt: now, }); return { id: args.id, approvalStatus: "draft", updatedAt: now }; }, }); export const approveEmailDraft = mutation({ args: { id: v.id("outreachRecords"), }, handler: async (ctx, args) => { await requireOperator(ctx); const outreach = await ctx.db.get(args.id); if (!outreach) { throw new Error("Outreach-Datensatz wurde nicht gefunden."); } if (outreach.sendStatus === "sent") { throw new Error("Gesendete Outreach-Datensaetze koennen nicht freigegeben werden."); } const lead = await ctx.db.get(outreach.leadId); if (!lead) { throw new Error("Lead wurde nicht gefunden."); } const recipient = lead.email?.trim(); const subject = outreach.emailSubject?.trim(); const body = outreach.emailBody?.trim(); if (!recipient) { throw new Error("Empfaenger-E-Mail fehlt."); } if (!subject) { throw new Error("E-Mail-Betreff fehlt."); } if (!body) { throw new Error("E-Mail-Text fehlt."); } const audit = outreach.auditId ? await ctx.db.get(outreach.auditId) : null; const now = Date.now(); await ctx.db.patch(args.id, { approvalStatus: "approved", updatedAt: now, }); return { id: args.id, recipient: recipient, subject: subject, auditSlug: audit?.slug ?? null, approvalStatus: "approved", updatedAt: now, }; }, }); export const list = query({ args: { leadId: v.optional(v.id("leads")), approvalStatus: v.optional( v.union(v.literal("draft"), v.literal("approved"), v.literal("rejected")), ), limit: v.optional(v.number()), }, handler: async (ctx, args) => { await requireOperator(ctx); const limit = normalizeListLimit(args.limit); if (args.leadId) { const leadId = args.leadId; return await ctx.db .query("outreachRecords") .withIndex("by_leadId", (q) => q.eq("leadId", leadId)) .order("desc") .take(limit); } if (args.approvalStatus) { const approvalStatus = args.approvalStatus; return await ctx.db .query("outreachRecords") .withIndex("by_approvalStatus", (q) => q.eq("approvalStatus", approvalStatus), ) .order("desc") .take(limit); } return await ctx.db.query("outreachRecords").order("desc").take(limit); }, });