import { WorkflowManager, type WorkflowId } from "@convex-dev/workflow"; import { v } from "convex/values"; import { components, internal } from "./_generated/api"; import type { Id } from "./_generated/dataModel"; import { internalMutation } from "./_generated/server"; import { getAuditProgressForStep } from "../lib/audits/progress"; const MAX_ATTEMPTS = 3; export const workflow = new WorkflowManager(components.workflow, { workpoolOptions: { maxParallelism: 3, retryActionsByDefault: true, defaultRetryBehavior: { maxAttempts: 3, initialBackoffMs: 1000, base: 2, }, }, }); function progressPatch(runId: Id<"agentRuns">, currentStep: string) { const progress = getAuditProgressForStep(currentStep); return { id: runId, currentStep, progressStep: progress.step, progressTotal: progress.total, progressLabel: progress.label, progressPercent: progress.percent, }; } function errorMessage(error: unknown) { return error instanceof Error ? error.message : String(error); } export const runLeadAuditWorkflow = workflow .define({ args: { runId: v.id("agentRuns"), }, }) .handler(async (step, args): Promise> => { try { await step.runMutation( internal.runs.updateProgressInternal, { ...progressPatch(args.runId, "audit_prepared"), status: "running", maxAttempts: MAX_ATTEMPTS, }, { name: "1/6 Audit vorbereitet" }, ); const [, pageSpeedResult] = await Promise.all([ step.runMutation( internal.runs.updateProgressInternal, { ...progressPatch(args.runId, "pagespeed_insights"), status: "running", }, { name: "2/6 Messe PageSpeed" }, ), step.runAction( internal.pageSpeedAction.processPageSpeedAuditForWorkflow, { runId: args.runId }, { name: "PageSpeed mobile/desktop", retry: true }, ), ]); if (!pageSpeedResult) { throw new Error("PageSpeed-Analyse konnte nicht abgeschlossen werden."); } const auditRun = await step.runQuery( internal.runs.getAuditRunForWorkflowInternal, { id: args.runId }, { name: "Audit-Run laden" }, ); if (!auditRun?.leadId) { throw new Error("Audit-Run hat keine Lead-ID."); } if (auditRun.status === "failed" || auditRun.status === "canceled") { throw new Error("PageSpeed-Analyse ist final fehlgeschlagen."); } await step.runMutation( internal.runs.updateProgressInternal, { ...progressPatch(args.runId, "website_signals"), status: "running", }, { name: "3/6 Sammle Website-Signale" }, ); const generationRunId = await step.runMutation( internal.auditGeneration.queueLeadAuditGeneration, { leadId: auditRun.leadId, ...(auditRun.auditId ? { auditId: auditRun.auditId } : {}), parentRunId: args.runId, scheduleAction: false, }, { name: "Audit-Generierung vorbereiten" }, ); if (!generationRunId) { throw new Error("Audit-Generierung konnte nicht angelegt werden."); } await step.runMutation( internal.runs.updateProgressInternal, { ...progressPatch(args.runId, "classification"), status: "running", }, { name: "4/6 Bewerte Befunde" }, ); const generationResult = await step.runAction( internal.auditGenerationAction.processAuditGenerationForWorkflow, { runId: generationRunId, rootRunId: args.runId }, { name: "Specialists und German Copy", retry: true }, ); if (!generationResult) { throw new Error("Audit-Generierung konnte nicht abgeschlossen werden."); } await step.runMutation( internal.runs.updateProgressInternal, { ...progressPatch(args.runId, "qualityReview"), status: "succeeded", }, { name: "6/6 Speichere Audit" }, ); return args.runId; } catch (error) { const message = errorMessage(error); await step.runMutation( internal.runs.updateProgressInternal, { id: args.runId, status: "failed", errorSummary: "Audit konnte nach automatischen Versuchen nicht abgeschlossen werden.", lastRetryReason: message, }, { name: "Audit final fehlgeschlagen", unstableArgs: true }, ); throw error; } }); export const startLeadAuditWorkflow = internalMutation({ args: { runId: v.id("agentRuns"), }, handler: async (ctx, args): Promise => { const workflowId: WorkflowId = await workflow.start( ctx, internal.auditWorkflow.runLeadAuditWorkflow, { runId: args.runId }, { startAsync: true }, ); await ctx.db.patch(args.runId, { workflowId: String(workflowId), attempt: 1, maxAttempts: MAX_ATTEMPTS, updatedAt: Date.now(), }); return workflowId; }, }); export const restartAuditWorkflow = internalMutation({ args: { runId: v.id("agentRuns"), }, handler: async (ctx, args): Promise => { const run = await ctx.db.get(args.runId); if (!run || run.type !== "audit") { throw new Error("Audit-Run wurde nicht gefunden."); } const nextAttempt = (run.attempt ?? 1) + 1; const maxAttempts = run.maxAttempts ?? MAX_ATTEMPTS; if (nextAttempt > maxAttempts) { throw new Error("Maximale Anzahl an Audit-Versuchen erreicht."); } await ctx.db.patch(args.runId, { status: "pending", currentStep: "pagespeed_insights", errorSummary: undefined, lastRetryReason: "Provider war kurz nicht erreichbar, ich versuche es erneut", attempt: nextAttempt, maxAttempts, progressStep: 1, progressTotal: 6, progressLabel: "Audit vorbereitet", progressPercent: 17, startedAt: undefined, finishedAt: undefined, updatedAt: Date.now(), }); if (run.workflowId) { await workflow.restart(ctx, run.workflowId as WorkflowId, { from: 0, startAsync: true, }); return run.workflowId; } const workflowId: WorkflowId = await workflow.start( ctx, internal.auditWorkflow.runLeadAuditWorkflow, { runId: args.runId }, { startAsync: true }, ); await ctx.db.patch(args.runId, { workflowId: String(workflowId), updatedAt: Date.now(), }); return String(workflowId); }, });