Refactor pipeline task handling and UI flows

This commit is contained in:
2026-06-13 21:09:49 +02:00
parent 21c7e4c9a4
commit ff4c572157
24 changed files with 1346 additions and 236 deletions

View File

@@ -11,6 +11,7 @@
import type * as auditGeneration from "../auditGeneration.js";
import type * as auditGenerationAction from "../auditGenerationAction.js";
import type * as auditInputs from "../auditInputs.js";
import type * as auditWorkflow from "../auditWorkflow.js";
import type * as audits from "../audits.js";
import type * as blacklist from "../blacklist.js";
import type * as campaignMetrics from "../campaignMetrics.js";
@@ -42,6 +43,7 @@ declare const fullApi: ApiFromModules<{
auditGeneration: typeof auditGeneration;
auditGenerationAction: typeof auditGenerationAction;
auditInputs: typeof auditInputs;
auditWorkflow: typeof auditWorkflow;
audits: typeof audits;
blacklist: typeof blacklist;
campaignMetrics: typeof campaignMetrics;
@@ -92,4 +94,6 @@ export declare const internal: FilterApi<
export declare const components: {
betterAuth: import("../betterAuth/_generated/component.js").ComponentApi<"betterAuth">;
workflow: import("@convex-dev/workflow/_generated/component.js").ComponentApi<"workflow">;
auditWorkpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"auditWorkpool">;
};

View File

@@ -350,6 +350,7 @@ export const queueLeadAuditGeneration = internalMutation({
leadId: v.id("leads"),
auditId: v.optional(v.id("audits")),
parentRunId: v.optional(v.id("agentRuns")),
scheduleAction: v.optional(v.boolean()),
},
returns: v.union(v.id("agentRuns"), v.null()),
handler: async (ctx, args): Promise<Id<"agentRuns"> | null> => {
@@ -418,13 +419,15 @@ export const queueLeadAuditGeneration = internalMutation({
createdAt: now,
});
await ctx.scheduler.runAfter(
0,
internal.auditGenerationAction.processAuditGeneration,
{
runId,
},
);
if (args.scheduleAction !== false) {
await ctx.scheduler.runAfter(
0,
internal.auditGenerationAction.processAuditGeneration,
{
runId,
},
);
}
return runId;
},
@@ -460,7 +463,11 @@ export const startAuditGenerationRun = internalMutation({
const now = Date.now();
const run = await ctx.db.get(args.runId);
if (!run || run.type !== "audit_generation" || run.status !== "pending") {
if (
!run ||
run.type !== "audit_generation" ||
(run.status !== "pending" && run.status !== "failed")
) {
return null;
}
@@ -511,6 +518,7 @@ export const startAuditGenerationRun = internalMutation({
status: "running",
currentStep: "audit_generation",
startedAt: now,
finishedAt: undefined,
updatedAt: now,
errorSummary: undefined,
});

View File

@@ -18,6 +18,8 @@ import {
qualityReviewSchema,
type AuditSpecialistFinding,
type AuditSpecialistResult,
type QualityReview,
type QualityReviewRevisedCopy,
} from "../lib/ai/schemas";
import {
validateCustomerFacingCopy,
@@ -32,6 +34,7 @@ import {
type ScreenshotOneRequest,
} from "../lib/external-audit-services";
import { type AuditUsedSkill } from "../lib/skills-registry";
import { getAuditProgressForStep } from "../lib/audits/progress";
import { internal } from "./_generated/api";
import type { Id } from "./_generated/dataModel";
import {
@@ -281,6 +284,44 @@ type GermanCopyOutput = {
};
};
function applyRevisedCopy(
currentCopy: GermanCopyOutput,
revisedCopy: QualityReviewRevisedCopy,
): GermanCopyOutput {
return {
...currentCopy,
publicSummary: revisedCopy.publicSummary,
publicBody: revisedCopy.publicBody,
emailSubject: revisedCopy.emailSubject,
emailBody: revisedCopy.emailBody,
phoneScript: {
openingLine: revisedCopy.phoneScript.openingLine,
callScript: revisedCopy.phoneScript.callScript,
closeLine: revisedCopy.phoneScript.closeLine,
},
followUpDraft: {
message: revisedCopy.followUpDraft.message,
...(revisedCopy.followUpDraft.followInDays !== null
? { followInDays: revisedCopy.followUpDraft.followInDays }
: {}),
...(revisedCopy.followUpDraft.goals !== null
? { goals: revisedCopy.followUpDraft.goals }
: {}),
},
};
}
function germanCopyGuardTelemetry(guardResult: GermanCopyGuardResult) {
return {
passed: guardResult.passed,
issues: guardResult.issues.map((issue) => ({
field: issue.field,
rule: issue.rule,
message: issue.message,
})),
};
}
type MultimodalContentPart =
| {
type: "text";
@@ -554,7 +595,11 @@ function buildQualityReviewPrompt(
`Öffentlicher Text: ${germanCopy.publicBody}`,
`Email-Betreff: ${germanCopy.emailSubject}`,
`Email-Text: ${germanCopy.emailBody}`,
"Antworte als JSON mit isValid, issues, suggestions, notes.",
"Wenn die Copy nur stilistische oder leichte fachliche Hinweise hat, nutze severity warning und rewriteRequired true.",
"Wenn eine Korrektur sinnvoll ist, liefere revisedCopy vollständig mit publicSummary, publicBody, emailSubject, emailBody, phoneScript und followUpDraft.",
"Wenn keine Korrektur nötig ist, setze rewriteRequired false und revisedCopy null.",
"Nutze severity unsafe nur für harte Risiken wie falsche Sprache, erfundene Behauptungen, aggressive Tonalität oder Rohdaten-Leaks.",
"Antworte als JSON mit isValid, severity, issues, suggestions, rewriteRequired, revisedCopy und notes.",
].join("\n");
}
@@ -630,6 +675,27 @@ async function appendRunEvent(
});
}
async function updateRootAuditProgress(
ctx: ActionCtx,
rootRunId: Id<"agentRuns"> | undefined,
currentStep: string,
) {
if (!rootRunId) {
return;
}
const progress = getAuditProgressForStep(currentStep);
await ctx.runMutation(internal.runs.updateProgressInternal, {
id: rootRunId,
status: "running",
currentStep,
progressStep: progress.step,
progressTotal: progress.total,
progressLabel: progress.label,
progressPercent: progress.percent,
});
}
async function loadAuditSkillRegistry(
ctx: ActionCtx,
runId: Id<"agentRuns">,
@@ -1204,6 +1270,7 @@ function getValidMediaType(mimeType: string) {
export const processAuditGeneration = internalAction({
args: {
runId: v.id("agentRuns"),
rootRunId: v.optional(v.id("agentRuns")),
},
handler: async (ctx, args) => {
let started:
@@ -1345,6 +1412,7 @@ export const processAuditGeneration = internalAction({
MAX_PROMPT_BYTES,
);
currentStep = "classification";
await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
await persistAuditStage({
ctx,
runId: args.runId,
@@ -1545,6 +1613,7 @@ export const processAuditGeneration = internalAction({
const verifierSystemPrompt =
"Du bist EvidenceQA. Verifiziere Befunde streng gegen belegte Evidence-Refs.";
currentStep = "evidenceVerifier";
await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
await persistAuditStage({
ctx,
@@ -1690,6 +1759,7 @@ export const processAuditGeneration = internalAction({
}
currentStep = "multimodalAudit";
await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
const validScreenshotParts = screenshotParts.filter(
(part): part is MultimodalFilePart => part !== null,
@@ -1844,6 +1914,7 @@ export const processAuditGeneration = internalAction({
}
currentStep = "germanCopy";
await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
// Stage 3: german copy generation
const germanSystemPrompt =
"Du bist fachlicher Texter für lokale Unternehmen im B2B-Kontext.";
@@ -1856,61 +1927,65 @@ export const processAuditGeneration = internalAction({
const safeGermanPrompt = sanitizeAndCapString(germanPrompt, MAX_PROMPT_BYTES);
try {
const publicSummaryResult = await generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: publicAuditTextSchema,
prompt: safeGermanPrompt
? `${safeGermanPrompt}\nAusgabe für publicSummary`
: "Ausgabe für publicSummary",
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
});
const germanBodyResult = await generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: publicAuditTextSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für publicBody`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
});
const germanSubjectResult = await generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: emailSubjectSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailSubject`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
});
const germanEmailResult = await generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: emailDraftSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailBody`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
});
const germanCallScriptResult = await generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: callScriptSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für callScript`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
});
const germanFollowUpResult = await generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: followUpDraftSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für followUpDraft`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
});
const [
publicSummaryResult,
germanBodyResult,
germanSubjectResult,
germanEmailResult,
germanCallScriptResult,
germanFollowUpResult,
] = await Promise.all([
generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: publicAuditTextSchema,
prompt: safeGermanPrompt
? `${safeGermanPrompt}\nAusgabe für publicSummary`
: "Ausgabe für publicSummary",
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
}),
generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: publicAuditTextSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für publicBody`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
}),
generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: emailSubjectSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailSubject`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
}),
generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: emailDraftSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailBody`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
}),
generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: callScriptSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für callScript`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
}),
generateObject({
model: provider(germanCopyProfile.modelId),
system: germanSystemPrompt,
schema: followUpDraftSchema,
prompt: `${safeGermanPrompt ?? ""}\nAusgabe für followUpDraft`,
temperature: germanCopyProfile.temperature,
maxOutputTokens: germanCopyProfile.maxTokens,
}),
]);
const publicSummary = publicSummaryResult.object.publicText ?? "";
const publicBody = germanBodyResult.object.publicText ?? "";
@@ -2015,42 +2090,89 @@ export const processAuditGeneration = internalAction({
},
followUp: germanCopyOutput.followUpDraft.message,
});
const deterministicGuard = germanCopyGuardTelemetry(guardResult);
// Stage 4: final quality review
const qualityPrompt = buildQualityReviewPrompt(
let qualityPrompt = buildQualityReviewPrompt(
verifiedFindingsText,
germanCopyOutput,
);
const safeQualityPrompt = sanitizeAndCapString(qualityPrompt, MAX_PROMPT_BYTES);
let safeQualityPrompt = sanitizeAndCapString(qualityPrompt, MAX_PROMPT_BYTES);
const qualitySystemPrompt =
"Du prüfst die erzeugten Inhalte als Qualitätssicherung.";
currentStep = "qualityReview";
await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
try {
const qualityResult = await generateObject({
model: provider(qualityReviewProfile.modelId),
system: qualitySystemPrompt,
schema: qualityReviewSchema,
prompt: safeQualityPrompt ?? "",
temperature: qualityReviewProfile.temperature,
maxOutputTokens: qualityReviewProfile.maxTokens,
});
let finalQualityReview: QualityReview | null = null;
let qualityFinishReason: string | undefined;
let rewriteApplied = false;
let copyReviewAttempts = 0;
const qualityReviewUsages: Array<OpenRouterUsage | undefined> = [];
qualityPassed = qualityResult.object.isValid && guardResult.passed;
while (copyReviewAttempts < 2) {
copyReviewAttempts += 1;
const qualityResult = await generateObject({
model: provider(qualityReviewProfile.modelId),
system: qualitySystemPrompt,
schema: qualityReviewSchema,
prompt: safeQualityPrompt ?? "",
temperature: qualityReviewProfile.temperature,
maxOutputTokens: qualityReviewProfile.maxTokens,
});
finalQualityReview = qualityResult.object;
qualityFinishReason = qualityResult.finishReason;
qualityReviewUsages.push(qualityResult.usage);
if (
copyReviewAttempts === 1 &&
qualityResult.object.rewriteRequired &&
qualityResult.object.revisedCopy
) {
germanCopyOutput = applyRevisedCopy(
germanCopyOutput,
qualityResult.object.revisedCopy,
);
rewriteApplied = true;
await appendRunEvent(ctx, {
runId: args.runId,
level: "warning",
message: "Copy-Review hat korrigiert.",
details: qualityResult.object.issues.slice(0, 4).map((issue) => ({
label: "Hinweis",
value: issue,
})),
});
qualityPrompt = buildQualityReviewPrompt(
verifiedFindingsText,
germanCopyOutput,
);
safeQualityPrompt = sanitizeAndCapString(
qualityPrompt,
MAX_PROMPT_BYTES,
);
continue;
}
break;
}
if (!finalQualityReview) {
throw new Error("Copy-Review konnte nicht ausgewertet werden.");
}
qualityPassed =
finalQualityReview.isValid && finalQualityReview.severity === "ok";
const qualityPayload = {
isValid: qualityResult.object.isValid && guardResult.passed,
issues: [
...qualityResult.object.issues,
...guardResult.issues.map(
(issue) => `${issue.field}: ${issue.message}`,
),
],
suggestions: qualityResult.object.suggestions,
notes: qualityResult.object.notes ?? [],
...finalQualityReview,
rewriteApplied,
reviewAttempts: copyReviewAttempts,
deterministicGuard,
finalDecision: qualityPassed ? "approved" : "stored_with_warnings",
};
const qualityErrorSummary =
"Qualitätsprüfung hat Inhalte als ungenügend markiert.";
await persistAuditStage({
ctx,
@@ -2067,43 +2189,26 @@ export const processAuditGeneration = internalAction({
MAX_RAW_RESPONSE_BYTES,
),
parsedJson: sanitizeAndCapParsedJson(qualityPayload),
...withStageUsage(qualityResult.usage),
status: qualityPassed ? "succeeded" : "failed",
finishReason: qualityResult.finishReason,
...(!qualityPassed ? { errorSummary: qualityErrorSummary } : {}),
...withStageUsage(aggregateOpenRouterUsage(qualityReviewUsages)),
status: "succeeded",
...(qualityFinishReason ? { finishReason: qualityFinishReason } : {}),
});
await recordOpenRouterUsage(ctx, {
runId: args.runId,
leadId: started.lead._id,
...(auditId ? { auditId } : {}),
usage: qualityResult.usage,
usage: aggregateOpenRouterUsage(qualityReviewUsages),
});
if (!qualityPassed) {
const message =
"Qualitätsprüfung und German-Copy-Guard haben nicht bestanden.";
if (!qualityPassed || !guardResult.passed) {
await appendRunEvent(ctx, {
runId: args.runId,
level: "warning",
message,
});
await ctx.runMutation(internal.auditGeneration.finishAuditGenerationRun, {
runId: args.runId,
status: "failed",
currentStep: "qualityReview",
errors: errors + 1,
errorSummary: message,
});
return null;
}
if (!qualityResult.object.isValid) {
await appendRunEvent(ctx, {
runId: args.runId,
level: "warning",
message:
"Qualitätsprüfung hat Review-Hinweise gemeldet; German-Copy-Guard bestanden.",
details: qualityResult.object.issues.slice(0, 4).map((issue) => ({
message: "Copy-Review mit Hinweisen abgeschlossen.",
details: [
...finalQualityReview.issues,
...guardResult.issues.map((issue) => issue.message),
].slice(0, 4).map((issue) => ({
label: "Hinweis",
value: issue,
})),
@@ -2288,3 +2393,25 @@ export const processAuditGeneration = internalAction({
}
},
});
export const processAuditGenerationForWorkflow = internalAction({
args: {
runId: v.id("agentRuns"),
rootRunId: v.id("agentRuns"),
},
handler: async (ctx, args): Promise<Id<"agentRuns">> => {
const result = await ctx.runAction(
internal.auditGenerationAction.processAuditGeneration,
{
runId: args.runId,
rootRunId: args.rootRunId,
},
);
if (!result) {
throw new Error("Audit-Generierung konnte nicht abgeschlossen werden.");
}
return args.runId;
},
});

238
convex/auditWorkflow.ts Normal file
View File

@@ -0,0 +1,238 @@
import { WorkflowManager, type WorkflowId } from "@convex-dev/workflow";
import { v } from "convex/values";
import { components, internal } from "./_generated/api";
import type { Id } from "./_generated/dataModel";
import { internalMutation } from "./_generated/server";
import { getAuditProgressForStep } from "../lib/audits/progress";
const MAX_ATTEMPTS = 3;
export const workflow = new WorkflowManager(components.workflow, {
workpoolOptions: {
maxParallelism: 3,
retryActionsByDefault: true,
defaultRetryBehavior: {
maxAttempts: 3,
initialBackoffMs: 1000,
base: 2,
},
},
});
function progressPatch(runId: Id<"agentRuns">, currentStep: string) {
const progress = getAuditProgressForStep(currentStep);
return {
id: runId,
currentStep,
progressStep: progress.step,
progressTotal: progress.total,
progressLabel: progress.label,
progressPercent: progress.percent,
};
}
function errorMessage(error: unknown) {
return error instanceof Error ? error.message : String(error);
}
export const runLeadAuditWorkflow = workflow
.define({
args: {
runId: v.id("agentRuns"),
},
})
.handler(async (step, args): Promise<Id<"agentRuns">> => {
try {
await step.runMutation(
internal.runs.updateProgressInternal,
{
...progressPatch(args.runId, "audit_prepared"),
status: "running",
maxAttempts: MAX_ATTEMPTS,
},
{ name: "1/6 Audit vorbereitet" },
);
const [, pageSpeedResult] = await Promise.all([
step.runMutation(
internal.runs.updateProgressInternal,
{
...progressPatch(args.runId, "pagespeed_insights"),
status: "running",
},
{ name: "2/6 Messe PageSpeed" },
),
step.runAction(
internal.pageSpeedAction.processPageSpeedAuditForWorkflow,
{ runId: args.runId },
{ name: "PageSpeed mobile/desktop", retry: true },
),
]);
if (!pageSpeedResult) {
throw new Error("PageSpeed-Analyse konnte nicht abgeschlossen werden.");
}
const auditRun = await step.runQuery(
internal.runs.getAuditRunForWorkflowInternal,
{ id: args.runId },
{ name: "Audit-Run laden" },
);
if (!auditRun?.leadId) {
throw new Error("Audit-Run hat keine Lead-ID.");
}
if (auditRun.status === "failed" || auditRun.status === "canceled") {
throw new Error("PageSpeed-Analyse ist final fehlgeschlagen.");
}
await step.runMutation(
internal.runs.updateProgressInternal,
{
...progressPatch(args.runId, "website_signals"),
status: "running",
},
{ name: "3/6 Sammle Website-Signale" },
);
const generationRunId = await step.runMutation(
internal.auditGeneration.queueLeadAuditGeneration,
{
leadId: auditRun.leadId,
...(auditRun.auditId ? { auditId: auditRun.auditId } : {}),
parentRunId: args.runId,
scheduleAction: false,
},
{ name: "Audit-Generierung vorbereiten" },
);
if (!generationRunId) {
throw new Error("Audit-Generierung konnte nicht angelegt werden.");
}
await step.runMutation(
internal.runs.updateProgressInternal,
{
...progressPatch(args.runId, "classification"),
status: "running",
},
{ name: "4/6 Bewerte Befunde" },
);
const generationResult = await step.runAction(
internal.auditGenerationAction.processAuditGenerationForWorkflow,
{ runId: generationRunId, rootRunId: args.runId },
{ name: "Specialists und German Copy", retry: true },
);
if (!generationResult) {
throw new Error("Audit-Generierung konnte nicht abgeschlossen werden.");
}
await step.runMutation(
internal.runs.updateProgressInternal,
{
...progressPatch(args.runId, "qualityReview"),
status: "succeeded",
},
{ name: "6/6 Speichere Audit" },
);
return args.runId;
} catch (error) {
const message = errorMessage(error);
await step.runMutation(
internal.runs.updateProgressInternal,
{
id: args.runId,
status: "failed",
errorSummary: "Audit konnte nach automatischen Versuchen nicht abgeschlossen werden.",
lastRetryReason: message,
},
{ name: "Audit final fehlgeschlagen", unstableArgs: true },
);
throw error;
}
});
export const startLeadAuditWorkflow = internalMutation({
args: {
runId: v.id("agentRuns"),
},
handler: async (ctx, args): Promise<WorkflowId> => {
const workflowId: WorkflowId = await workflow.start(
ctx,
internal.auditWorkflow.runLeadAuditWorkflow,
{ runId: args.runId },
{ startAsync: true },
);
await ctx.db.patch(args.runId, {
workflowId: String(workflowId),
attempt: 1,
maxAttempts: MAX_ATTEMPTS,
updatedAt: Date.now(),
});
return workflowId;
},
});
export const restartAuditWorkflow = internalMutation({
args: {
runId: v.id("agentRuns"),
},
handler: async (ctx, args): Promise<string> => {
const run = await ctx.db.get(args.runId);
if (!run || run.type !== "audit") {
throw new Error("Audit-Run wurde nicht gefunden.");
}
const nextAttempt = (run.attempt ?? 1) + 1;
const maxAttempts = run.maxAttempts ?? MAX_ATTEMPTS;
if (nextAttempt > maxAttempts) {
throw new Error("Maximale Anzahl an Audit-Versuchen erreicht.");
}
await ctx.db.patch(args.runId, {
status: "pending",
currentStep: "pagespeed_insights",
errorSummary: undefined,
lastRetryReason:
"Provider war kurz nicht erreichbar, ich versuche es erneut",
attempt: nextAttempt,
maxAttempts,
progressStep: 1,
progressTotal: 6,
progressLabel: "Audit vorbereitet",
progressPercent: 17,
startedAt: undefined,
finishedAt: undefined,
updatedAt: Date.now(),
});
if (run.workflowId) {
await workflow.restart(ctx, run.workflowId as WorkflowId, {
from: 0,
startAsync: true,
});
return run.workflowId;
}
const workflowId: WorkflowId = await workflow.start(
ctx,
internal.auditWorkflow.runLeadAuditWorkflow,
{ runId: args.runId },
{ startAsync: true },
);
await ctx.db.patch(args.runId, {
workflowId: String(workflowId),
updatedAt: Date.now(),
});
return String(workflowId);
},
});

View File

@@ -1,9 +1,11 @@
import { v } from "convex/values";
import { normalizeListLimit } from "./domain";
import { internal } from "./_generated/api";
import { internalMutation, mutation, query } from "./_generated/server";
import type { Doc, Id } from "./_generated/dataModel";
import type { MutationCtx, QueryCtx } from "./_generated/server";
import { getAuditProgressForStep } from "../lib/audits/progress";
export const AUDIT_REVIEW_NOTICE_AFTER_MS = 30 * 24 * 60 * 60 * 1000;
const DETAIL_EVIDENCE_LIMIT = 50;
@@ -63,12 +65,27 @@ type AuditDashboardRow =
id: Id<"agentRuns">;
runId: Id<"agentRuns">;
leadId: Id<"leads"> | null;
runType: Doc<"agentRuns">["type"];
title: string;
checkedDomain: string;
status: Doc<"agentRuns">["status"];
latestStage: string;
stageStatus: Doc<"agentRuns">["status"];
errorSummary: string | null;
progress: {
step: number;
total: number;
label: string;
percent: number;
};
retry: {
attempt: number;
maxAttempts: number;
isRetrying: boolean;
lastRetryReason: string | null;
canRetry: boolean;
};
canRetry: boolean;
pageCount: number;
checkedPages: string[];
createdAt: number;
@@ -104,6 +121,38 @@ const latestGenerationStage = (stages: Doc<"auditGenerations">[]) => {
return [...stages].sort((a, b) => b.updatedAt - a.updatedAt)[0] ?? null;
};
const progressForRun = (
run: Doc<"agentRuns">,
latestStage: Doc<"auditGenerations"> | null,
) => {
const fallback = getAuditProgressForStep(latestStage?.stage ?? run.currentStep);
return {
step: run.progressStep ?? fallback.step,
total: run.progressTotal ?? fallback.total,
label: run.progressLabel ?? fallback.label,
percent: run.progressPercent ?? fallback.percent,
};
};
const retryForRun = (run: Doc<"agentRuns">) => {
const attempt = run.attempt ?? 1;
const maxAttempts = run.maxAttempts ?? 3;
const canRetry =
run.type === "audit" &&
(run.status === "failed" || run.status === "canceled") &&
attempt < maxAttempts;
return {
attempt,
maxAttempts,
isRetrying:
(run.status === "pending" || run.status === "running") && attempt > 1,
lastRetryReason: run.lastRetryReason ?? null,
canRetry,
};
};
const normalizeComparableAuditUrl = (value: string | null | undefined) => {
const trimmed = value?.trim();
if (!trimmed) {
@@ -727,6 +776,31 @@ export const list = query({
},
});
export const retryAuditRun = mutation({
args: {
runId: v.id("agentRuns"),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const run = await ctx.db.get(args.runId);
if (!run || run.type !== "audit") {
throw new Error("Audit-Run wurde nicht gefunden.");
}
const status = run.status;
if (status !== "failed" && status !== "canceled") {
throw new Error("Nur final fehlgeschlagene Audits können neu gestartet werden.");
}
await ctx.scheduler.runAfter(0, internal.auditWorkflow.restartAuditWorkflow, {
runId: args.runId,
});
return { runId: args.runId };
},
});
export const listDashboardRows = query({
args: {
limit: v.optional(v.number()),
@@ -771,29 +845,50 @@ export const listDashboardRows = query({
}
}
const rootAuditRuns = await ctx.db
.query("agentRuns")
.withIndex("by_type", (q) => q.eq("type", "audit"))
.order("desc")
.take(limit);
const rootAuditRunLeadIds = new Set(
rootAuditRuns
.map((run) => run.leadId)
.filter((leadId): leadId is Id<"leads"> => leadId !== undefined),
);
const generationRuns = await ctx.db
.query("agentRuns")
.withIndex("by_type", (q) => q.eq("type", "audit_generation"))
.order("desc")
.take(limit);
for (const run of generationRuns) {
for (const run of [...rootAuditRuns, ...generationRuns]) {
if (!run.leadId) {
continue;
}
if (
run.type === "audit_generation" &&
rootAuditRunLeadIds.has(run.leadId)
) {
continue;
}
const directFinalAudit = run.auditId ? await ctx.db.get(run.auditId) : null;
const leadFinalAudits = await ctx.db
.query("audits")
.withIndex("by_leadId", (q) => q.eq("leadId", run.leadId as Id<"leads">))
.take(1);
const shouldHideBehindFinalAudit =
run.status === "succeeded" || run.type === "audit_generation";
if (
finalAuditRunIds.has(run._id) ||
(run.auditId && finalAuditIds.has(run.auditId)) ||
directFinalAudit ||
finalAuditLeadIds.has(run.leadId) ||
leadFinalAudits.length > 0
(shouldHideBehindFinalAudit && finalAuditRunIds.has(run._id)) ||
(shouldHideBehindFinalAudit && run.auditId && finalAuditIds.has(run.auditId)) ||
(shouldHideBehindFinalAudit && directFinalAudit) ||
(shouldHideBehindFinalAudit && finalAuditLeadIds.has(run.leadId)) ||
(shouldHideBehindFinalAudit && leadFinalAudits.length > 0)
) {
continue;
}
@@ -806,18 +901,24 @@ export const listDashboardRows = query({
const latestStage = latestGenerationStage(stages);
const lead = await ctx.db.get(run.leadId);
const checkedDomain = domainFromLead(lead);
const progress = progressForRun(run, latestStage);
const retry = retryForRun(run);
rows.push({
kind: "generation",
id: run._id,
runId: run._id,
leadId: run.leadId,
runType: run.type,
title: lead?.companyName ?? checkedDomain,
checkedDomain,
status: run.status,
latestStage: latestStage?.stage ?? run.currentStep ?? "audit_generation",
stageStatus: latestStage?.status ?? run.status,
errorSummary: run.errorSummary ?? latestStage?.errorSummary ?? null,
progress,
retry,
canRetry: retry.canRetry,
pageCount: 0,
checkedPages: [],
createdAt: run.createdAt,

View File

@@ -1,9 +1,13 @@
import { defineApp } from "convex/server";
import workflow from "@convex-dev/workflow/convex.config";
import auditWorkpool from "@convex-dev/workpool/convex.config";
import betterAuth from "./betterAuth/convex.config";
const app = defineApp();
app.use(betterAuth);
app.use(workflow);
app.use(auditWorkpool, { name: "auditWorkpool" });
export default app;

View File

@@ -190,6 +190,13 @@ async function queueLeadPageSpeedAuditForLead(
leadId: args.leadId,
status: "pending",
currentStep: "pagespeed_insights",
workflowId: undefined,
attempt: 1,
maxAttempts: 3,
progressStep: 1,
progressTotal: 6,
progressLabel: "Audit vorbereitet",
progressPercent: 17,
counters: PAGE_SPEED_COUNTER_TEMPLATE,
createdAt: now,
updatedAt: now,
@@ -211,7 +218,7 @@ async function queueLeadPageSpeedAuditForLead(
await ctx.scheduler.runAfter(
0,
internal.pageSpeedAction.processPageSpeedAudit,
internal.auditWorkflow.startLeadAuditWorkflow,
{
runId,
},
@@ -328,7 +335,11 @@ export const startPageSpeedAuditRun = internalMutation({
return null;
}
if (run.status !== "pending") {
if (
run.status !== "pending" &&
run.status !== "failed" &&
run.status !== "running"
) {
return null;
}
@@ -400,6 +411,7 @@ export const startPageSpeedAuditRun = internalMutation({
status: "running",
currentStep: "pagespeed_insights",
startedAt: now,
finishedAt: undefined,
updatedAt: now,
errorSummary: undefined,
});

View File

@@ -143,6 +143,7 @@ async function queueAuditGenerationAfterPageSpeed(
export const processPageSpeedAudit = internalAction({
args: {
runId: v.id("agentRuns"),
queueGeneration: v.optional(v.boolean()),
},
handler: async (ctx, args) => {
const apiKeyRaw = process.env.PAGESPEED_API_KEY?.trim();
@@ -185,19 +186,82 @@ export const processPageSpeedAudit = internalAction({
let succeededStrategies = 0;
try {
for (const strategy of STRATEGIES) {
const fetchedAt = Date.now();
try {
const raw = await fetchPageSpeedResult({
url: sourceUrl,
strategy,
apiKey,
timeoutMs,
});
const rawJson = JSON.stringify(raw) ?? "null";
const rawJsonBytes = new TextEncoder().encode(rawJson).byteLength;
if (rawJsonBytes > MAX_RAW_PAGESPEED_BYTES) {
failedStrategies += 1;
const strategyResults = await Promise.all(
STRATEGIES.map(async (strategy) => {
const fetchedAt = Date.now();
try {
const raw = await fetchPageSpeedResult({
url: sourceUrl,
strategy,
apiKey,
timeoutMs,
});
const rawJson = JSON.stringify(raw) ?? "null";
const rawJsonBytes = new TextEncoder().encode(rawJson).byteLength;
if (rawJsonBytes > MAX_RAW_PAGESPEED_BYTES) {
await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
leadId: started.lead._id,
...(started.auditId ? { auditId: started.auditId } : {}),
runId: args.runId,
strategy,
status: "failed",
sourceUrl,
errorType: "api_error",
errorSummary: RAW_PAGESPEED_BYTES_SUMMARY,
fetchedAt,
});
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "warning",
message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
details: [
{ label: "Strategie", value: strategy },
{
label: "Fehler",
value: RAW_PAGESPEED_BYTES_SUMMARY,
},
],
});
return "failed" as const;
}
const rawStorageId = await ctx.storage.store(
new Blob([rawJson], { type: "application/json" }),
);
const normalized = normalizePageSpeedResult({
strategy,
sourceUrl,
raw,
});
await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
leadId: started.lead._id,
...(started.auditId ? { auditId: started.auditId } : {}),
runId: args.runId,
strategy,
status: "succeeded",
sourceUrl,
finalUrl: normalized.finalUrl,
rawStorageId,
fetchedAt,
normalized: toPersistedPageSpeedNormalizedResult(normalized),
});
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "info",
message: `PageSpeed-Analyse für ${strategy} abgeschlossen.`,
details: [{ label: "Strategie", value: strategy }],
});
return "succeeded" as const;
} catch (error) {
const { errorType, errorSummary } = classifyPageSpeedFailure(
error,
apiKeyRaw,
);
await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
leadId: started.lead._id,
...(started.auditId ? { auditId: started.auditId } : {}),
@@ -205,8 +269,8 @@ export const processPageSpeedAudit = internalAction({
strategy,
status: "failed",
sourceUrl,
errorType: "api_error",
errorSummary: RAW_PAGESPEED_BYTES_SUMMARY,
errorType,
errorSummary,
fetchedAt,
});
@@ -216,75 +280,18 @@ export const processPageSpeedAudit = internalAction({
message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
details: [
{ label: "Strategie", value: strategy },
{
label: "Fehler",
value: RAW_PAGESPEED_BYTES_SUMMARY,
},
{ label: "Fehler", value: errorSummary },
],
});
continue;
return "failed" as const;
}
}),
);
const rawStorageId = await ctx.storage.store(
new Blob([rawJson], { type: "application/json" }),
);
const normalized = normalizePageSpeedResult({
strategy,
sourceUrl,
raw,
});
await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
leadId: started.lead._id,
...(started.auditId ? { auditId: started.auditId } : {}),
runId: args.runId,
strategy,
status: "succeeded",
sourceUrl,
finalUrl: normalized.finalUrl,
rawStorageId,
fetchedAt,
normalized: toPersistedPageSpeedNormalizedResult(normalized),
});
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "info",
message: `PageSpeed-Analyse für ${strategy} abgeschlossen.`,
details: [{ label: "Strategie", value: strategy }],
});
succeededStrategies += 1;
} catch (error) {
const { errorType, errorSummary } = classifyPageSpeedFailure(
error,
apiKeyRaw,
);
failedStrategies += 1;
await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
leadId: started.lead._id,
...(started.auditId ? { auditId: started.auditId } : {}),
runId: args.runId,
strategy,
status: "failed",
sourceUrl,
errorType,
errorSummary,
fetchedAt,
});
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "warning",
message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
details: [
{ label: "Strategie", value: strategy },
{ label: "Fehler", value: errorSummary },
],
});
}
}
succeededStrategies = strategyResults.filter(
(result) => result === "succeeded",
).length;
failedStrategies = strategyResults.length - succeededStrategies;
const status = succeededStrategies > 0 ? "succeeded" : "failed";
const errors = failedStrategies;
@@ -298,7 +305,9 @@ export const processPageSpeedAudit = internalAction({
: undefined,
});
await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
if (args.queueGeneration !== false) {
await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
}
return args.runId;
} catch (error) {
@@ -316,8 +325,34 @@ export const processPageSpeedAudit = internalAction({
message: "PageSpeed-Analyse fehlgeschlagen.",
details: [{ label: "Fehler", value: errorSummary, source: "pagespeed_action" }],
});
await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
if (args.queueGeneration !== false) {
await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
}
return null;
}
},
});
export const processPageSpeedAuditForWorkflow = internalAction({
args: {
runId: v.id("agentRuns"),
},
handler: async (ctx, args): Promise<Id<"agentRuns">> => {
const result = await ctx.runAction(
internal.pageSpeedAction.processPageSpeedAudit,
{
runId: args.runId,
queueGeneration: false,
},
);
const run = await ctx.runQuery(internal.runs.getAuditRunForWorkflowInternal, {
id: args.runId,
});
if (!result || run?.status === "failed" || run?.status === "canceled") {
throw new Error("PageSpeed-Analyse konnte nicht abgeschlossen werden.");
}
return args.runId;
},
});

View File

@@ -7,7 +7,7 @@ import {
normalizeListLimit,
} from "./domain";
import type { Id } from "./_generated/dataModel";
import { internalMutation, mutation, query } from "./_generated/server";
import { internalMutation, internalQuery, mutation, query } from "./_generated/server";
import type { MutationCtx, QueryCtx } from "./_generated/server";
const runType = v.union(...RUN_TYPES.map((type) => v.literal(type)));
@@ -127,6 +127,112 @@ export const updateStatus = mutation({
},
});
export const updateProgressInternal = internalMutation({
args: {
id: v.id("agentRuns"),
status: v.optional(runStatus),
currentStep: v.optional(v.string()),
errorSummary: v.optional(v.string()),
workflowId: v.optional(v.string()),
attempt: v.optional(v.number()),
maxAttempts: v.optional(v.number()),
progressStep: v.optional(v.number()),
progressTotal: v.optional(v.number()),
progressLabel: v.optional(v.string()),
progressPercent: v.optional(v.number()),
lastRetryReason: v.optional(v.string()),
},
handler: async (ctx, args) => {
const now = Date.now();
const patch: {
status?: (typeof RUN_STATUSES)[number];
updatedAt: number;
currentStep?: string;
errorSummary?: string;
workflowId?: string;
attempt?: number;
maxAttempts?: number;
progressStep?: number;
progressTotal?: number;
progressLabel?: string;
progressPercent?: number;
lastRetryReason?: string;
startedAt?: number;
finishedAt?: number;
} = {
updatedAt: now,
};
if (args.status !== undefined) {
patch.status = args.status;
if (args.status === "running") {
patch.startedAt = now;
patch.finishedAt = undefined;
}
if (
args.status === "succeeded" ||
args.status === "failed" ||
args.status === "canceled"
) {
patch.finishedAt = now;
}
}
if (args.currentStep !== undefined) {
patch.currentStep = args.currentStep;
}
if (args.errorSummary !== undefined) {
patch.errorSummary = args.errorSummary;
}
if (args.workflowId !== undefined) {
patch.workflowId = args.workflowId;
}
if (args.attempt !== undefined) {
patch.attempt = args.attempt;
}
if (args.maxAttempts !== undefined) {
patch.maxAttempts = args.maxAttempts;
}
if (args.progressStep !== undefined) {
patch.progressStep = args.progressStep;
}
if (args.progressTotal !== undefined) {
patch.progressTotal = args.progressTotal;
}
if (args.progressLabel !== undefined) {
patch.progressLabel = args.progressLabel;
}
if (args.progressPercent !== undefined) {
patch.progressPercent = args.progressPercent;
}
if (args.lastRetryReason !== undefined) {
patch.lastRetryReason = args.lastRetryReason;
}
await ctx.db.patch(args.id, patch);
return args.id;
},
});
export const getAuditRunForWorkflowInternal = internalQuery({
args: {
id: v.id("agentRuns"),
},
handler: async (ctx, args) => {
const run = await ctx.db.get(args.id);
if (!run || run.type !== "audit") {
return null;
}
return {
_id: run._id,
leadId: run.leadId ?? null,
auditId: run.auditId ?? null,
status: run.status,
currentStep: run.currentStep ?? null,
};
},
});
export const list = query({
args: {
status: v.optional(runStatus),

View File

@@ -642,6 +642,14 @@ export default defineSchema({
finishedAt: v.optional(v.number()),
currentStep: v.optional(v.string()),
errorSummary: v.optional(v.string()),
workflowId: v.optional(v.string()),
attempt: v.optional(v.number()),
maxAttempts: v.optional(v.number()),
progressStep: v.optional(v.number()),
progressTotal: v.optional(v.number()),
progressLabel: v.optional(v.string()),
progressPercent: v.optional(v.number()),
lastRetryReason: v.optional(v.string()),
counters: v.optional(
v.object({
leadsFound: v.number(),