diff --git a/backlog/tasks/task-54 - Orchestrate-audits-with-Convex-Workflow-and-Workpool.md b/backlog/tasks/task-54 - Orchestrate-audits-with-Convex-Workflow-and-Workpool.md
new file mode 100644
index 0000000..877c38d
--- /dev/null
+++ b/backlog/tasks/task-54 - Orchestrate-audits-with-Convex-Workflow-and-Workpool.md
@@ -0,0 +1,53 @@
+---
+id: TASK-54
+title: Orchestrate audits with Convex Workflow and Workpool
+status: In Progress
+assignee: []
+created_date: '2026-06-12 19:45'
+updated_date: '2026-06-13 05:56'
+labels: []
+dependencies: []
+priority: high
+ordinal: 56000
+---
+
+## Description
+
+
+Replace the fragile audit scheduler chain with Convex Workflow/Workpool orchestration while keeping agentRuns as the visible product state for progress, retries, dashboard cards, and manual retry.
+
+
+## Acceptance Criteria
+
+- [x] #1 Audit starts create a visible agentRuns-backed card immediately in the audit dashboard
+- [x] #2 Convex Workflow and Workpool dependencies and components are registered
+- [x] #3 Audit progress exposes step, total, label, and percent in dashboard rows
+- [x] #4 Retry behavior is tracked on agentRuns and user-facing errors are hidden until final failure
+- [x] #5 Audit dashboard supports a manual retry action for final failed runs
+- [x] #6 Existing audit and outreach persistence remains compatible
+
+
+## Implementation Plan
+
+
+1. Add failing source/UI tests for workflow registration, immediate audit dashboard rows, progress mapping, retry controls, and retry orchestration.
+2. Add Convex Workflow/Workpool dependencies and register components.
+3. Add agentRun orchestration/progress fields and helper mappings.
+4. Start audits through Workflow while preserving existing agentRuns as product state.
+5. Surface active audit runs, progress, retry state, and manual retry in the audit dashboard.
+6. Adjust quality-review behavior and run verification.
+
+
+## Implementation Notes
+
+
+Implemented Workflow/Workpool orchestration for root audit runs, progress mapping, dashboard retry UI, parallel PageSpeed strategies, and parallel German copy calls. Ran npx convex codegen and pnpm test successfully (416/416). Task remains In Progress pending user confirmation.
+
+Added workflow-specific wrapper actions so Workpool retries throw on failed PageSpeed/audit-generation steps, while legacy actions remain compatible. Re-ran pnpm test successfully (416/416) after codegen.
+
+User reported runtime regression: root audit workflow fails during 2/6 PageSpeed and jumps to 6/6 with PageSpeed-Analyse konnte nicht abgeschlossen werden. Investigating with systematic debugging before patching.
+
+Fixed runtime regression reported after Workflow migration: the workflow marked root audit runs as running before PageSpeed, while startPageSpeedAuditRun rejected running runs and returned null before any PageSpeed work began. startPageSpeedAuditRun now accepts running root audit runs. The final workflow failure path now preserves the current failing progress step instead of forcing qualityReview/6 of 6. Added regression coverage in tests/audit-workflow-source.test.ts. Verified with pnpm test (418/418), git diff --check, and npx convex dev --once against the dev deployment.
+
+Implemented LLM copy review replacement for the hard German-Copy-Guard gate. qualityReview now supports severity, rewriteRequired, revisedCopy, one automatic rewrite attempt, deterministic guard telemetry, and warning-only copy feedback. Audit/outreach persistence proceeds after copy review warnings; only technical/schema/provider/persistence failures remain fatal. Also deduped audit dashboard rows so child audit_generation runs are hidden behind visible root audit runs for the same lead. Verified with focused tests, pnpm test (420/420), git diff --check, and npx convex dev --once against the dev deployment.
+
diff --git a/components/audits/audits-board.tsx b/components/audits/audits-board.tsx
index be4abee..00a8b92 100644
--- a/components/audits/audits-board.tsx
+++ b/components/audits/audits-board.tsx
@@ -2,12 +2,13 @@
import { useMemo, useState } from "react";
-import { useQuery } from "convex/react";
+import { useMutation, useQuery } from "convex/react";
import { FunctionReturnType } from "convex/server";
-import { Activity, Files, FileSearch, SquarePen } from "lucide-react";
+import { Activity, Files, FileSearch, RotateCcw, SquarePen } from "lucide-react";
import Link from "next/link";
import { api } from "@/convex/_generated/api";
+import type { Id } from "@/convex/_generated/dataModel";
import { Skeleton } from "@/components/ui/skeleton";
import { Badge } from "@/components/ui/badge";
import {
@@ -96,7 +97,9 @@ function AuditsBoardLoading() {
export function AuditsBoard() {
const dashboardRows = useQuery(api.audits.listDashboardRows, { limit: 100 });
+ const retryAuditRun = useMutation(api.audits.retryAuditRun);
const [activeFilter, setActiveFilter] = useState("all");
+ const [retryingRunId, setRetryingRunId] = useState(null);
const rows = useMemo(() => {
if (!dashboardRows) {
return [];
@@ -141,6 +144,14 @@ export function AuditsBoard() {
{ label: "Pipeline", value: "generation", count: statusCounts.generation },
{ label: "Fehlgeschlagen", value: "failed", count: statusCounts.failed },
];
+ const handleRetryAudit = async (runId: Id<"agentRuns">) => {
+ setRetryingRunId(runId);
+ try {
+ await retryAuditRun({ runId });
+ } finally {
+ setRetryingRunId(null);
+ }
+ };
if (dashboardRows === undefined) {
return ;
@@ -260,6 +271,41 @@ export function AuditsBoard() {
{row.errorSummary}
) : null}
+ {row.kind === "generation" ? (
+
+
+
+ {row.progress.step}/{row.progress.total} · {row.progress.label}
+
+
+ {row.progress.percent}%
+
+
+
+ {row.retry.isRetrying ? (
+
+ Versuch {row.retry.attempt}/{row.retry.maxAttempts} läuft
+
+ ) : null}
+ {row.retry.isRetrying && row.retry.lastRetryReason ? (
+
+ {row.retry.lastRetryReason}
+
+ ) : null}
+
+ ) : null}
@@ -271,6 +317,18 @@ export function AuditsBoard() {
Öffnen
+ ) : row.canRetry ? (
+
) : (
Pipeline läuft
diff --git a/convex/_generated/api.d.ts b/convex/_generated/api.d.ts
index 2d6f048..e748a79 100644
--- a/convex/_generated/api.d.ts
+++ b/convex/_generated/api.d.ts
@@ -11,6 +11,7 @@
import type * as auditGeneration from "../auditGeneration.js";
import type * as auditGenerationAction from "../auditGenerationAction.js";
import type * as auditInputs from "../auditInputs.js";
+import type * as auditWorkflow from "../auditWorkflow.js";
import type * as audits from "../audits.js";
import type * as blacklist from "../blacklist.js";
import type * as campaignMetrics from "../campaignMetrics.js";
@@ -42,6 +43,7 @@ declare const fullApi: ApiFromModules<{
auditGeneration: typeof auditGeneration;
auditGenerationAction: typeof auditGenerationAction;
auditInputs: typeof auditInputs;
+ auditWorkflow: typeof auditWorkflow;
audits: typeof audits;
blacklist: typeof blacklist;
campaignMetrics: typeof campaignMetrics;
@@ -92,4 +94,6 @@ export declare const internal: FilterApi<
export declare const components: {
betterAuth: import("../betterAuth/_generated/component.js").ComponentApi<"betterAuth">;
+ workflow: import("@convex-dev/workflow/_generated/component.js").ComponentApi<"workflow">;
+ auditWorkpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"auditWorkpool">;
};
diff --git a/convex/auditGeneration.ts b/convex/auditGeneration.ts
index 13975f5..ebc9440 100644
--- a/convex/auditGeneration.ts
+++ b/convex/auditGeneration.ts
@@ -350,6 +350,7 @@ export const queueLeadAuditGeneration = internalMutation({
leadId: v.id("leads"),
auditId: v.optional(v.id("audits")),
parentRunId: v.optional(v.id("agentRuns")),
+ scheduleAction: v.optional(v.boolean()),
},
returns: v.union(v.id("agentRuns"), v.null()),
handler: async (ctx, args): Promise | null> => {
@@ -418,13 +419,15 @@ export const queueLeadAuditGeneration = internalMutation({
createdAt: now,
});
- await ctx.scheduler.runAfter(
- 0,
- internal.auditGenerationAction.processAuditGeneration,
- {
- runId,
- },
- );
+ if (args.scheduleAction !== false) {
+ await ctx.scheduler.runAfter(
+ 0,
+ internal.auditGenerationAction.processAuditGeneration,
+ {
+ runId,
+ },
+ );
+ }
return runId;
},
@@ -460,7 +463,11 @@ export const startAuditGenerationRun = internalMutation({
const now = Date.now();
const run = await ctx.db.get(args.runId);
- if (!run || run.type !== "audit_generation" || run.status !== "pending") {
+ if (
+ !run ||
+ run.type !== "audit_generation" ||
+ (run.status !== "pending" && run.status !== "failed")
+ ) {
return null;
}
@@ -511,6 +518,7 @@ export const startAuditGenerationRun = internalMutation({
status: "running",
currentStep: "audit_generation",
startedAt: now,
+ finishedAt: undefined,
updatedAt: now,
errorSummary: undefined,
});
diff --git a/convex/auditGenerationAction.ts b/convex/auditGenerationAction.ts
index 0d7a70f..f88f939 100644
--- a/convex/auditGenerationAction.ts
+++ b/convex/auditGenerationAction.ts
@@ -18,6 +18,8 @@ import {
qualityReviewSchema,
type AuditSpecialistFinding,
type AuditSpecialistResult,
+ type QualityReview,
+ type QualityReviewRevisedCopy,
} from "../lib/ai/schemas";
import {
validateCustomerFacingCopy,
@@ -32,6 +34,7 @@ import {
type ScreenshotOneRequest,
} from "../lib/external-audit-services";
import { type AuditUsedSkill } from "../lib/skills-registry";
+import { getAuditProgressForStep } from "../lib/audits/progress";
import { internal } from "./_generated/api";
import type { Id } from "./_generated/dataModel";
import {
@@ -281,6 +284,44 @@ type GermanCopyOutput = {
};
};
+function applyRevisedCopy(
+ currentCopy: GermanCopyOutput,
+ revisedCopy: QualityReviewRevisedCopy,
+): GermanCopyOutput {
+ return {
+ ...currentCopy,
+ publicSummary: revisedCopy.publicSummary,
+ publicBody: revisedCopy.publicBody,
+ emailSubject: revisedCopy.emailSubject,
+ emailBody: revisedCopy.emailBody,
+ phoneScript: {
+ openingLine: revisedCopy.phoneScript.openingLine,
+ callScript: revisedCopy.phoneScript.callScript,
+ closeLine: revisedCopy.phoneScript.closeLine,
+ },
+ followUpDraft: {
+ message: revisedCopy.followUpDraft.message,
+ ...(revisedCopy.followUpDraft.followInDays !== null
+ ? { followInDays: revisedCopy.followUpDraft.followInDays }
+ : {}),
+ ...(revisedCopy.followUpDraft.goals !== null
+ ? { goals: revisedCopy.followUpDraft.goals }
+ : {}),
+ },
+ };
+}
+
+function germanCopyGuardTelemetry(guardResult: GermanCopyGuardResult) {
+ return {
+ passed: guardResult.passed,
+ issues: guardResult.issues.map((issue) => ({
+ field: issue.field,
+ rule: issue.rule,
+ message: issue.message,
+ })),
+ };
+}
+
type MultimodalContentPart =
| {
type: "text";
@@ -554,7 +595,11 @@ function buildQualityReviewPrompt(
`Öffentlicher Text: ${germanCopy.publicBody}`,
`Email-Betreff: ${germanCopy.emailSubject}`,
`Email-Text: ${germanCopy.emailBody}`,
- "Antworte als JSON mit isValid, issues, suggestions, notes.",
+ "Wenn die Copy nur stilistische oder leichte fachliche Hinweise hat, nutze severity warning und rewriteRequired true.",
+ "Wenn eine Korrektur sinnvoll ist, liefere revisedCopy vollständig mit publicSummary, publicBody, emailSubject, emailBody, phoneScript und followUpDraft.",
+ "Wenn keine Korrektur nötig ist, setze rewriteRequired false und revisedCopy null.",
+ "Nutze severity unsafe nur für harte Risiken wie falsche Sprache, erfundene Behauptungen, aggressive Tonalität oder Rohdaten-Leaks.",
+ "Antworte als JSON mit isValid, severity, issues, suggestions, rewriteRequired, revisedCopy und notes.",
].join("\n");
}
@@ -630,6 +675,27 @@ async function appendRunEvent(
});
}
+async function updateRootAuditProgress(
+ ctx: ActionCtx,
+ rootRunId: Id<"agentRuns"> | undefined,
+ currentStep: string,
+) {
+ if (!rootRunId) {
+ return;
+ }
+
+ const progress = getAuditProgressForStep(currentStep);
+ await ctx.runMutation(internal.runs.updateProgressInternal, {
+ id: rootRunId,
+ status: "running",
+ currentStep,
+ progressStep: progress.step,
+ progressTotal: progress.total,
+ progressLabel: progress.label,
+ progressPercent: progress.percent,
+ });
+}
+
async function loadAuditSkillRegistry(
ctx: ActionCtx,
runId: Id<"agentRuns">,
@@ -1204,6 +1270,7 @@ function getValidMediaType(mimeType: string) {
export const processAuditGeneration = internalAction({
args: {
runId: v.id("agentRuns"),
+ rootRunId: v.optional(v.id("agentRuns")),
},
handler: async (ctx, args) => {
let started:
@@ -1345,6 +1412,7 @@ export const processAuditGeneration = internalAction({
MAX_PROMPT_BYTES,
);
currentStep = "classification";
+ await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
await persistAuditStage({
ctx,
runId: args.runId,
@@ -1545,6 +1613,7 @@ export const processAuditGeneration = internalAction({
const verifierSystemPrompt =
"Du bist EvidenceQA. Verifiziere Befunde streng gegen belegte Evidence-Refs.";
currentStep = "evidenceVerifier";
+ await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
await persistAuditStage({
ctx,
@@ -1690,6 +1759,7 @@ export const processAuditGeneration = internalAction({
}
currentStep = "multimodalAudit";
+ await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
const validScreenshotParts = screenshotParts.filter(
(part): part is MultimodalFilePart => part !== null,
@@ -1844,6 +1914,7 @@ export const processAuditGeneration = internalAction({
}
currentStep = "germanCopy";
+ await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
// Stage 3: german copy generation
const germanSystemPrompt =
"Du bist fachlicher Texter für lokale Unternehmen im B2B-Kontext.";
@@ -1856,61 +1927,65 @@ export const processAuditGeneration = internalAction({
const safeGermanPrompt = sanitizeAndCapString(germanPrompt, MAX_PROMPT_BYTES);
try {
- const publicSummaryResult = await generateObject({
- model: provider(germanCopyProfile.modelId),
- system: germanSystemPrompt,
- schema: publicAuditTextSchema,
- prompt: safeGermanPrompt
- ? `${safeGermanPrompt}\nAusgabe für publicSummary`
- : "Ausgabe für publicSummary",
- temperature: germanCopyProfile.temperature,
- maxOutputTokens: germanCopyProfile.maxTokens,
- });
-
- const germanBodyResult = await generateObject({
- model: provider(germanCopyProfile.modelId),
- system: germanSystemPrompt,
- schema: publicAuditTextSchema,
- prompt: `${safeGermanPrompt ?? ""}\nAusgabe für publicBody`,
- temperature: germanCopyProfile.temperature,
- maxOutputTokens: germanCopyProfile.maxTokens,
- });
-
- const germanSubjectResult = await generateObject({
- model: provider(germanCopyProfile.modelId),
- system: germanSystemPrompt,
- schema: emailSubjectSchema,
- prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailSubject`,
- temperature: germanCopyProfile.temperature,
- maxOutputTokens: germanCopyProfile.maxTokens,
- });
-
- const germanEmailResult = await generateObject({
- model: provider(germanCopyProfile.modelId),
- system: germanSystemPrompt,
- schema: emailDraftSchema,
- prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailBody`,
- temperature: germanCopyProfile.temperature,
- maxOutputTokens: germanCopyProfile.maxTokens,
- });
-
- const germanCallScriptResult = await generateObject({
- model: provider(germanCopyProfile.modelId),
- system: germanSystemPrompt,
- schema: callScriptSchema,
- prompt: `${safeGermanPrompt ?? ""}\nAusgabe für callScript`,
- temperature: germanCopyProfile.temperature,
- maxOutputTokens: germanCopyProfile.maxTokens,
- });
-
- const germanFollowUpResult = await generateObject({
- model: provider(germanCopyProfile.modelId),
- system: germanSystemPrompt,
- schema: followUpDraftSchema,
- prompt: `${safeGermanPrompt ?? ""}\nAusgabe für followUpDraft`,
- temperature: germanCopyProfile.temperature,
- maxOutputTokens: germanCopyProfile.maxTokens,
- });
+ const [
+ publicSummaryResult,
+ germanBodyResult,
+ germanSubjectResult,
+ germanEmailResult,
+ germanCallScriptResult,
+ germanFollowUpResult,
+ ] = await Promise.all([
+ generateObject({
+ model: provider(germanCopyProfile.modelId),
+ system: germanSystemPrompt,
+ schema: publicAuditTextSchema,
+ prompt: safeGermanPrompt
+ ? `${safeGermanPrompt}\nAusgabe für publicSummary`
+ : "Ausgabe für publicSummary",
+ temperature: germanCopyProfile.temperature,
+ maxOutputTokens: germanCopyProfile.maxTokens,
+ }),
+ generateObject({
+ model: provider(germanCopyProfile.modelId),
+ system: germanSystemPrompt,
+ schema: publicAuditTextSchema,
+ prompt: `${safeGermanPrompt ?? ""}\nAusgabe für publicBody`,
+ temperature: germanCopyProfile.temperature,
+ maxOutputTokens: germanCopyProfile.maxTokens,
+ }),
+ generateObject({
+ model: provider(germanCopyProfile.modelId),
+ system: germanSystemPrompt,
+ schema: emailSubjectSchema,
+ prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailSubject`,
+ temperature: germanCopyProfile.temperature,
+ maxOutputTokens: germanCopyProfile.maxTokens,
+ }),
+ generateObject({
+ model: provider(germanCopyProfile.modelId),
+ system: germanSystemPrompt,
+ schema: emailDraftSchema,
+ prompt: `${safeGermanPrompt ?? ""}\nAusgabe für emailBody`,
+ temperature: germanCopyProfile.temperature,
+ maxOutputTokens: germanCopyProfile.maxTokens,
+ }),
+ generateObject({
+ model: provider(germanCopyProfile.modelId),
+ system: germanSystemPrompt,
+ schema: callScriptSchema,
+ prompt: `${safeGermanPrompt ?? ""}\nAusgabe für callScript`,
+ temperature: germanCopyProfile.temperature,
+ maxOutputTokens: germanCopyProfile.maxTokens,
+ }),
+ generateObject({
+ model: provider(germanCopyProfile.modelId),
+ system: germanSystemPrompt,
+ schema: followUpDraftSchema,
+ prompt: `${safeGermanPrompt ?? ""}\nAusgabe für followUpDraft`,
+ temperature: germanCopyProfile.temperature,
+ maxOutputTokens: germanCopyProfile.maxTokens,
+ }),
+ ]);
const publicSummary = publicSummaryResult.object.publicText ?? "";
const publicBody = germanBodyResult.object.publicText ?? "";
@@ -2015,42 +2090,89 @@ export const processAuditGeneration = internalAction({
},
followUp: germanCopyOutput.followUpDraft.message,
});
+ const deterministicGuard = germanCopyGuardTelemetry(guardResult);
// Stage 4: final quality review
- const qualityPrompt = buildQualityReviewPrompt(
+ let qualityPrompt = buildQualityReviewPrompt(
verifiedFindingsText,
germanCopyOutput,
);
- const safeQualityPrompt = sanitizeAndCapString(qualityPrompt, MAX_PROMPT_BYTES);
+ let safeQualityPrompt = sanitizeAndCapString(qualityPrompt, MAX_PROMPT_BYTES);
const qualitySystemPrompt =
"Du prüfst die erzeugten Inhalte als Qualitätssicherung.";
currentStep = "qualityReview";
+ await updateRootAuditProgress(ctx, args.rootRunId, currentStep);
try {
- const qualityResult = await generateObject({
- model: provider(qualityReviewProfile.modelId),
- system: qualitySystemPrompt,
- schema: qualityReviewSchema,
- prompt: safeQualityPrompt ?? "",
- temperature: qualityReviewProfile.temperature,
- maxOutputTokens: qualityReviewProfile.maxTokens,
- });
+ let finalQualityReview: QualityReview | null = null;
+ let qualityFinishReason: string | undefined;
+ let rewriteApplied = false;
+ let copyReviewAttempts = 0;
+ const qualityReviewUsages: Array = [];
- qualityPassed = qualityResult.object.isValid && guardResult.passed;
+ while (copyReviewAttempts < 2) {
+ copyReviewAttempts += 1;
+ const qualityResult = await generateObject({
+ model: provider(qualityReviewProfile.modelId),
+ system: qualitySystemPrompt,
+ schema: qualityReviewSchema,
+ prompt: safeQualityPrompt ?? "",
+ temperature: qualityReviewProfile.temperature,
+ maxOutputTokens: qualityReviewProfile.maxTokens,
+ });
+
+ finalQualityReview = qualityResult.object;
+ qualityFinishReason = qualityResult.finishReason;
+ qualityReviewUsages.push(qualityResult.usage);
+
+ if (
+ copyReviewAttempts === 1 &&
+ qualityResult.object.rewriteRequired &&
+ qualityResult.object.revisedCopy
+ ) {
+ germanCopyOutput = applyRevisedCopy(
+ germanCopyOutput,
+ qualityResult.object.revisedCopy,
+ );
+ rewriteApplied = true;
+ await appendRunEvent(ctx, {
+ runId: args.runId,
+ level: "warning",
+ message: "Copy-Review hat korrigiert.",
+ details: qualityResult.object.issues.slice(0, 4).map((issue) => ({
+ label: "Hinweis",
+ value: issue,
+ })),
+ });
+
+ qualityPrompt = buildQualityReviewPrompt(
+ verifiedFindingsText,
+ germanCopyOutput,
+ );
+ safeQualityPrompt = sanitizeAndCapString(
+ qualityPrompt,
+ MAX_PROMPT_BYTES,
+ );
+ continue;
+ }
+
+ break;
+ }
+
+ if (!finalQualityReview) {
+ throw new Error("Copy-Review konnte nicht ausgewertet werden.");
+ }
+
+ qualityPassed =
+ finalQualityReview.isValid && finalQualityReview.severity === "ok";
const qualityPayload = {
- isValid: qualityResult.object.isValid && guardResult.passed,
- issues: [
- ...qualityResult.object.issues,
- ...guardResult.issues.map(
- (issue) => `${issue.field}: ${issue.message}`,
- ),
- ],
- suggestions: qualityResult.object.suggestions,
- notes: qualityResult.object.notes ?? [],
+ ...finalQualityReview,
+ rewriteApplied,
+ reviewAttempts: copyReviewAttempts,
+ deterministicGuard,
+ finalDecision: qualityPassed ? "approved" : "stored_with_warnings",
};
- const qualityErrorSummary =
- "Qualitätsprüfung hat Inhalte als ungenügend markiert.";
await persistAuditStage({
ctx,
@@ -2067,43 +2189,26 @@ export const processAuditGeneration = internalAction({
MAX_RAW_RESPONSE_BYTES,
),
parsedJson: sanitizeAndCapParsedJson(qualityPayload),
- ...withStageUsage(qualityResult.usage),
- status: qualityPassed ? "succeeded" : "failed",
- finishReason: qualityResult.finishReason,
- ...(!qualityPassed ? { errorSummary: qualityErrorSummary } : {}),
+ ...withStageUsage(aggregateOpenRouterUsage(qualityReviewUsages)),
+ status: "succeeded",
+ ...(qualityFinishReason ? { finishReason: qualityFinishReason } : {}),
});
await recordOpenRouterUsage(ctx, {
runId: args.runId,
leadId: started.lead._id,
...(auditId ? { auditId } : {}),
- usage: qualityResult.usage,
+ usage: aggregateOpenRouterUsage(qualityReviewUsages),
});
- if (!qualityPassed) {
- const message =
- "Qualitätsprüfung und German-Copy-Guard haben nicht bestanden.";
+ if (!qualityPassed || !guardResult.passed) {
await appendRunEvent(ctx, {
runId: args.runId,
level: "warning",
- message,
- });
- await ctx.runMutation(internal.auditGeneration.finishAuditGenerationRun, {
- runId: args.runId,
- status: "failed",
- currentStep: "qualityReview",
- errors: errors + 1,
- errorSummary: message,
- });
- return null;
- }
-
- if (!qualityResult.object.isValid) {
- await appendRunEvent(ctx, {
- runId: args.runId,
- level: "warning",
- message:
- "Qualitätsprüfung hat Review-Hinweise gemeldet; German-Copy-Guard bestanden.",
- details: qualityResult.object.issues.slice(0, 4).map((issue) => ({
+ message: "Copy-Review mit Hinweisen abgeschlossen.",
+ details: [
+ ...finalQualityReview.issues,
+ ...guardResult.issues.map((issue) => issue.message),
+ ].slice(0, 4).map((issue) => ({
label: "Hinweis",
value: issue,
})),
@@ -2288,3 +2393,25 @@ export const processAuditGeneration = internalAction({
}
},
});
+
+export const processAuditGenerationForWorkflow = internalAction({
+ args: {
+ runId: v.id("agentRuns"),
+ rootRunId: v.id("agentRuns"),
+ },
+ handler: async (ctx, args): Promise> => {
+ const result = await ctx.runAction(
+ internal.auditGenerationAction.processAuditGeneration,
+ {
+ runId: args.runId,
+ rootRunId: args.rootRunId,
+ },
+ );
+
+ if (!result) {
+ throw new Error("Audit-Generierung konnte nicht abgeschlossen werden.");
+ }
+
+ return args.runId;
+ },
+});
diff --git a/convex/auditWorkflow.ts b/convex/auditWorkflow.ts
new file mode 100644
index 0000000..d343848
--- /dev/null
+++ b/convex/auditWorkflow.ts
@@ -0,0 +1,238 @@
+import { WorkflowManager, type WorkflowId } from "@convex-dev/workflow";
+import { v } from "convex/values";
+
+import { components, internal } from "./_generated/api";
+import type { Id } from "./_generated/dataModel";
+import { internalMutation } from "./_generated/server";
+import { getAuditProgressForStep } from "../lib/audits/progress";
+
+const MAX_ATTEMPTS = 3;
+
+export const workflow = new WorkflowManager(components.workflow, {
+ workpoolOptions: {
+ maxParallelism: 3,
+ retryActionsByDefault: true,
+ defaultRetryBehavior: {
+ maxAttempts: 3,
+ initialBackoffMs: 1000,
+ base: 2,
+ },
+ },
+});
+
+function progressPatch(runId: Id<"agentRuns">, currentStep: string) {
+ const progress = getAuditProgressForStep(currentStep);
+
+ return {
+ id: runId,
+ currentStep,
+ progressStep: progress.step,
+ progressTotal: progress.total,
+ progressLabel: progress.label,
+ progressPercent: progress.percent,
+ };
+}
+
+function errorMessage(error: unknown) {
+ return error instanceof Error ? error.message : String(error);
+}
+
+export const runLeadAuditWorkflow = workflow
+ .define({
+ args: {
+ runId: v.id("agentRuns"),
+ },
+ })
+ .handler(async (step, args): Promise> => {
+ try {
+ await step.runMutation(
+ internal.runs.updateProgressInternal,
+ {
+ ...progressPatch(args.runId, "audit_prepared"),
+ status: "running",
+ maxAttempts: MAX_ATTEMPTS,
+ },
+ { name: "1/6 Audit vorbereitet" },
+ );
+
+ const [, pageSpeedResult] = await Promise.all([
+ step.runMutation(
+ internal.runs.updateProgressInternal,
+ {
+ ...progressPatch(args.runId, "pagespeed_insights"),
+ status: "running",
+ },
+ { name: "2/6 Messe PageSpeed" },
+ ),
+ step.runAction(
+ internal.pageSpeedAction.processPageSpeedAuditForWorkflow,
+ { runId: args.runId },
+ { name: "PageSpeed mobile/desktop", retry: true },
+ ),
+ ]);
+
+ if (!pageSpeedResult) {
+ throw new Error("PageSpeed-Analyse konnte nicht abgeschlossen werden.");
+ }
+
+ const auditRun = await step.runQuery(
+ internal.runs.getAuditRunForWorkflowInternal,
+ { id: args.runId },
+ { name: "Audit-Run laden" },
+ );
+
+ if (!auditRun?.leadId) {
+ throw new Error("Audit-Run hat keine Lead-ID.");
+ }
+ if (auditRun.status === "failed" || auditRun.status === "canceled") {
+ throw new Error("PageSpeed-Analyse ist final fehlgeschlagen.");
+ }
+
+ await step.runMutation(
+ internal.runs.updateProgressInternal,
+ {
+ ...progressPatch(args.runId, "website_signals"),
+ status: "running",
+ },
+ { name: "3/6 Sammle Website-Signale" },
+ );
+
+ const generationRunId = await step.runMutation(
+ internal.auditGeneration.queueLeadAuditGeneration,
+ {
+ leadId: auditRun.leadId,
+ ...(auditRun.auditId ? { auditId: auditRun.auditId } : {}),
+ parentRunId: args.runId,
+ scheduleAction: false,
+ },
+ { name: "Audit-Generierung vorbereiten" },
+ );
+
+ if (!generationRunId) {
+ throw new Error("Audit-Generierung konnte nicht angelegt werden.");
+ }
+
+ await step.runMutation(
+ internal.runs.updateProgressInternal,
+ {
+ ...progressPatch(args.runId, "classification"),
+ status: "running",
+ },
+ { name: "4/6 Bewerte Befunde" },
+ );
+
+ const generationResult = await step.runAction(
+ internal.auditGenerationAction.processAuditGenerationForWorkflow,
+ { runId: generationRunId, rootRunId: args.runId },
+ { name: "Specialists und German Copy", retry: true },
+ );
+
+ if (!generationResult) {
+ throw new Error("Audit-Generierung konnte nicht abgeschlossen werden.");
+ }
+
+ await step.runMutation(
+ internal.runs.updateProgressInternal,
+ {
+ ...progressPatch(args.runId, "qualityReview"),
+ status: "succeeded",
+ },
+ { name: "6/6 Speichere Audit" },
+ );
+
+ return args.runId;
+ } catch (error) {
+ const message = errorMessage(error);
+ await step.runMutation(
+ internal.runs.updateProgressInternal,
+ {
+ id: args.runId,
+ status: "failed",
+ errorSummary: "Audit konnte nach automatischen Versuchen nicht abgeschlossen werden.",
+ lastRetryReason: message,
+ },
+ { name: "Audit final fehlgeschlagen", unstableArgs: true },
+ );
+ throw error;
+ }
+ });
+
+export const startLeadAuditWorkflow = internalMutation({
+ args: {
+ runId: v.id("agentRuns"),
+ },
+ handler: async (ctx, args): Promise => {
+ const workflowId: WorkflowId = await workflow.start(
+ ctx,
+ internal.auditWorkflow.runLeadAuditWorkflow,
+ { runId: args.runId },
+ { startAsync: true },
+ );
+
+ await ctx.db.patch(args.runId, {
+ workflowId: String(workflowId),
+ attempt: 1,
+ maxAttempts: MAX_ATTEMPTS,
+ updatedAt: Date.now(),
+ });
+
+ return workflowId;
+ },
+});
+
+export const restartAuditWorkflow = internalMutation({
+ args: {
+ runId: v.id("agentRuns"),
+ },
+ handler: async (ctx, args): Promise => {
+ const run = await ctx.db.get(args.runId);
+ if (!run || run.type !== "audit") {
+ throw new Error("Audit-Run wurde nicht gefunden.");
+ }
+
+ const nextAttempt = (run.attempt ?? 1) + 1;
+ const maxAttempts = run.maxAttempts ?? MAX_ATTEMPTS;
+ if (nextAttempt > maxAttempts) {
+ throw new Error("Maximale Anzahl an Audit-Versuchen erreicht.");
+ }
+
+ await ctx.db.patch(args.runId, {
+ status: "pending",
+ currentStep: "pagespeed_insights",
+ errorSummary: undefined,
+ lastRetryReason:
+ "Provider war kurz nicht erreichbar, ich versuche es erneut",
+ attempt: nextAttempt,
+ maxAttempts,
+ progressStep: 1,
+ progressTotal: 6,
+ progressLabel: "Audit vorbereitet",
+ progressPercent: 17,
+ startedAt: undefined,
+ finishedAt: undefined,
+ updatedAt: Date.now(),
+ });
+
+ if (run.workflowId) {
+ await workflow.restart(ctx, run.workflowId as WorkflowId, {
+ from: 0,
+ startAsync: true,
+ });
+ return run.workflowId;
+ }
+
+ const workflowId: WorkflowId = await workflow.start(
+ ctx,
+ internal.auditWorkflow.runLeadAuditWorkflow,
+ { runId: args.runId },
+ { startAsync: true },
+ );
+
+ await ctx.db.patch(args.runId, {
+ workflowId: String(workflowId),
+ updatedAt: Date.now(),
+ });
+
+ return String(workflowId);
+ },
+});
diff --git a/convex/audits.ts b/convex/audits.ts
index f2cfcfc..c26839f 100644
--- a/convex/audits.ts
+++ b/convex/audits.ts
@@ -1,9 +1,11 @@
import { v } from "convex/values";
import { normalizeListLimit } from "./domain";
+import { internal } from "./_generated/api";
import { internalMutation, mutation, query } from "./_generated/server";
import type { Doc, Id } from "./_generated/dataModel";
import type { MutationCtx, QueryCtx } from "./_generated/server";
+import { getAuditProgressForStep } from "../lib/audits/progress";
export const AUDIT_REVIEW_NOTICE_AFTER_MS = 30 * 24 * 60 * 60 * 1000;
const DETAIL_EVIDENCE_LIMIT = 50;
@@ -63,12 +65,27 @@ type AuditDashboardRow =
id: Id<"agentRuns">;
runId: Id<"agentRuns">;
leadId: Id<"leads"> | null;
+ runType: Doc<"agentRuns">["type"];
title: string;
checkedDomain: string;
status: Doc<"agentRuns">["status"];
latestStage: string;
stageStatus: Doc<"agentRuns">["status"];
errorSummary: string | null;
+ progress: {
+ step: number;
+ total: number;
+ label: string;
+ percent: number;
+ };
+ retry: {
+ attempt: number;
+ maxAttempts: number;
+ isRetrying: boolean;
+ lastRetryReason: string | null;
+ canRetry: boolean;
+ };
+ canRetry: boolean;
pageCount: number;
checkedPages: string[];
createdAt: number;
@@ -104,6 +121,38 @@ const latestGenerationStage = (stages: Doc<"auditGenerations">[]) => {
return [...stages].sort((a, b) => b.updatedAt - a.updatedAt)[0] ?? null;
};
+const progressForRun = (
+ run: Doc<"agentRuns">,
+ latestStage: Doc<"auditGenerations"> | null,
+) => {
+ const fallback = getAuditProgressForStep(latestStage?.stage ?? run.currentStep);
+
+ return {
+ step: run.progressStep ?? fallback.step,
+ total: run.progressTotal ?? fallback.total,
+ label: run.progressLabel ?? fallback.label,
+ percent: run.progressPercent ?? fallback.percent,
+ };
+};
+
+const retryForRun = (run: Doc<"agentRuns">) => {
+ const attempt = run.attempt ?? 1;
+ const maxAttempts = run.maxAttempts ?? 3;
+ const canRetry =
+ run.type === "audit" &&
+ (run.status === "failed" || run.status === "canceled") &&
+ attempt < maxAttempts;
+
+ return {
+ attempt,
+ maxAttempts,
+ isRetrying:
+ (run.status === "pending" || run.status === "running") && attempt > 1,
+ lastRetryReason: run.lastRetryReason ?? null,
+ canRetry,
+ };
+};
+
const normalizeComparableAuditUrl = (value: string | null | undefined) => {
const trimmed = value?.trim();
if (!trimmed) {
@@ -727,6 +776,31 @@ export const list = query({
},
});
+export const retryAuditRun = mutation({
+ args: {
+ runId: v.id("agentRuns"),
+ },
+ handler: async (ctx, args) => {
+ await requireOperator(ctx);
+
+ const run = await ctx.db.get(args.runId);
+ if (!run || run.type !== "audit") {
+ throw new Error("Audit-Run wurde nicht gefunden.");
+ }
+
+ const status = run.status;
+ if (status !== "failed" && status !== "canceled") {
+ throw new Error("Nur final fehlgeschlagene Audits können neu gestartet werden.");
+ }
+
+ await ctx.scheduler.runAfter(0, internal.auditWorkflow.restartAuditWorkflow, {
+ runId: args.runId,
+ });
+
+ return { runId: args.runId };
+ },
+});
+
export const listDashboardRows = query({
args: {
limit: v.optional(v.number()),
@@ -771,29 +845,50 @@ export const listDashboardRows = query({
}
}
+ const rootAuditRuns = await ctx.db
+ .query("agentRuns")
+ .withIndex("by_type", (q) => q.eq("type", "audit"))
+ .order("desc")
+ .take(limit);
+ const rootAuditRunLeadIds = new Set(
+ rootAuditRuns
+ .map((run) => run.leadId)
+ .filter((leadId): leadId is Id<"leads"> => leadId !== undefined),
+ );
+
const generationRuns = await ctx.db
.query("agentRuns")
.withIndex("by_type", (q) => q.eq("type", "audit_generation"))
.order("desc")
.take(limit);
- for (const run of generationRuns) {
+ for (const run of [...rootAuditRuns, ...generationRuns]) {
if (!run.leadId) {
continue;
}
+ if (
+ run.type === "audit_generation" &&
+ rootAuditRunLeadIds.has(run.leadId)
+ ) {
+ continue;
+ }
+
const directFinalAudit = run.auditId ? await ctx.db.get(run.auditId) : null;
const leadFinalAudits = await ctx.db
.query("audits")
.withIndex("by_leadId", (q) => q.eq("leadId", run.leadId as Id<"leads">))
.take(1);
+ const shouldHideBehindFinalAudit =
+ run.status === "succeeded" || run.type === "audit_generation";
+
if (
- finalAuditRunIds.has(run._id) ||
- (run.auditId && finalAuditIds.has(run.auditId)) ||
- directFinalAudit ||
- finalAuditLeadIds.has(run.leadId) ||
- leadFinalAudits.length > 0
+ (shouldHideBehindFinalAudit && finalAuditRunIds.has(run._id)) ||
+ (shouldHideBehindFinalAudit && run.auditId && finalAuditIds.has(run.auditId)) ||
+ (shouldHideBehindFinalAudit && directFinalAudit) ||
+ (shouldHideBehindFinalAudit && finalAuditLeadIds.has(run.leadId)) ||
+ (shouldHideBehindFinalAudit && leadFinalAudits.length > 0)
) {
continue;
}
@@ -806,18 +901,24 @@ export const listDashboardRows = query({
const latestStage = latestGenerationStage(stages);
const lead = await ctx.db.get(run.leadId);
const checkedDomain = domainFromLead(lead);
+ const progress = progressForRun(run, latestStage);
+ const retry = retryForRun(run);
rows.push({
kind: "generation",
id: run._id,
runId: run._id,
leadId: run.leadId,
+ runType: run.type,
title: lead?.companyName ?? checkedDomain,
checkedDomain,
status: run.status,
latestStage: latestStage?.stage ?? run.currentStep ?? "audit_generation",
stageStatus: latestStage?.status ?? run.status,
errorSummary: run.errorSummary ?? latestStage?.errorSummary ?? null,
+ progress,
+ retry,
+ canRetry: retry.canRetry,
pageCount: 0,
checkedPages: [],
createdAt: run.createdAt,
diff --git a/convex/convex.config.ts b/convex/convex.config.ts
index 2feeea4..3521f64 100644
--- a/convex/convex.config.ts
+++ b/convex/convex.config.ts
@@ -1,9 +1,13 @@
import { defineApp } from "convex/server";
+import workflow from "@convex-dev/workflow/convex.config";
+import auditWorkpool from "@convex-dev/workpool/convex.config";
import betterAuth from "./betterAuth/convex.config";
const app = defineApp();
app.use(betterAuth);
+app.use(workflow);
+app.use(auditWorkpool, { name: "auditWorkpool" });
export default app;
diff --git a/convex/pageSpeed.ts b/convex/pageSpeed.ts
index 0325deb..0f05700 100644
--- a/convex/pageSpeed.ts
+++ b/convex/pageSpeed.ts
@@ -190,6 +190,13 @@ async function queueLeadPageSpeedAuditForLead(
leadId: args.leadId,
status: "pending",
currentStep: "pagespeed_insights",
+ workflowId: undefined,
+ attempt: 1,
+ maxAttempts: 3,
+ progressStep: 1,
+ progressTotal: 6,
+ progressLabel: "Audit vorbereitet",
+ progressPercent: 17,
counters: PAGE_SPEED_COUNTER_TEMPLATE,
createdAt: now,
updatedAt: now,
@@ -211,7 +218,7 @@ async function queueLeadPageSpeedAuditForLead(
await ctx.scheduler.runAfter(
0,
- internal.pageSpeedAction.processPageSpeedAudit,
+ internal.auditWorkflow.startLeadAuditWorkflow,
{
runId,
},
@@ -328,7 +335,11 @@ export const startPageSpeedAuditRun = internalMutation({
return null;
}
- if (run.status !== "pending") {
+ if (
+ run.status !== "pending" &&
+ run.status !== "failed" &&
+ run.status !== "running"
+ ) {
return null;
}
@@ -400,6 +411,7 @@ export const startPageSpeedAuditRun = internalMutation({
status: "running",
currentStep: "pagespeed_insights",
startedAt: now,
+ finishedAt: undefined,
updatedAt: now,
errorSummary: undefined,
});
diff --git a/convex/pageSpeedAction.ts b/convex/pageSpeedAction.ts
index 6c033b0..74a6b2b 100644
--- a/convex/pageSpeedAction.ts
+++ b/convex/pageSpeedAction.ts
@@ -143,6 +143,7 @@ async function queueAuditGenerationAfterPageSpeed(
export const processPageSpeedAudit = internalAction({
args: {
runId: v.id("agentRuns"),
+ queueGeneration: v.optional(v.boolean()),
},
handler: async (ctx, args) => {
const apiKeyRaw = process.env.PAGESPEED_API_KEY?.trim();
@@ -185,19 +186,82 @@ export const processPageSpeedAudit = internalAction({
let succeededStrategies = 0;
try {
- for (const strategy of STRATEGIES) {
- const fetchedAt = Date.now();
- try {
- const raw = await fetchPageSpeedResult({
- url: sourceUrl,
- strategy,
- apiKey,
- timeoutMs,
- });
- const rawJson = JSON.stringify(raw) ?? "null";
- const rawJsonBytes = new TextEncoder().encode(rawJson).byteLength;
- if (rawJsonBytes > MAX_RAW_PAGESPEED_BYTES) {
- failedStrategies += 1;
+ const strategyResults = await Promise.all(
+ STRATEGIES.map(async (strategy) => {
+ const fetchedAt = Date.now();
+ try {
+ const raw = await fetchPageSpeedResult({
+ url: sourceUrl,
+ strategy,
+ apiKey,
+ timeoutMs,
+ });
+ const rawJson = JSON.stringify(raw) ?? "null";
+ const rawJsonBytes = new TextEncoder().encode(rawJson).byteLength;
+ if (rawJsonBytes > MAX_RAW_PAGESPEED_BYTES) {
+ await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
+ leadId: started.lead._id,
+ ...(started.auditId ? { auditId: started.auditId } : {}),
+ runId: args.runId,
+ strategy,
+ status: "failed",
+ sourceUrl,
+ errorType: "api_error",
+ errorSummary: RAW_PAGESPEED_BYTES_SUMMARY,
+ fetchedAt,
+ });
+
+ await ctx.runMutation(internal.runs.appendEventInternal, {
+ runId: args.runId,
+ level: "warning",
+ message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
+ details: [
+ { label: "Strategie", value: strategy },
+ {
+ label: "Fehler",
+ value: RAW_PAGESPEED_BYTES_SUMMARY,
+ },
+ ],
+ });
+
+ return "failed" as const;
+ }
+
+ const rawStorageId = await ctx.storage.store(
+ new Blob([rawJson], { type: "application/json" }),
+ );
+ const normalized = normalizePageSpeedResult({
+ strategy,
+ sourceUrl,
+ raw,
+ });
+
+ await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
+ leadId: started.lead._id,
+ ...(started.auditId ? { auditId: started.auditId } : {}),
+ runId: args.runId,
+ strategy,
+ status: "succeeded",
+ sourceUrl,
+ finalUrl: normalized.finalUrl,
+ rawStorageId,
+ fetchedAt,
+ normalized: toPersistedPageSpeedNormalizedResult(normalized),
+ });
+
+ await ctx.runMutation(internal.runs.appendEventInternal, {
+ runId: args.runId,
+ level: "info",
+ message: `PageSpeed-Analyse für ${strategy} abgeschlossen.`,
+ details: [{ label: "Strategie", value: strategy }],
+ });
+ return "succeeded" as const;
+ } catch (error) {
+ const { errorType, errorSummary } = classifyPageSpeedFailure(
+ error,
+ apiKeyRaw,
+ );
+
await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
leadId: started.lead._id,
...(started.auditId ? { auditId: started.auditId } : {}),
@@ -205,8 +269,8 @@ export const processPageSpeedAudit = internalAction({
strategy,
status: "failed",
sourceUrl,
- errorType: "api_error",
- errorSummary: RAW_PAGESPEED_BYTES_SUMMARY,
+ errorType,
+ errorSummary,
fetchedAt,
});
@@ -216,75 +280,18 @@ export const processPageSpeedAudit = internalAction({
message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
details: [
{ label: "Strategie", value: strategy },
- {
- label: "Fehler",
- value: RAW_PAGESPEED_BYTES_SUMMARY,
- },
+ { label: "Fehler", value: errorSummary },
],
});
-
- continue;
+ return "failed" as const;
}
+ }),
+ );
- const rawStorageId = await ctx.storage.store(
- new Blob([rawJson], { type: "application/json" }),
- );
- const normalized = normalizePageSpeedResult({
- strategy,
- sourceUrl,
- raw,
- });
-
- await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
- leadId: started.lead._id,
- ...(started.auditId ? { auditId: started.auditId } : {}),
- runId: args.runId,
- strategy,
- status: "succeeded",
- sourceUrl,
- finalUrl: normalized.finalUrl,
- rawStorageId,
- fetchedAt,
- normalized: toPersistedPageSpeedNormalizedResult(normalized),
- });
-
- await ctx.runMutation(internal.runs.appendEventInternal, {
- runId: args.runId,
- level: "info",
- message: `PageSpeed-Analyse für ${strategy} abgeschlossen.`,
- details: [{ label: "Strategie", value: strategy }],
- });
- succeededStrategies += 1;
- } catch (error) {
- const { errorType, errorSummary } = classifyPageSpeedFailure(
- error,
- apiKeyRaw,
- );
- failedStrategies += 1;
-
- await ctx.runMutation(internal.pageSpeed.persistPageSpeedResult, {
- leadId: started.lead._id,
- ...(started.auditId ? { auditId: started.auditId } : {}),
- runId: args.runId,
- strategy,
- status: "failed",
- sourceUrl,
- errorType,
- errorSummary,
- fetchedAt,
- });
-
- await ctx.runMutation(internal.runs.appendEventInternal, {
- runId: args.runId,
- level: "warning",
- message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
- details: [
- { label: "Strategie", value: strategy },
- { label: "Fehler", value: errorSummary },
- ],
- });
- }
- }
+ succeededStrategies = strategyResults.filter(
+ (result) => result === "succeeded",
+ ).length;
+ failedStrategies = strategyResults.length - succeededStrategies;
const status = succeededStrategies > 0 ? "succeeded" : "failed";
const errors = failedStrategies;
@@ -298,7 +305,9 @@ export const processPageSpeedAudit = internalAction({
: undefined,
});
- await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
+ if (args.queueGeneration !== false) {
+ await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
+ }
return args.runId;
} catch (error) {
@@ -316,8 +325,34 @@ export const processPageSpeedAudit = internalAction({
message: "PageSpeed-Analyse fehlgeschlagen.",
details: [{ label: "Fehler", value: errorSummary, source: "pagespeed_action" }],
});
- await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
+ if (args.queueGeneration !== false) {
+ await queueAuditGenerationAfterPageSpeed(ctx, args.runId, started);
+ }
return null;
}
},
});
+
+export const processPageSpeedAuditForWorkflow = internalAction({
+ args: {
+ runId: v.id("agentRuns"),
+ },
+ handler: async (ctx, args): Promise> => {
+ const result = await ctx.runAction(
+ internal.pageSpeedAction.processPageSpeedAudit,
+ {
+ runId: args.runId,
+ queueGeneration: false,
+ },
+ );
+ const run = await ctx.runQuery(internal.runs.getAuditRunForWorkflowInternal, {
+ id: args.runId,
+ });
+
+ if (!result || run?.status === "failed" || run?.status === "canceled") {
+ throw new Error("PageSpeed-Analyse konnte nicht abgeschlossen werden.");
+ }
+
+ return args.runId;
+ },
+});
diff --git a/convex/runs.ts b/convex/runs.ts
index e664631..1e22a2a 100644
--- a/convex/runs.ts
+++ b/convex/runs.ts
@@ -7,7 +7,7 @@ import {
normalizeListLimit,
} from "./domain";
import type { Id } from "./_generated/dataModel";
-import { internalMutation, mutation, query } from "./_generated/server";
+import { internalMutation, internalQuery, mutation, query } from "./_generated/server";
import type { MutationCtx, QueryCtx } from "./_generated/server";
const runType = v.union(...RUN_TYPES.map((type) => v.literal(type)));
@@ -127,6 +127,112 @@ export const updateStatus = mutation({
},
});
+export const updateProgressInternal = internalMutation({
+ args: {
+ id: v.id("agentRuns"),
+ status: v.optional(runStatus),
+ currentStep: v.optional(v.string()),
+ errorSummary: v.optional(v.string()),
+ workflowId: v.optional(v.string()),
+ attempt: v.optional(v.number()),
+ maxAttempts: v.optional(v.number()),
+ progressStep: v.optional(v.number()),
+ progressTotal: v.optional(v.number()),
+ progressLabel: v.optional(v.string()),
+ progressPercent: v.optional(v.number()),
+ lastRetryReason: v.optional(v.string()),
+ },
+ handler: async (ctx, args) => {
+ const now = Date.now();
+ const patch: {
+ status?: (typeof RUN_STATUSES)[number];
+ updatedAt: number;
+ currentStep?: string;
+ errorSummary?: string;
+ workflowId?: string;
+ attempt?: number;
+ maxAttempts?: number;
+ progressStep?: number;
+ progressTotal?: number;
+ progressLabel?: string;
+ progressPercent?: number;
+ lastRetryReason?: string;
+ startedAt?: number;
+ finishedAt?: number;
+ } = {
+ updatedAt: now,
+ };
+
+ if (args.status !== undefined) {
+ patch.status = args.status;
+ if (args.status === "running") {
+ patch.startedAt = now;
+ patch.finishedAt = undefined;
+ }
+ if (
+ args.status === "succeeded" ||
+ args.status === "failed" ||
+ args.status === "canceled"
+ ) {
+ patch.finishedAt = now;
+ }
+ }
+ if (args.currentStep !== undefined) {
+ patch.currentStep = args.currentStep;
+ }
+ if (args.errorSummary !== undefined) {
+ patch.errorSummary = args.errorSummary;
+ }
+ if (args.workflowId !== undefined) {
+ patch.workflowId = args.workflowId;
+ }
+ if (args.attempt !== undefined) {
+ patch.attempt = args.attempt;
+ }
+ if (args.maxAttempts !== undefined) {
+ patch.maxAttempts = args.maxAttempts;
+ }
+ if (args.progressStep !== undefined) {
+ patch.progressStep = args.progressStep;
+ }
+ if (args.progressTotal !== undefined) {
+ patch.progressTotal = args.progressTotal;
+ }
+ if (args.progressLabel !== undefined) {
+ patch.progressLabel = args.progressLabel;
+ }
+ if (args.progressPercent !== undefined) {
+ patch.progressPercent = args.progressPercent;
+ }
+ if (args.lastRetryReason !== undefined) {
+ patch.lastRetryReason = args.lastRetryReason;
+ }
+
+ await ctx.db.patch(args.id, patch);
+ return args.id;
+ },
+});
+
+export const getAuditRunForWorkflowInternal = internalQuery({
+ args: {
+ id: v.id("agentRuns"),
+ },
+ handler: async (ctx, args) => {
+ const run = await ctx.db.get(args.id);
+ if (!run || run.type !== "audit") {
+ return null;
+ }
+
+ return {
+ _id: run._id,
+ leadId: run.leadId ?? null,
+ auditId: run.auditId ?? null,
+ status: run.status,
+ currentStep: run.currentStep ?? null,
+ };
+ },
+});
+
export const list = query({
args: {
status: v.optional(runStatus),
diff --git a/convex/schema.ts b/convex/schema.ts
index 219a111..715e5ea 100644
--- a/convex/schema.ts
+++ b/convex/schema.ts
@@ -642,6 +642,14 @@ export default defineSchema({
finishedAt: v.optional(v.number()),
currentStep: v.optional(v.string()),
errorSummary: v.optional(v.string()),
+ workflowId: v.optional(v.string()),
+ attempt: v.optional(v.number()),
+ maxAttempts: v.optional(v.number()),
+ progressStep: v.optional(v.number()),
+ progressTotal: v.optional(v.number()),
+ progressLabel: v.optional(v.string()),
+ progressPercent: v.optional(v.number()),
+ lastRetryReason: v.optional(v.string()),
counters: v.optional(
v.object({
leadsFound: v.number(),
diff --git a/lib/ai/schemas.ts b/lib/ai/schemas.ts
index 3ac6ac7..1b00335 100644
--- a/lib/ai/schemas.ts
+++ b/lib/ai/schemas.ts
@@ -132,10 +132,22 @@ export const followUpDraftSchema = z.object({
goals: z.array(z.string()).nullable(),
});
+export const qualityReviewRevisedCopySchema = z.object({
+ publicSummary: nonEmptyTextSchema,
+ publicBody: nonEmptyTextSchema,
+ emailSubject: nonEmptyTextSchema,
+ emailBody: nonEmptyTextSchema,
+ phoneScript: callScriptSchema,
+ followUpDraft: followUpDraftSchema,
+});
+
export const qualityReviewSchema = z.object({
isValid: z.boolean(),
+ severity: z.enum(["ok", "warning", "unsafe"]),
issues: z.array(z.string()),
suggestions: z.array(z.string()),
+ rewriteRequired: z.boolean(),
+ revisedCopy: qualityReviewRevisedCopySchema.nullable(),
notes: z.array(z.string()).nullable(),
});
@@ -154,4 +166,5 @@ export type EmailDraft = z.infer;
export type EmailSubject = z.infer;
export type CallScript = z.infer;
export type FollowUpDraft = z.infer;
+export type QualityReviewRevisedCopy = z.infer;
export type QualityReview = z.infer;
diff --git a/lib/audits/progress.ts b/lib/audits/progress.ts
new file mode 100644
index 0000000..5c1ed19
--- /dev/null
+++ b/lib/audits/progress.ts
@@ -0,0 +1,75 @@
+export const AUDIT_PROGRESS_TOTAL_STEPS = 6;
+
+export type AuditProgress = {
+ step: number;
+ total: number;
+ label: string;
+ percent: number;
+};
+
+const fallbackProgress: AuditProgress = {
+ step: 1,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Audit vorbereitet",
+ percent: 17,
+};
+
+const progressByStep: Record = {
+ audit_prepared: fallbackProgress,
+ pagespeed_insights: {
+ step: 2,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Messe PageSpeed",
+ percent: 33,
+ },
+ website_signals: {
+ step: 3,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Sammle Website-Signale",
+ percent: 50,
+ },
+ classification: {
+ step: 4,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Bewerte Befunde",
+ percent: 67,
+ },
+ evidenceVerifier: {
+ step: 4,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Bewerte Befunde",
+ percent: 67,
+ },
+ multimodalAudit: {
+ step: 4,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Bewerte Befunde",
+ percent: 67,
+ },
+ germanCopy: {
+ step: 5,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Erstelle Texte",
+ percent: 83,
+ },
+ qualityReview: {
+ step: 6,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Speichere Audit",
+ percent: 100,
+ },
+ persist_audit: {
+ step: 6,
+ total: AUDIT_PROGRESS_TOTAL_STEPS,
+ label: "Speichere Audit",
+ percent: 100,
+ },
+};
+
+export function getAuditProgressForStep(step: string | null | undefined) {
+ if (!step) {
+ return fallbackProgress;
+ }
+
+ return progressByStep[step] ?? fallbackProgress;
+}
diff --git a/package.json b/package.json
index e252d61..f91e4f2 100644
--- a/package.json
+++ b/package.json
@@ -12,6 +12,8 @@
},
"dependencies": {
"@convex-dev/better-auth": "^0.12.2",
+ "@convex-dev/workflow": "^0.4.4",
+ "@convex-dev/workpool": "^0.4.7",
"@hookform/resolvers": "^5.4.0",
"@openrouter/ai-sdk-provider": "^2.9.0",
"@sparticuz/chromium-min": "^149.0.0",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 73e0351..917558f 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -11,6 +11,12 @@ importers:
'@convex-dev/better-auth':
specifier: ^0.12.2
version: 0.12.2(@standard-schema/spec@1.1.0)(better-auth@1.6.14(@opentelemetry/api@1.9.1)(next@16.2.7(@babel/core@7.29.7)(@opentelemetry/api@1.9.1)(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)
+ '@convex-dev/workflow':
+ specifier: ^0.4.4
+ version: 0.4.4(@convex-dev/workpool@0.4.7(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4)))(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4))
+ '@convex-dev/workpool':
+ specifier: ^0.4.7
+ version: 0.4.7(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4))
'@hookform/resolvers':
specifier: ^5.4.0
version: 5.4.0(react-hook-form@7.77.0(react@19.2.4))
@@ -337,6 +343,19 @@ packages:
convex: ^1.25.0
react: ^18.3.1 || ^19.0.0
+ '@convex-dev/workflow@0.4.4':
+ resolution: {integrity: sha512-ZQfVspAAxG4zZJEep2qaRtupw8OewwMezq6KNKaXKjo/gA+YffS9bXz13x+L/TSt9/Lb6gioae6Y9PDrqh7xQg==}
+ peerDependencies:
+ '@convex-dev/workpool': ^0.4.4
+ convex: ^1.36.1
+ convex-helpers: ^0.1.99
+
+ '@convex-dev/workpool@0.4.7':
+ resolution: {integrity: sha512-4O3VKcJXqYZ9icDgKdVPxjDGUAFK3oG0hbUwLcyYMYgsvVKlZDhvZRmczqSBZHLyrCGPpf925byh0dBigCfAGA==}
+ peerDependencies:
+ convex: ^1.31.7
+ convex-helpers: ^0.1.94
+
'@dotenvx/dotenvx@1.71.0':
resolution: {integrity: sha512-KEUw/mGu+EDRhYWRTNGHIimVCs9NvMFaIXOGrHSXoCteKLE5EsJnmPjOPpYorjXVg/0xG0fbdVw720azw1z4ag==}
hasBin: true
@@ -2071,6 +2090,9 @@ packages:
resolution: {integrity: sha512-6t10qk83GOG8p0vKmaCr8eiilZwO171AvbROMtvvNiwrTly62t+7XkA8RdIIVbpMhCASAsxgAzdRSwh6nw/5Dg==}
engines: {node: '>=4'}
+ async-channel@0.2.0:
+ resolution: {integrity: sha512-BJyjI/sfKlyijaBt2hbOSxT28xGNtLR0QLzAKO1Hlnv5BULY7sAoYoTPW3lfr1ZIC7y+FxabxO9T8GXpyoofGg==}
+
async-function@1.0.0:
resolution: {integrity: sha512-hsU18Ae8CDTR6Kgu9DYf0EbCr/a5iGL0rytQDobUcdpYOKokk8LEjVphnXkDkgpi0wYVsqrXuP0bZxJaTqdgoA==}
engines: {node: '>= 0.4'}
@@ -4638,6 +4660,18 @@ snapshots:
- hono
- typescript
+ '@convex-dev/workflow@0.4.4(@convex-dev/workpool@0.4.7(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4)))(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4))':
+ dependencies:
+ '@convex-dev/workpool': 0.4.7(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4))
+ async-channel: 0.2.0
+ convex: 1.40.0(react@19.2.4)
+ convex-helpers: 0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3)
+
+ '@convex-dev/workpool@0.4.7(convex-helpers@0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3))(convex@1.40.0(react@19.2.4))':
+ dependencies:
+ convex: 1.40.0(react@19.2.4)
+ convex-helpers: 0.1.118(@standard-schema/spec@1.1.0)(convex@1.40.0(react@19.2.4))(hono@4.12.23)(react@19.2.4)(typescript@5.9.3)(zod@4.4.3)
+
'@dotenvx/dotenvx@1.71.0':
dependencies:
commander: 11.1.0
@@ -6267,6 +6301,8 @@ snapshots:
dependencies:
tslib: 2.8.1
+ async-channel@0.2.0: {}
+
async-function@1.0.0: {}
available-typed-arrays@1.0.7:
diff --git a/tests/ai-schemas.test.ts b/tests/ai-schemas.test.ts
index 06d9e89..c9cad4b 100644
--- a/tests/ai-schemas.test.ts
+++ b/tests/ai-schemas.test.ts
@@ -139,16 +139,22 @@ test("structured output schemas avoid optional top-level fields for OpenAI stric
() =>
qualityReviewSchema.parse({
isValid: true,
+ severity: "ok",
issues: [],
suggestions: [],
+ rewriteRequired: false,
+ revisedCopy: null,
}),
/notes|invalid|required/i,
);
assert.equal(
qualityReviewSchema.parse({
isValid: true,
+ severity: "ok",
issues: [],
suggestions: [],
+ rewriteRequired: false,
+ revisedCopy: null,
notes: null,
}).notes,
null,
@@ -338,8 +344,11 @@ test("outreach schemas parse German customer-facing payloads", () => {
});
const qualityParsed = qualityReviewSchema.parse({
isValid: true,
+ severity: "ok",
issues: [],
suggestions: ["Mehr Kundennutzen konkret beschreiben."],
+ rewriteRequired: false,
+ revisedCopy: null,
notes: null,
});
@@ -350,6 +359,46 @@ test("outreach schemas parse German customer-facing payloads", () => {
assert.equal(Array.isArray(qualityParsed.suggestions), true);
});
+test("quality review schema accepts one-shot revised copy payloads", () => {
+ const parsed: QualityReview = qualityReviewSchema.parse({
+ isValid: false,
+ severity: "warning",
+ issues: ["Betreff klingt noch etwas generisch."],
+ suggestions: ["Betreff konkreter machen."],
+ rewriteRequired: true,
+ revisedCopy: {
+ publicSummary: "Mir ist aufgefallen, dass die mobile Seite etwas traege wirkt.",
+ publicBody:
+ "Mein Vorschlag waere, zuerst die sichtbaren Ladebremsen der Startseite zu pruefen.",
+ emailSubject: "Kurzer Hinweis zur mobilen Seite",
+ emailBody:
+ "Guten Tag, mir ist beim Blick auf Ihre Website aufgefallen, dass die mobile Seite etwas traege wirkt.",
+ phoneScript: {
+ openingLine: "Guten Tag, hier ist Matthias Meister.",
+ callScript: [
+ "Mir ist bei Ihrer mobilen Website ein konkreter Ladezeitpunkt aufgefallen.",
+ "Mein Vorschlag waere, diesen Punkt kurz zu priorisieren.",
+ ],
+ closeLine: "Soll ich Ihnen den Hinweis kurz per E-Mail senden?",
+ },
+ followUpDraft: {
+ message:
+ "Ich wollte kurz nachfassen, ob der Hinweis zur mobilen Seite fuer Sie relevant ist.",
+ followInDays: 7,
+ goals: ["kurze Rueckmeldung", "Interesse klaeren"],
+ },
+ },
+ notes: ["Ein Rewrite ist sinnvoll."],
+ });
+
+ assert.equal(parsed.rewriteRequired, true);
+ assert.equal(parsed.revisedCopy?.emailSubject, "Kurzer Hinweis zur mobilen Seite");
+ assert.deepEqual(parsed.revisedCopy?.followUpDraft.goals, [
+ "kurze Rueckmeldung",
+ "Interesse klaeren",
+ ]);
+});
+
test("schema-inferred types are exported for Convex action wiring", () => {
const typedFindings: InternalFindings = {
findings: [
@@ -393,8 +442,11 @@ test("schema-inferred types are exported for Convex action wiring", () => {
const typedQuality: QualityReview = {
isValid: true,
+ severity: "ok",
issues: [],
suggestions: [],
+ rewriteRequired: false,
+ revisedCopy: null,
notes: null,
};
diff --git a/tests/audit-generation-action-source.test.ts b/tests/audit-generation-action-source.test.ts
index 6282ea1..7635cd0 100644
--- a/tests/audit-generation-action-source.test.ts
+++ b/tests/audit-generation-action-source.test.ts
@@ -93,10 +93,10 @@ test("auditGenerationAction exports processAuditGeneration with runId validator"
assert.equal(
hasPattern(
actionSource,
- /processAuditGeneration\s*=\s*internalAction\(\s*{\s*args:\s*{\s*runId:\s*v\.id\(\s*["']agentRuns["']\s*\)\s*,?\s*}/,
+ /processAuditGeneration\s*=\s*internalAction\(\s*{\s*args:\s*{[\s\S]*?runId:\s*v\.id\(\s*["']agentRuns["']\s*\)[\s\S]*?rootRunId:\s*v\.optional\(v\.id\(\s*["']agentRuns["']\s*\)\)/,
),
true,
- "processAuditGeneration should validate runId: v.id(\"agentRuns\")",
+ "processAuditGeneration should validate runId and optional rootRunId as agentRuns IDs",
);
});
@@ -280,18 +280,38 @@ test("German copy prompt uses first-contact email tone guidelines without a new
);
});
-test("quality review blocks when model review or German copy guard fails", () => {
+test("quality review can rewrite copy once without making copy feedback a hard failure", () => {
const qualityPromptSource = extractFunctionSource("buildQualityReviewPrompt");
- assert.match(
+ assert.doesNotMatch(
actionSource,
/qualityPassed\s*=\s*qualityResult\.object\.isValid\s*&&\s*guardResult\.passed/,
- "qualityPassed should require both model review validity and German copy guard.",
+ "Copy quality feedback should not be a hard AND-gate with the deterministic German copy guard.",
);
assert.doesNotMatch(
actionSource,
/qualityPassed\s*=\s*guardResult\.passed\s*;/,
- "qualityPassed must not ignore the model quality review.",
+ "The deterministic German copy guard should not be the quality pass condition.",
+ );
+ assert.match(
+ actionSource,
+ /rewriteRequired[\s\S]*revisedCopy[\s\S]*applyRevisedCopy/,
+ "Quality review should be able to request one revised copy and apply it before persistence.",
+ );
+ assert.match(
+ actionSource,
+ /copyReviewAttempts\s*<\s*2/,
+ "Quality review should run at most the initial review plus one rewrite review.",
+ );
+ assert.match(
+ actionSource,
+ /message:\s*["']Copy-Review hat korrigiert\.["']/,
+ "A successful rewrite should be visible as a warning event.",
+ );
+ assert.match(
+ actionSource,
+ /message:\s*["']Copy-Review mit Hinweisen abgeschlossen\.["']/,
+ "Remaining copy feedback should be stored as warning telemetry.",
);
assert.match(
qualityPromptSource,
@@ -308,6 +328,11 @@ test("quality review blocks when model review or German copy guard fails", () =>
/verified findings|verifizierte Befunde/i,
"Quality review should keep concrete claims tied to verified findings.",
);
+ assert.match(
+ qualityPromptSource,
+ /revisedCopy|rewriteRequired/,
+ "Quality review prompt should ask for revised copy when rewrite is needed.",
+ );
});
test("action handles post-start failure paths in action-level catch", () => {
@@ -536,27 +561,21 @@ test("action handles missing screenshots with warning event fallback", () => {
);
});
-test("action runs german copy guard and blocks outreach-ready on validation failure", () => {
+test("action keeps German copy guard as telemetry without blocking outreach-ready", () => {
assert.equal(
hasPattern(actionSource, /validateCustomerFacingCopy/),
true,
- "Action should run German copy validation",
+ "Action should still run German copy validation for telemetry.",
);
- assert.equal(
- hasPattern(
- actionSource,
- /qualityPassed\s*=\s*qualityResult\.object\.isValid\s*&&\s*guardResult\.passed/,
- ),
- true,
- "Model QA and deterministic German copy guard failures should hard-block the audit run.",
+ assert.doesNotMatch(
+ actionSource,
+ /guardResult\.passed[\s\S]{0,500}finishAuditGenerationRun[\s\S]{0,250}status:\s*["']failed["']/,
+ "German copy guard findings should not finish the audit generation as failed.",
);
- assert.equal(
- hasPattern(
- actionSource,
- /qualityPassed\s*=\s*guardResult\.passed\s*;/,
- ),
- false,
- "Action must not ignore the model QA validity flag.",
+ assert.match(
+ actionSource,
+ /guardTelemetry|deterministicGuard/,
+ "German copy guard output should be persisted as telemetry in the quality payload.",
);
assert.equal(
hasPattern(actionSource, /internal\.leads\.reviewUpdateInternal/),
diff --git a/tests/audit-progress.test.ts b/tests/audit-progress.test.ts
new file mode 100644
index 0000000..0d9f2f8
--- /dev/null
+++ b/tests/audit-progress.test.ts
@@ -0,0 +1,41 @@
+import assert from "node:assert/strict";
+import test from "node:test";
+
+import {
+ AUDIT_PROGRESS_TOTAL_STEPS,
+ getAuditProgressForStep,
+} from "../lib/audits/progress";
+
+test("audit progress mapping exposes stable customer-facing progress steps", () => {
+ assert.equal(AUDIT_PROGRESS_TOTAL_STEPS, 6);
+
+ assert.deepEqual(getAuditProgressForStep("pagespeed_insights"), {
+ step: 2,
+ total: 6,
+ label: "Messe PageSpeed",
+ percent: 33,
+ });
+
+ assert.deepEqual(getAuditProgressForStep("qualityReview"), {
+ step: 6,
+ total: 6,
+ label: "Speichere Audit",
+ percent: 100,
+ });
+});
+
+test("audit progress mapping falls back safely for historical runs", () => {
+ assert.deepEqual(getAuditProgressForStep(undefined), {
+ step: 1,
+ total: 6,
+ label: "Audit vorbereitet",
+ percent: 17,
+ });
+
+ assert.deepEqual(getAuditProgressForStep("some_old_step"), {
+ step: 1,
+ total: 6,
+ label: "Audit vorbereitet",
+ percent: 17,
+ });
+});
diff --git a/tests/audit-workflow-source.test.ts b/tests/audit-workflow-source.test.ts
new file mode 100644
index 0000000..5789fcd
--- /dev/null
+++ b/tests/audit-workflow-source.test.ts
@@ -0,0 +1,92 @@
+import assert from "node:assert/strict";
+import { existsSync, readFileSync } from "node:fs";
+import path from "node:path";
+import test from "node:test";
+
+const source = (relativePath: string) => {
+ return readFileSync(path.join(process.cwd(), ...relativePath.split("/")), "utf8");
+};
+
+const fileExists = (relativePath: string) => {
+ return existsSync(path.join(process.cwd(), ...relativePath.split("/")));
+};
+
+test("Convex Workflow and Workpool dependencies and components are registered", () => {
+ const packageSource = source("package.json");
+ const configSource = source("convex/convex.config.ts");
+
+ assert.match(packageSource, /"@convex-dev\/workflow"/);
+ assert.match(packageSource, /"@convex-dev\/workpool"/);
+ assert.match(configSource, /from\s+["@']@convex-dev\/workflow\/convex\.config["@']/);
+ assert.match(configSource, /from\s+["@']@convex-dev\/workpool\/convex\.config["@']/);
+ assert.match(configSource, /app\.use\(workflow/);
+ assert.match(configSource, /app\.use\(auditWorkpool/);
+});
+
+test("audit workflow defines durable workflow manager with retrying workpool options", () => {
+ assert.equal(fileExists("convex/auditWorkflow.ts"), true);
+ const workflowSource = source("convex/auditWorkflow.ts");
+
+ assert.match(workflowSource, /WorkflowManager/);
+ assert.match(workflowSource, /components\.workflow/);
+ assert.match(workflowSource, /workpoolOptions/);
+ assert.match(workflowSource, /maxParallelism:\s*3/);
+ assert.match(workflowSource, /retryActionsByDefault:\s*true/);
+ assert.match(workflowSource, /maxAttempts:\s*3/);
+ assert.match(workflowSource, /initialBackoffMs:\s*1000/);
+ assert.match(workflowSource, /base:\s*2/);
+ assert.match(workflowSource, /step\.runAction/);
+ assert.match(workflowSource, /step\.runMutation/);
+ assert.match(workflowSource, /Promise\.all/);
+});
+
+test("requestLeadAudit creates a visible agentRun and starts the workflow", () => {
+ const pageSpeedSource = source("convex/pageSpeed.ts");
+
+ assert.match(pageSpeedSource, /internal\.auditWorkflow\.startLeadAuditWorkflow/);
+ assert.match(pageSpeedSource, /type:\s*"audit"/);
+ assert.match(pageSpeedSource, /progressLabel:\s*"Audit vorbereitet"/);
+ assert.match(pageSpeedSource, /workflowId/);
+});
+
+test("workflow PageSpeed start accepts root runs already marked running", () => {
+ const pageSpeedSource = source("convex/pageSpeed.ts");
+
+ assert.match(
+ pageSpeedSource,
+ /run\.status\s*!==\s*"pending"[\s\S]*run\.status\s*!==\s*"failed"[\s\S]*run\.status\s*!==\s*"running"/,
+ );
+});
+
+test("workflow failure progress stays on the failing step instead of jumping to quality review", () => {
+ const workflowSource = source("convex/auditWorkflow.ts");
+ const catchIndex = workflowSource.indexOf("} catch (error)");
+ assert.notEqual(catchIndex, -1, "Expected workflow catch block.");
+ const nextExportIndex = workflowSource.indexOf("export const startLeadAuditWorkflow", catchIndex);
+ assert.notEqual(nextExportIndex, -1, "Expected workflow catch block end.");
+ const catchSource = workflowSource.slice(catchIndex, nextExportIndex);
+
+ assert.doesNotMatch(catchSource, /progressPatch\(args\.runId,\s*"qualityReview"\)/);
+ assert.doesNotMatch(catchSource, /progressStep|progressTotal|progressLabel|progressPercent/);
+ assert.match(catchSource, /status:\s*"failed"/);
+});
+
+test("audit dashboard query includes root audit runs and exposes progress and retry fields", () => {
+ const auditsSource = source("convex/audits.ts");
+
+ assert.match(auditsSource, /\.eq\("type",\s*"audit"\)/);
+ assert.match(auditsSource, /kind:\s*"generation"/);
+ assert.match(auditsSource, /runType/);
+ assert.match(auditsSource, /progress:/);
+ assert.match(auditsSource, /retry:/);
+ assert.match(auditsSource, /canRetry/);
+});
+
+test("audit retry mutation restarts final failed or canceled runs through workflow", () => {
+ const auditsSource = source("convex/audits.ts");
+
+ assert.match(auditsSource, /export const retryAuditRun = mutation/);
+ assert.match(auditsSource, /requireOperator\(ctx\)/);
+ assert.match(auditsSource, /status !== "failed"[\s\S]*status !== "canceled"/);
+ assert.match(auditsSource, /internal\.auditWorkflow\.restartAuditWorkflow/);
+});
diff --git a/tests/audits-board-layout.test.ts b/tests/audits-board-layout.test.ts
index bc9d0fa..24cd973 100644
--- a/tests/audits-board-layout.test.ts
+++ b/tests/audits-board-layout.test.ts
@@ -32,7 +32,12 @@ test("AuditsBoard keeps audit detail links and non-clickable pipeline cards", as
assert.match(source, /row\.kind === "audit"/);
assert.match(source, /href=\{row\.detailHref\}/);
assert.match(source, /Öffnen/);
- assert.match(source, /Pipeline läuft/);
+ assert.match(source, /row\.progress/);
+ assert.match(source, /progressbar/);
+ assert.match(source, /row\.retry/);
+ assert.match(source, /Versuch/);
+ assert.match(source, /Erneut starten/);
+ assert.match(source, /retryAuditRun/);
assert.match(source, /getGenerationStatusLabel\(row\)/);
assert.match(source, /row\.errorSummary/);
});
diff --git a/tests/audits-dashboard-query-source.test.ts b/tests/audits-dashboard-query-source.test.ts
index 48e29f1..fc91bd2 100644
--- a/tests/audits-dashboard-query-source.test.ts
+++ b/tests/audits-dashboard-query-source.test.ts
@@ -122,3 +122,24 @@ test("audits dashboard query suppresses generation rows once a final audit exist
"Generation rows should surface run or stage errors.",
);
});
+
+test("audits dashboard query hides child generation rows behind root audit runs", async () => {
+ const auditsSource = await source("convex/audits.ts");
+ const querySource = extractExportSource(auditsSource, "listDashboardRows");
+
+ assert.match(
+ querySource,
+ /rootAuditRunLeadIds/,
+ "Query should track lead ids that already have root audit runs.",
+ );
+ assert.match(
+ querySource,
+ /run\.type\s*===\s*"audit_generation"[\s\S]*rootAuditRunLeadIds\.has\(run\.leadId\)/,
+ "Child audit_generation rows should be skipped when a root audit run for the same lead is already visible.",
+ );
+ assert.match(
+ querySource,
+ /continue/,
+ "Child generation rows should be skipped instead of rendered as a duplicate card.",
+ );
+});
diff --git a/tests/pagespeed-action-source.test.ts b/tests/pagespeed-action-source.test.ts
index c6fa8ec..70a683b 100644
--- a/tests/pagespeed-action-source.test.ts
+++ b/tests/pagespeed-action-source.test.ts
@@ -138,7 +138,7 @@ test("pageSpeedAction has action-level guard to fail whole run on unexpected err
assert.equal(
hasPattern(
actionSource,
- /try\s*{[\s\S]*?await ctx\.runMutation\(internal\.pageSpeed\.startPageSpeedAuditRun,\s*{[\s\S]*?}\);\s*[\s\S]*?for\s*\(\s*(?:const|let)\s+strategy\s+of\s+STRATEGIES[\s\S]*?\}\s*catch \(error\)\s*{[\s\S]*classifyPageSpeedFailure\(error,\s*apiKeyRaw\)[\s\S]*?internal\.pageSpeed\.finishPageSpeedAuditRun[\s\S]*status:\s*["']failed["']/,
+ /try\s*{[\s\S]*?Promise\.all\([\s\S]*?STRATEGIES\.map\(async \(strategy\)[\s\S]*?\}\s*catch \(error\)\s*{[\s\S]*classifyPageSpeedFailure\(error,\s*apiKeyRaw\)[\s\S]*?internal\.pageSpeed\.finishPageSpeedAuditRun[\s\S]*status:\s*["']failed["']/,
),
true,
"Action should wrap run lifecycle in an outer try/catch that finalizes the run as failed.",
@@ -182,7 +182,7 @@ test("pageSpeedAction enforces raw payload size guard before storage", () => {
assert.equal(
hasPattern(
actionSource,
- /if\s*\(\s*rawJsonBytes\s*>\s*MAX_RAW_PAGESPEED_BYTES[\s\S]*?}\s*[\s\S]*?continue;[\s\S]*?await ctx\.storage\.store\(/,
+ /if\s*\(\s*rawJsonBytes\s*>\s*MAX_RAW_PAGESPEED_BYTES[\s\S]*?return\s+["']failed["']\s+as\s+const;[\s\S]*?}\s*[\s\S]*?await ctx\.storage\.store\(/,
),
true,
"Raw payload storage must be skipped for oversized payloads.",
@@ -202,10 +202,10 @@ test("pageSpeedAction runs both strategies and catches per-strategy errors", ()
assert.equal(
hasPattern(
actionSource,
- /for\s*\(\s*(?:const|let)\s+strategy\s+of[\s\S]*?\)\s*{[\s\S]*?try[\s\S]*?catch\s*\([^)]*\)[\s\S]*?}/,
+ /Promise\.all\([\s\S]*?STRATEGIES\.map\(async \(strategy\)[\s\S]*?try[\s\S]*?catch\s*\([^)]*\)[\s\S]*?return\s+["']failed["']\s+as\s+const/,
),
true,
- "Action should catch errors inside per-strategy loop",
+ "Action should catch errors inside each parallel strategy task",
);
});
diff --git a/tests/pagespeed-persistence-source.test.ts b/tests/pagespeed-persistence-source.test.ts
index 8c35d7d..a1fb1c7 100644
--- a/tests/pagespeed-persistence-source.test.ts
+++ b/tests/pagespeed-persistence-source.test.ts
@@ -140,7 +140,7 @@ test("getLeadAuditStartStates exposes active audit run status for lead review bu
assert.match(source, /canStart/);
});
-test("queueLeadPageSpeedAudit dedupes per lead and schedules pagespeed action", () => {
+test("queueLeadPageSpeedAudit dedupes per lead and schedules audit workflow", () => {
const queueSource = pageSpeedSource;
assert.equal(
hasPattern(
@@ -169,10 +169,10 @@ test("queueLeadPageSpeedAudit dedupes per lead and schedules pagespeed action",
assert.equal(
hasPattern(
queueSource,
- /ctx\.scheduler\.runAfter\(\s*0,\s*internal\.pageSpeedAction\.processPageSpeedAudit,\s*\{[\s\S]*?runId/,
+ /ctx\.scheduler\.runAfter\(\s*0,\s*internal\.auditWorkflow\.startLeadAuditWorkflow,\s*\{[\s\S]*?runId/,
),
true,
- "queueLeadPageSpeedAudit must schedule internal.pageSpeedAction.processPageSpeedAudit with runAfter(0, ...).",
+ "queueLeadPageSpeedAudit must schedule internal.auditWorkflow.startLeadAuditWorkflow with runAfter(0, ...).",
);
assert.equal(
hasPattern(