239 lines
6.5 KiB
TypeScript
239 lines
6.5 KiB
TypeScript
import { WorkflowManager, type WorkflowId } from "@convex-dev/workflow";
|
|
import { v } from "convex/values";
|
|
|
|
import { components, internal } from "./_generated/api";
|
|
import type { Id } from "./_generated/dataModel";
|
|
import { internalMutation } from "./_generated/server";
|
|
import { getAuditProgressForStep } from "../lib/audits/progress";
|
|
|
|
// Attempt limit written to an audit run's `maxAttempts` field when the workflow
// starts, and used as the fallback limit on restart when the run has none stored.
const MAX_ATTEMPTS = 3;
|
|
|
|
/**
 * Shared workflow manager for this module, bound to the `workflow` component.
 *
 * All settings are passed as `workpoolOptions`. The `maxAttempts` inside
 * `defaultRetryBehavior` is a separate literal and is not tied to the
 * module-level `MAX_ATTEMPTS` constant.
 */
export const workflow = new WorkflowManager(components.workflow, {
  workpoolOptions: {
    maxParallelism: 3,
    retryActionsByDefault: true,
    defaultRetryBehavior: { maxAttempts: 3, initialBackoffMs: 1000, base: 2 },
  },
});
|
|
|
|
/**
 * Builds the progress portion of an `updateProgressInternal` payload.
 *
 * Looks up the progress descriptor for `currentStep` and maps its fields onto
 * the `progress*` keys, together with the run id and the step name itself.
 *
 * @param runId - The audit run the patch is addressed to (emitted as `id`).
 * @param currentStep - Step key handed to `getAuditProgressForStep`.
 * @returns An object with `id`, `currentStep` and the four `progress*` fields.
 */
function progressPatch(runId: Id<"agentRuns">, currentStep: string) {
  const { step, total, label, percent } = getAuditProgressForStep(currentStep);

  return {
    id: runId,
    currentStep,
    progressStep: step,
    progressTotal: total,
    progressLabel: label,
    progressPercent: percent,
  };
}
|
|
|
|
/**
 * Converts an arbitrary thrown value into a readable message.
 *
 * - `Error` instances yield their `message`.
 * - Error-like objects (a non-empty string `message` property) yield that message.
 * - Other objects are serialized as JSON so the text is not "[object Object]".
 * - Everything else (and objects that cannot be serialized) falls back to `String()`.
 *
 * @param error - The caught value; may be anything, since JavaScript allows
 *   throwing non-Error values.
 * @returns A string describing the failure.
 */
function errorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }

  if (typeof error === "object" && error !== null) {
    // Plain objects that only look like errors still carry the useful text in `message`.
    const candidate = (error as { message?: unknown }).message;
    if (typeof candidate === "string" && candidate.length > 0) {
      return candidate;
    }

    try {
      // JSON.stringify may return undefined (e.g. a toJSON that returns undefined).
      const serialized: unknown = JSON.stringify(error);
      if (typeof serialized === "string") {
        return serialized;
      }
    } catch {
      // Circular structures or BigInt members cannot be serialized; use String() below.
    }
  }

  return String(error);
}
|
|
|
|
/**
 * Workflow that drives one lead audit for a single `agentRuns` document.
 *
 * Steps, in order:
 *  1. mark the run as running at "audit_prepared" and store the attempt limit,
 *  2. publish "pagespeed_insights" progress while the PageSpeed action runs,
 *  3. reload the run and require a lead id and a non-failed/non-canceled status,
 *  4. publish "website_signals" progress and queue the generation run,
 *  5. publish "classification" progress and run the generation action,
 *  6. mark the run as succeeded at "qualityReview".
 *
 * On any error the run is marked as failed (best effort) and the original
 * error is rethrown so the workflow itself fails with the root cause.
 *
 * @returns The id of the audit run that was processed.
 * @throws The first error raised by a step or by one of the result checks.
 */
