feat: add OpenRouter audit generation pipeline
This commit is contained in:
578
convex/auditGeneration.ts
Normal file
578
convex/auditGeneration.ts
Normal file
@@ -0,0 +1,578 @@
|
||||
import { internal } from "./_generated/api";
|
||||
import type { Doc, Id } from "./_generated/dataModel";
|
||||
import { internalMutation, internalQuery } from "./_generated/server";
|
||||
import {
|
||||
AUDIT_GENERATION_STAGES,
|
||||
AUDIT_GENERATION_STATUSES,
|
||||
RUN_STATUSES,
|
||||
} from "./domain";
|
||||
import { v } from "convex/values";
|
||||
import {
|
||||
type PageSpeedAuditErrorType,
|
||||
type PageSpeedMinimalAuditResult,
|
||||
} from "../lib/pagespeed-audit-input";
|
||||
|
||||
/** Upper bound, in UTF-8 bytes, applied to prompts and system prompts before they are stored. */
export const MAX_PROMPT_BYTES = 12_000;

/** Upper bound, in UTF-8 bytes, applied to raw model responses and error summaries before they are stored. */
export const MAX_RAW_RESPONSE_BYTES = 12_000;

/** Upper bound, in UTF-8 bytes, applied to the stored (serialized) parsed JSON payload. */
export const MAX_PARSED_JSON_BYTES = 12_000;

/** Suffix appended to a value that had to be cut to fit one of the byte limits. */
const TRUNCATION_MARKER = "\n\n[... abgeschnitten ...]";
|
||||
|
||||
/** Validator accepting exactly the stage names listed in `AUDIT_GENERATION_STAGES`. */
const auditGenerationStage = v.union(
  ...AUDIT_GENERATION_STAGES.map((stageName) => v.literal(stageName)),
);

/** Validator accepting exactly the status names listed in `AUDIT_GENERATION_STATUSES`. */
const auditGenerationStatus = v.union(
  ...AUDIT_GENERATION_STATUSES.map((statusName) => v.literal(statusName)),
);

/** Validator accepting exactly the run status names listed in `RUN_STATUSES`. */
const runStatus = v.union(
  ...RUN_STATUSES.map((statusName) => v.literal(statusName)),
);
|
||||
|
||||
/**
 * Validator for a single property value of a parsed JSON object: a JSON
 * primitive, `null`, an array of anything, or a string-keyed record of anything.
 */
const auditGenerationParsedValue = v.union(
  v.string(),
  v.number(),
  v.boolean(),
  v.null(),
  v.array(v.any()),
  v.record(v.string(), v.any()),
);

/**
 * Validator for the `parsedJson` argument: either an already-serialized string
 * or a string-keyed record whose values satisfy `auditGenerationParsedValue`.
 */
const auditGenerationParsedJson = v.union(
  v.string(),
  v.record(v.string(), auditGenerationParsedValue),
);
|
||||
|
||||
/** Lead fields that are exposed as part of the audit-generation evidence. */
type AuditGenerationLead = Pick<
  Doc<"leads">,
  | "_id"
  | "companyName"
  | "niche"
  | "city"
  | "address"
  | "websiteUrl"
  | "websiteDomain"
  | "phone"
  | "contactPerson"
>;

/** Crawled-page fields that are exposed as part of the audit-generation evidence. */
type AuditGenerationEvidenceCrawlPage = Pick<
  Doc<"websiteCrawlPages">,
  | "sourceUrl"
  | "finalUrl"
  | "title"
  | "metaDescription"
  | "pageKind"
  | "hasContactFormSignal"
  | "hasContactCtaSignal"
  | "visibleTextExcerpt"
>;

/** Technical-check fields that are exposed as part of the audit-generation evidence. */
type AuditGenerationEvidenceTechnicalCheck = Pick<
  Doc<"websiteTechnicalChecks">,
  | "sourceUrl"
  | "finalUrl"
  | "usesHttps"
  | "missingTitle"
  | "missingMetaDescription"
  | "hasVisibleContactPath"
  | "brokenInternalLinkCount"
>;

/** Screenshot fields that are exposed as part of the audit-generation evidence. */
type AuditGenerationEvidenceScreenshot = Pick<
  Doc<"websiteCrawlScreenshots">,
  | "storageId"
  | "viewport"
  | "sourceUrl"
  | "capturedAt"
  | "width"
  | "height"
  | "mimeType"
>;

/** Evidence bundle returned by `getAuditGenerationEvidence` for one run. */
type AuditGenerationEvidence = {
  lead: AuditGenerationLead;
  crawlPages: AuditGenerationEvidenceCrawlPage[];
  technicalChecks: AuditGenerationEvidenceTechnicalCheck[];
  screenshots: AuditGenerationEvidenceScreenshot[];
  pageSpeedInputs: PageSpeedMinimalAuditResult[];
};
|
||||
|
||||
/**
 * Measures how many bytes `value` occupies once encoded as UTF-8.
 *
 * @param value Text to measure.
 * @returns The UTF-8 encoded size in bytes.
 */
function byteLength(value: string) {
  const utf8Bytes = new TextEncoder().encode(value);
  return utf8Bytes.byteLength;
}
|
||||
|
||||
/**
 * Returns the longest prefix of `value` whose UTF-8 encoding fits into
 * `maxBytes`, never cutting a character (code point) in half.
 *
 * The UTF-8 width of each character is derived from its code point instead of
 * encoding every character separately, which avoids allocating an encoder and
 * a byte buffer per character.
 *
 * @param value Text to shorten.
 * @param maxBytes Byte budget; values of 0 or less yield an empty string.
 * @returns The prefix of `value` that fits into the budget.
 */
function truncateToByteLimit(value: string, maxBytes: number) {
  if (maxBytes <= 0) {
    return "";
  }

  let usedBytes = 0;
  let endIndex = 0;

  // `for...of` walks code points, so a surrogate pair arrives as one `char`.
  for (const char of value) {
    const codePoint = char.codePointAt(0) ?? 0;
    // UTF-8 width by code point range. A lone surrogate falls into the
    // 3-byte range, matching the 3-byte replacement character that
    // TextEncoder emits for it.
    const charBytes =
      codePoint < 0x80 ? 1 : codePoint < 0x800 ? 2 : codePoint < 0x10000 ? 3 : 4;

    if (usedBytes + charBytes > maxBytes) {
      break;
    }

    usedBytes += charBytes;
    // Advance by UTF-16 code units so `slice` lands on a character boundary.
    endIndex += char.length;
  }

  return value.slice(0, endIndex);
}
|
||||
|
||||
/**
 * Caps `value` at `maxBytes` UTF-8 bytes and, when it had to be cut, ends the
 * result with `TRUNCATION_MARKER`.
 *
 * @param value Text to cap.
 * @param maxBytes Total byte budget, including the marker.
 * @returns `value` unchanged when it already fits; otherwise a shortened
 *   prefix followed by the marker. When the budget cannot hold anything
 *   besides the marker, only the leading bytes of the marker are returned.
 */
function truncateWithMarker(value: string, maxBytes: number) {
  const alreadyFits = byteLength(value) <= maxBytes;
  if (alreadyFits) {
    return value;
  }

  const encodedMarker = new TextEncoder().encode(TRUNCATION_MARKER);
  const contentBudget = maxBytes - encodedMarker.byteLength;

  // No room for any content next to the marker: return the part of the
  // marker that is selected by slicing its bytes at `maxBytes`.
  if (contentBudget <= 0) {
    return new TextDecoder().decode(encodedMarker.slice(0, maxBytes));
  }

  return truncateToByteLimit(value, contentBudget) + TRUNCATION_MARKER;
}
|
||||
|
||||
/**
 * Redacts secret-looking content from `value`, trims it, and caps it at
 * `maxBytes` UTF-8 bytes.
 *
 * @param value Text to clean; may be absent.
 * @param maxBytes Byte budget for the returned text.
 * @returns `undefined` for a missing or empty input, otherwise the cleaned
 *   (and possibly truncated) text.
 */
function sanitizeAndCapString(value: string | undefined, maxBytes: number) {
  if (!value) {
    return undefined;
  }

  const redacted = sanitizeSecretCandidates(value) ?? "";
  // truncateWithMarker returns its input untouched when it already fits, so
  // no separate size check is needed here.
  return truncateWithMarker(redacted.trim(), maxBytes);
}
|
||||
|
||||
/**
 * Serializes `value` to JSON without ever throwing and without ever returning
 * a non-string.
 *
 * `JSON.stringify` throws for circular structures and BigInt values, and
 * returns `undefined` for inputs such as `undefined`, functions, and symbols.
 * Both cases are mapped to a fixed placeholder so the declared `string`
 * return type always holds at runtime.
 *
 * @param value Any value.
 * @returns The JSON text, or `"[unserializable payload]"` when no JSON text
 *   can be produced.
 */