export const runLeadAuditWorkflow = workflow
  .define({
    args: {
      runId: v.id("agentRuns"),
    },
  })
  .handler(async (step, args): Promise<Id<"agentRuns">> => {
    const { runId } = args;

    try {
      await step.runMutation(
        internal.runs.updateProgressInternal,
        {
          ...progressPatch(runId, "audit_prepared"),
          status: "running",
          maxAttempts: MAX_ATTEMPTS,
        },
        { name: "1/6 Audit vorbereitet" },
      );

      // The progress update and the PageSpeed action are issued together;
      // only the action's result is needed afterwards.
      const [, pageSpeedResult] = await Promise.all([
        step.runMutation(
          internal.runs.updateProgressInternal,
          {
            ...progressPatch(runId, "pagespeed_insights"),
            status: "running",
          },
          { name: "2/6 Messe PageSpeed" },
        ),
        step.runAction(
          internal.pageSpeedAction.processPageSpeedAuditForWorkflow,
          { runId },
          { name: "PageSpeed mobile/desktop", retry: true },
        ),
      ]);

      if (!pageSpeedResult) {
        throw new Error("PageSpeed-Analyse konnte nicht abgeschlossen werden.");
      }

      const auditRun = await step.runQuery(
        internal.runs.getAuditRunForWorkflowInternal,
        { id: runId },
        { name: "Audit-Run laden" },
      );

      if (!auditRun?.leadId) {
        throw new Error("Audit-Run hat keine Lead-ID.");
      }
      // Stop if the reloaded run is already in a terminal failure state.
      if (auditRun.status === "failed" || auditRun.status === "canceled") {
        throw new Error("PageSpeed-Analyse ist final fehlgeschlagen.");
      }

      await step.runMutation(
        internal.runs.updateProgressInternal,
        {
          ...progressPatch(runId, "website_signals"),
          status: "running",
        },
        { name: "3/6 Sammle Website-Signale" },
      );

      const generationRunId = await step.runMutation(
        internal.auditGeneration.queueLeadAuditGeneration,
        {
          leadId: auditRun.leadId,
          // Only forward auditId when the run has one; the key is omitted otherwise.
          ...(auditRun.auditId ? { auditId: auditRun.auditId } : {}),
          parentRunId: runId,
          scheduleAction: false,
        },
        { name: "Audit-Generierung vorbereiten" },
      );

      if (!generationRunId) {
        throw new Error("Audit-Generierung konnte nicht angelegt werden.");
      }

      await step.runMutation(
        internal.runs.updateProgressInternal,
        {
          ...progressPatch(runId, "classification"),
          status: "running",
        },
        { name: "4/6 Bewerte Befunde" },
      );

      const generationResult = await step.runAction(
        internal.auditGenerationAction.processAuditGenerationForWorkflow,
        { runId: generationRunId, rootRunId: runId },
        { name: "Specialists und German Copy", retry: true },
      );

      if (!generationResult) {
        throw new Error("Audit-Generierung konnte nicht abgeschlossen werden.");
      }

      await step.runMutation(
        internal.runs.updateProgressInternal,
        {
          ...progressPatch(runId, "qualityReview"),
          status: "succeeded",
        },
        { name: "6/6 Speichere Audit" },
      );

      return runId;
    } catch (error) {
      // Recording the failure is best effort: if this mutation throws as well,
      // it must not replace the original error, which is the actual root cause.
      try {
        await step.runMutation(
          internal.runs.updateProgressInternal,
          {
            id: runId,
            status: "failed",
            errorSummary: "Audit konnte nach automatischen Versuchen nicht abgeschlossen werden.",
            lastRetryReason: errorMessage(error),
          },
          { name: "Audit final fehlgeschlagen", unstableArgs: true },
        );
      } catch (recordingError) {
        console.error(
          "Failed to record the audit failure on the run; rethrowing the original error.",
          recordingError,
        );
      }
      throw error;
    }
  });
|
|
|
|
/**
 * Starts the lead-audit workflow for a run and records it on the run document.
 *
 * After starting the workflow, the run is patched with the workflow id (as a
 * string), `attempt: 1`, the attempt limit and a fresh `updatedAt` timestamp.
 *
 * @returns The id of the started workflow.
 */
export const startLeadAuditWorkflow = internalMutation({
  args: {
    runId: v.id("agentRuns"),
  },
  handler: async (ctx, { runId }): Promise<WorkflowId> => {
    const workflowId: WorkflowId = await workflow.start(
      ctx,
      internal.auditWorkflow.runLeadAuditWorkflow,
      { runId },
      { startAsync: true },
    );

    const firstAttemptPatch = {
      workflowId: String(workflowId),
      attempt: 1,
      maxAttempts: MAX_ATTEMPTS,
      updatedAt: Date.now(),
    };
    await ctx.db.patch(runId, firstAttemptPatch);

    return workflowId;
  },
});
|
|
|
|
/**
 * Restarts the audit workflow for an existing audit run.
 *
 * Increments the attempt counter (rejecting the call once the limit would be
 * exceeded), resets the run to "pending" with its initial progress values, and
 * then either restarts the stored workflow from step 0 or, when the run has no
 * workflow id yet, starts a new workflow and stores its id.
 *
 * @returns The workflow id (as a string) now associated with the run.
 * @throws If the run does not exist, is not of type "audit", or has no
 *   attempts left.
 */
export const restartAuditWorkflow = internalMutation({
  args: {
    runId: v.id("agentRuns"),
  },
  handler: async (ctx, { runId }): Promise<string> => {
    const run = await ctx.db.get(runId);
    if (!run || run.type !== "audit") {
      throw new Error("Audit-Run wurde nicht gefunden.");
    }

    // A run without a stored attempt counts as being on its first attempt.
    const maxAttempts = run.maxAttempts ?? MAX_ATTEMPTS;
    const nextAttempt = (run.attempt ?? 1) + 1;
    if (nextAttempt > maxAttempts) {
      throw new Error("Maximale Anzahl an Audit-Versuchen erreicht.");
    }

    // NOTE(review): `currentStep` names the PageSpeed step while the hard-coded
    // progress label is "Audit vorbereitet" (step 1 of 6) — confirm this
    // combination is intended.
    await ctx.db.patch(runId, {
      status: "pending",
      currentStep: "pagespeed_insights",
      errorSummary: undefined,
      lastRetryReason:
        "Provider war kurz nicht erreichbar, ich versuche es erneut",
      attempt: nextAttempt,
      maxAttempts,
      progressStep: 1,
      progressTotal: 6,
      progressLabel: "Audit vorbereitet",
      progressPercent: 17,
      startedAt: undefined,
      finishedAt: undefined,
      updatedAt: Date.now(),
    });

    const existingWorkflowId = run.workflowId;
    if (!existingWorkflowId) {
      // No workflow has been recorded for this run yet: start one and remember its id.
      const startedWorkflowId: WorkflowId = await workflow.start(
        ctx,
        internal.auditWorkflow.runLeadAuditWorkflow,
        { runId },
        { startAsync: true },
      );

      await ctx.db.patch(runId, {
        workflowId: String(startedWorkflowId),
        updatedAt: Date.now(),
      });

      return String(startedWorkflowId);
    }

    // The id is stored as a plain string on the run, hence the cast back to WorkflowId.
    await workflow.restart(ctx, existingWorkflowId as WorkflowId, {
      from: 0,
      startAsync: true,
    });
    return existingWorkflowId;
  },
});
|