function safeStringify(value: unknown): string {
  const fallback = "[unserializable payload]";

  try {
    const serialized: string | undefined = JSON.stringify(value);
    return serialized ?? fallback;
  } catch {
    return fallback;
  }
}
|
||||
|
||||
/**
 * Converts a parsed model payload into a redacted string capped at
 * `MAX_PARSED_JSON_BYTES`.
 *
 * @param parsedJson The payload: absent, an already-serialized string, or any
 *   other value, which is serialized with `safeStringify`.
 * @returns `undefined` when there is no payload; otherwise the sanitized and
 *   size-capped text. The result is always a string, never an object.
 */
function sanitizeAndCapParsedJson(parsedJson: unknown) {
  switch (typeof parsedJson) {
    case "undefined":
      return undefined;
    case "string":
      return sanitizeAndCapString(parsedJson, MAX_PARSED_JSON_BYTES);
    default: {
      const redacted = sanitizeSecretCandidates(safeStringify(parsedJson)) ?? "";
      // truncateWithMarker leaves text that already fits unchanged.
      return truncateWithMarker(redacted, MAX_PARSED_JSON_BYTES);
    }
  }
}
|
||||
|
||||
/**
 * Maps a stored `pageSpeedResults` row to the minimal audit-input shape.
 *
 * `strategy`, `status` and `sourceUrl` are always copied. `finalUrl`,
 * `normalized`, `errorType` and `errorSummary` are only included when the row
 * holds a truthy value for them, so absent fields are omitted rather than set
 * to `undefined`.
 *
 * @param row The stored PageSpeed result row.
 * @returns The reduced result object.
 */
function normalizePageSpeedResultRow(
  row: Doc<"pageSpeedResults">,
): PageSpeedMinimalAuditResult {
  const {
    strategy,
    status,
    sourceUrl,
    finalUrl,
    normalized,
    errorType,
    errorSummary,
  } = row;

  return {
    strategy,
    status,
    sourceUrl,
    ...(finalUrl ? { finalUrl } : {}),
    ...(normalized ? { normalized } : {}),
    ...(errorType ? { errorType: errorType as PageSpeedAuditErrorType } : {}),
    ...(errorSummary ? { errorSummary } : {}),
  };
}
|
||||
|
||||
/**
 * Validator for the optional usage record stored with a generation. Every
 * field is an optional number.
 */
const auditGenerationUsage = v.object({
  promptTokens: v.optional(v.number()),
  completionTokens: v.optional(v.number()),
  totalTokens: v.optional(v.number()),
  cacheReadTokens: v.optional(v.number()),
  totalCostUsd: v.optional(v.number()),
});
|
||||
|
||||
/**
 * Names of environment variables whose current values are redacted from any
 * text passed through `sanitizeSecretCandidates`.
 */
const secretHints = [
  "OPENROUTER_API_KEY",
  "GOOGLE_PLACES_API_KEY",
  "GOOGLE_GEOCODING_API_KEY",
  "PAGESPEED_API_KEY",
  "SMTP_PASSWORD",
  "SMTP_HOST",
  "SMTP_USER",
  "BETTER_AUTH_SECRET",
  "RYBBIT_API_KEY",
];
|
||||
|
||||
/**
 * Replaces secret-looking content in `value` with `[REDACTED]`.
 *
 * Two passes are applied:
 * 1. every occurrence of the current value of each environment variable named
 *    in `secretHints` (variables that are unset or empty are skipped);
 * 2. inline `api_key` / `token` / `secret` / `password` assignments written as
 *    `name: value` or `name=value`.
 *
 * @param value Text to clean; may be absent.
 * @returns A falsy input unchanged; otherwise the redacted text, trimmed.
 */
function sanitizeSecretCandidates(value: string | undefined): string | undefined {
  if (!value) {
    return value;
  }

  const withoutKnownSecrets = secretHints.reduce((text, envName) => {
    const secret = process.env[envName];
    return secret
      ? text.replace(new RegExp(escapeRegExp(secret), "g"), "[REDACTED]")
      : text;
  }, value);

  const inlineCredentialPattern =
    /\b(?:api[_-]?key|token|secret|password)\s*[:=]\s*[^\s\"']+/gi;

  return withoutKnownSecrets.replace(inlineCredentialPattern, "[REDACTED]").trim();
}
|
||||
|
||||
/**
 * Escapes every regular-expression metacharacter in `value` so the result can
 * be embedded in a `RegExp` and match `value` literally.
 *
 * The previous pattern was double-escaped (`[...[\\]\\]`): its character class
 * ended early and it only matched a metacharacter followed by `\]`, so in
 * practice nothing was escaped. A secret containing e.g. `(` or `[` then made
 * `new RegExp(...)` throw, and one containing `.` or `+` was matched with the
 * wrong pattern.
 *
 * @param value Literal text.
 * @returns `value` with `. * + ? ^ $ { } ( ) | [ ] \` each prefixed by a backslash.
 */
function escapeRegExp(value: string) {
  return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
|
||||
|
||||
/** Lead fields handed back by `startAuditGenerationRun` once a run has been started. */
type StartLeadSnapshot = Pick<
  Doc<"leads">,
  | "_id"
  | "websiteUrl"
  | "websiteDomain"
  | "contactStatus"
>;
|
||||
|
||||
/**
 * Loads the evidence bundle for one audit-generation run.
 *
 * Reads the run and its lead, then up to 40 crawl pages, 80 technical checks
 * and 20 screenshots via each table's `by_runId` index, and up to 20 PageSpeed
 * rows. The PageSpeed rows are looked up through `by_auditId` when the run has
 * an audit id and through `by_leadId` otherwise. All lookups use descending
 * order.
 *
 * Returns `null` when the run does not exist, has no lead id, or the lead
 * cannot be found.
 */
export const getAuditGenerationEvidence = internalQuery({
  args: {
    runId: v.id("agentRuns"),
  },
  handler: async (ctx, args): Promise<AuditGenerationEvidence | null> => {
    const run = await ctx.db.get(args.runId);
    if (!run) {
      return null;
    }
    if (!run.leadId) {
      return null;
    }

    const lead = await ctx.db.get(run.leadId);
    if (!lead) {
      return null;
    }

    const crawlPages = await ctx.db
      .query("websiteCrawlPages")
      .withIndex("by_runId", (q) => q.eq("runId", args.runId))
      .order("desc")
      .take(40);

    const technicalChecks = await ctx.db
      .query("websiteTechnicalChecks")
      .withIndex("by_runId", (q) => q.eq("runId", args.runId))
      .order("desc")
      .take(80);

    const screenshots = await ctx.db
      .query("websiteCrawlScreenshots")
      .withIndex("by_runId", (q) => q.eq("runId", args.runId))
      .order("desc")
      .take(20);

    // Bind the audit id to a const so the narrowed type is visible inside the
    // index callback.
    const auditId = run.auditId;
    const pageSpeedRows = auditId
      ? await ctx.db
          .query("pageSpeedResults")
          .withIndex("by_auditId", (q) => q.eq("auditId", auditId))
          .order("desc")
          .take(20)
      : await ctx.db
          .query("pageSpeedResults")
          .withIndex("by_leadId", (q) => q.eq("leadId", lead._id))
          .order("desc")
          .take(20);

    const {
      _id,
      companyName,
      niche,
      city,
      address,
      websiteUrl,
      websiteDomain,
      phone,
      contactPerson,
    } = lead;

    return {
      lead: {
        _id,
        companyName,
        niche,
        city,
        address,
        websiteUrl,
        websiteDomain,
        phone,
        contactPerson,
      },
      crawlPages,
      technicalChecks,
      screenshots,
      pageSpeedInputs: pageSpeedRows.map(normalizePageSpeedResultRow),
    };
  },
});
|
||||
|
||||
/**
 * Queues an audit-generation run for a lead.
 *
 * - Returns `null` when the lead does not exist.
 * - Returns the id of an already pending run for the lead, or else of an
 *   already running one, instead of creating a duplicate.
 * - Otherwise inserts a new pending `audit_generation` run with zeroed
 *   counters, records an info event, schedules `processAuditGeneration` with
 *   a delay of 0, and returns the new run id.
 *
 * `parentRunId` is only written into the details of the queue event.
 */
export const queueLeadAuditGeneration = internalMutation({
  args: {
    leadId: v.id("leads"),
    auditId: v.optional(v.id("audits")),
    parentRunId: v.optional(v.id("agentRuns")),
  },
  returns: v.union(v.id("agentRuns"), v.null()),
  handler: async (ctx, args): Promise<Id<"agentRuns"> | null> => {
    const now = Date.now();

    const lead = await ctx.db.get(args.leadId);
    if (!lead) {
      return null;
    }

    // Looks up at most one audit-generation run of this lead in the given status.
    const findRunInStatus = (status: "pending" | "running") =>
      ctx.db
        .query("agentRuns")
        .withIndex("by_type_and_status_and_leadId", (q) =>
          q
            .eq("type", "audit_generation")
            .eq("status", status)
            .eq("leadId", args.leadId),
        )
        .take(1);

    const [pendingRun] = await findRunInStatus("pending");
    const [runningRun] = await findRunInStatus("running");

    // A pending run takes precedence over a running one.
    const activeRun = pendingRun ?? runningRun;
    if (activeRun) {
      return activeRun._id;
    }

    const initialCounters = {
      leadsFound: 0,
      leadsCreated: 0,
      auditsCreated: 0,
      outreachPrepared: 0,
      errors: 0,
    };

    const runId = await ctx.db.insert("agentRuns", {
      type: "audit_generation",
      leadId: args.leadId,
      ...(args.auditId ? { auditId: args.auditId } : {}),
      status: "pending",
      currentStep: "audit_generation",
      counters: initialCounters,
      createdAt: now,
      updatedAt: now,
    });

    const eventDetails: { label: string; value: string }[] = [
      { label: "Lead", value: args.leadId },
    ];
    if (args.parentRunId) {
      eventDetails.push({ label: "Parent-Run", value: args.parentRunId });
    }

    await ctx.db.insert("agentRunEvents", {
      runId,
      level: "info",
      message: "Audit-Generierung wurde in die Warteschlange gesetzt.",
      details: eventDetails,
      createdAt: now,
    });

    await ctx.scheduler.runAfter(
      0,
      internal.auditGenerationAction.processAuditGeneration,
      { runId },
    );

    return runId;
  },
});
|
||||
|
||||
/**
 * Moves a pending audit-generation run into the `running` state.
 *
 * - Returns `null` without writing anything when the run is missing, is not
 *   an `audit_generation` run, or is not `pending`.
 * - Marks the run as `failed`, records an error event and returns `null` when
 *   the run has no lead id or the lead cannot be found.
 * - Otherwise marks the run as `running`, records an info event and returns a
 *   snapshot of the lead, plus the run's audit id when it has one.
 */
export const startAuditGenerationRun = internalMutation({
  args: {
    runId: v.id("agentRuns"),
  },
  returns: v.union(
    v.object({
      lead: v.object({
        _id: v.id("leads"),
        websiteUrl: v.optional(v.string()),
        websiteDomain: v.optional(v.string()),
        contactStatus: v.union(
          v.literal("new"),
          v.literal("missing_contact"),
          v.literal("audit_ready"),
          v.literal("outreach_ready"),
          v.literal("contacted"),
          v.literal("replied"),
          v.literal("do_not_contact"),
        ),
      }),
      auditId: v.optional(v.id("audits")),
    }),
    v.null(),
  ),
  handler: async (ctx, args): Promise<
    { lead: StartLeadSnapshot; auditId?: Id<"audits"> } | null
  > => {
    const now = Date.now();

    const run = await ctx.db.get(args.runId);
    if (!run) {
      return null;
    }
    if (run.type !== "audit_generation" || run.status !== "pending") {
      return null;
    }

    // Shared failure path: mark the run as failed, log why, and give up.
    const failRun = async (
      errorSummary: string,
      message: string,
      leadIdValue: string,
    ): Promise<null> => {
      await ctx.db.patch(args.runId, {
        status: "failed",
        currentStep: "audit_generation",
        errorSummary,
        updatedAt: now,
        finishedAt: now,
      });

      await ctx.db.insert("agentRunEvents", {
        runId: args.runId,
        level: "error",
        message,
        details: [{ label: "Lead-ID", value: leadIdValue }],
        createdAt: now,
      });

      return null;
    };

    if (!run.leadId) {
      return await failRun(
        "Der Lauf hat keine Lead-ID.",
        "Audit-Generierung konnte nicht gestartet werden: Keine Lead-ID.",
        "unbekannt",
      );
    }

    const lead = await ctx.db.get(run.leadId);
    if (!lead) {
      return await failRun(
        "Lead wurde nicht gefunden.",
        "Audit-Generierung konnte nicht gestartet werden: Kein Lead mit dieser ID.",
        run.leadId,
      );
    }

    await ctx.db.patch(args.runId, {
      status: "running",
      currentStep: "audit_generation",
      startedAt: now,
      updatedAt: now,
      errorSummary: undefined,
    });

    await ctx.db.insert("agentRunEvents", {
      runId: args.runId,
      level: "info",
      message: "Audit-Generierung gestartet.",
      details: [{ label: "Lead-ID", value: lead._id }],
      createdAt: now,
    });

    const leadSnapshot: StartLeadSnapshot = {
      _id: lead._id,
      websiteUrl: lead.websiteUrl,
      websiteDomain: lead.websiteDomain,
      contactStatus: lead.contactStatus,
    };

    return run.auditId
      ? { lead: leadSnapshot, auditId: run.auditId }
      : { lead: leadSnapshot };
  },
});
|
||||
|
||||
/**
 * Stores the outcome of one generation stage in `auditGenerations`.
 *
 * Free-text fields are passed through the sanitize-and-cap helpers first:
 * `prompt` and `systemPrompt` are capped at `MAX_PROMPT_BYTES`, `rawResponse`
 * and `errorSummary` at `MAX_RAW_RESPONSE_BYTES`, and `parsedJson` is
 * converted to a capped string by `sanitizeAndCapParsedJson`. Optional fields
 * are only written when the argument is truthy.
 *
 * Returns the result of the insert.
 */
export const persistAuditGenerationResult = internalMutation({
  args: {
    leadId: v.id("leads"),
    auditId: v.optional(v.id("audits")),
    runId: v.id("agentRuns"),
    stage: auditGenerationStage,
    modelProfile: v.string(),
    modelId: v.string(),
    prompt: v.string(),
    systemPrompt: v.optional(v.string()),
    rawResponse: v.optional(v.string()),
    parsedJson: v.optional(auditGenerationParsedJson),
    usage: v.optional(auditGenerationUsage),
    finishReason: v.optional(v.string()),
    status: auditGenerationStatus,
    errorSummary: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const now = Date.now();
    const {
      systemPrompt,
      rawResponse,
      parsedJson,
      usage,
      finishReason,
      errorSummary,
    } = args;

    return await ctx.db.insert("auditGenerations", {
      leadId: args.leadId,
      auditId: args.auditId,
      runId: args.runId,
      stage: args.stage,
      modelProfile: args.modelProfile,
      modelId: args.modelId,
      // The prompt is required, so an empty sanitized result is stored as "".
      prompt: sanitizeAndCapString(args.prompt, MAX_PROMPT_BYTES) ?? "",
      ...(systemPrompt
        ? { systemPrompt: sanitizeAndCapString(systemPrompt, MAX_PROMPT_BYTES) }
        : {}),
      ...(rawResponse
        ? { rawResponse: sanitizeAndCapString(rawResponse, MAX_RAW_RESPONSE_BYTES) }
        : {}),
      ...(parsedJson ? { parsedJson: sanitizeAndCapParsedJson(parsedJson) } : {}),
      ...(usage ? { usage } : {}),
      ...(finishReason ? { finishReason } : {}),
      status: args.status,
      ...(errorSummary
        ? { errorSummary: sanitizeAndCapString(errorSummary, MAX_RAW_RESPONSE_BYTES) }
        : {}),
      createdAt: now,
      updatedAt: now,
    });
  },
});
|
||||
|
||||
/**
 * Writes the final state of an audit-generation run.
 *
 * Sets `status`, `updatedAt`, `finishedAt`, `currentStep` (defaulting to
 * `"audit_generation"`), `errorSummary`, and the run counters.
 *
 * The error summary is passed through `sanitizeAndCapString` before it is
 * stored, the same treatment `persistAuditGenerationResult` gives its
 * `errorSummary`, so provider error text cannot carry secrets or unbounded
 * payloads into the run document. A missing or empty summary is patched as
 * `undefined`.
 *
 * NOTE(review): `counters` is replaced wholesale, so every counter except
 * `errors` is reset to 0. Confirm that audit-generation runs never accumulate
 * other counters that should survive this call.
 */
export const finishAuditGenerationRun = internalMutation({
  args: {
    runId: v.id("agentRuns"),
    status: runStatus,
    currentStep: v.optional(v.string()),
    errorSummary: v.optional(v.string()),
    errors: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const now = Date.now();

    const safeErrorSummary = sanitizeAndCapString(
      args.errorSummary,
      MAX_RAW_RESPONSE_BYTES,
    );

    await ctx.db.patch(args.runId, {
      status: args.status,
      updatedAt: now,
      finishedAt: now,
      currentStep: args.currentStep ?? "audit_generation",
      errorSummary: safeErrorSummary,
      counters: {
        leadsFound: 0,
        leadsCreated: 0,
        auditsCreated: 0,
        outreachPrepared: 0,
        errors: args.errors ?? 0,
      },
    });
  },
});
|
||||
Reference in New Issue
Block a user