import { v } from "convex/values"; import { RUN_EVENT_LEVELS, RUN_STATUSES, RUN_TYPES, normalizeListLimit, } from "./domain"; import type { Id } from "./_generated/dataModel"; import { internalMutation, internalQuery, mutation, query } from "./_generated/server"; import type { MutationCtx, QueryCtx } from "./_generated/server"; const runType = v.union(...RUN_TYPES.map((type) => v.literal(type))); const runStatus = v.union(...RUN_STATUSES.map((status) => v.literal(status))); const eventLevel = v.union( ...RUN_EVENT_LEVELS.map((level) => v.literal(level)), ); const appendEventArgs = { runId: v.id("agentRuns"), level: eventLevel, message: v.string(), details: v.optional( v.array( v.object({ label: v.string(), value: v.string(), source: v.optional(v.string()), }), ), ), }; type AppendEventArgs = { runId: Id<"agentRuns">; level: (typeof RUN_EVENT_LEVELS)[number]; message: string; details?: { label: string; value: string; source?: string }[]; }; const requireOperator = async (ctx: MutationCtx | QueryCtx) => { const identity = await ctx.auth.getUserIdentity(); if (!identity) { throw new Error("Nicht autorisiert."); } }; async function appendRunEvent( ctx: MutationCtx, args: AppendEventArgs, ) { return await ctx.db.insert("agentRunEvents", { ...args, createdAt: Date.now(), }); } export const create = mutation({ args: { type: runType, campaignId: v.optional(v.id("campaigns")), leadId: v.optional(v.id("leads")), auditId: v.optional(v.id("audits")), status: v.optional(runStatus), currentStep: v.optional(v.string()), }, handler: async (ctx, args) => { await requireOperator(ctx); const now = Date.now(); return await ctx.db.insert("agentRuns", { ...args, status: args.status ?? "pending", counters: { leadsFound: 0, leadsCreated: 0, auditsCreated: 0, outreachPrepared: 0, errors: 0, }, createdAt: now, updatedAt: now, }); }, }); export const updateStatus = mutation({ args: { id: v.id("agentRuns"), status: runStatus, currentStep: v.optional(v.string()), errorSummary: v.optional(v.string()), }, handler: async (ctx, args) => { await requireOperator(ctx); const now = Date.now(); const patch: { status: typeof args.status; updatedAt: number; currentStep?: string; errorSummary?: string; startedAt?: number; finishedAt?: number; } = { status: args.status, updatedAt: now, }; if (args.currentStep !== undefined) { patch.currentStep = args.currentStep; } if (args.errorSummary !== undefined) { patch.errorSummary = args.errorSummary; } if (args.status === "running") { patch.startedAt = now; } if ( args.status === "succeeded" || args.status === "failed" || args.status === "canceled" ) { patch.finishedAt = now; } await ctx.db.patch(args.id, patch); return args.id; }, }); export const updateProgressInternal = internalMutation({ args: { id: v.id("agentRuns"), status: v.optional(runStatus), currentStep: v.optional(v.string()), errorSummary: v.optional(v.string()), workflowId: v.optional(v.string()), attempt: v.optional(v.number()), maxAttempts: v.optional(v.number()), progressStep: v.optional(v.number()), progressTotal: v.optional(v.number()), progressLabel: v.optional(v.string()), progressPercent: v.optional(v.number()), lastRetryReason: v.optional(v.string()), }, handler: async (ctx, args) => { const now = Date.now(); const patch: { status?: (typeof RUN_STATUSES)[number]; updatedAt: number; currentStep?: string; errorSummary?: string; workflowId?: string; attempt?: number; maxAttempts?: number; progressStep?: number; progressTotal?: number; progressLabel?: string; progressPercent?: number; lastRetryReason?: string; startedAt?: number; finishedAt?: number; } = { updatedAt: now, }; if (args.status !== undefined) { patch.status = args.status; if (args.status === "running") { patch.startedAt = now; patch.finishedAt = undefined; } if ( args.status === "succeeded" || args.status === "failed" || args.status === "canceled" ) { patch.finishedAt = now; } } if (args.currentStep !== undefined) { patch.currentStep = args.currentStep; } if (args.errorSummary !== undefined) { patch.errorSummary = args.errorSummary; } if (args.workflowId !== undefined) { patch.workflowId = args.workflowId; } if (args.attempt !== undefined) { patch.attempt = args.attempt; } if (args.maxAttempts !== undefined) { patch.maxAttempts = args.maxAttempts; } if (args.progressStep !== undefined) { patch.progressStep = args.progressStep; } if (args.progressTotal !== undefined) { patch.progressTotal = args.progressTotal; } if (args.progressLabel !== undefined) { patch.progressLabel = args.progressLabel; } if (args.progressPercent !== undefined) { patch.progressPercent = args.progressPercent; } if (args.lastRetryReason !== undefined) { patch.lastRetryReason = args.lastRetryReason; } await ctx.db.patch(args.id, patch); return args.id; }, }); export const getAuditRunForWorkflowInternal = internalQuery({ args: { id: v.id("agentRuns"), }, handler: async (ctx, args) => { const run = await ctx.db.get(args.id); if (!run || run.type !== "audit") { return null; } return { _id: run._id, leadId: run.leadId ?? null, auditId: run.auditId ?? null, status: run.status, currentStep: run.currentStep ?? null, }; }, }); export const list = query({ args: { status: v.optional(runStatus), type: v.optional(runType), limit: v.optional(v.number()), }, handler: async (ctx, args) => { await requireOperator(ctx); const limit = normalizeListLimit(args.limit); if (args.type && args.status) { const type = args.type; const status = args.status; return await ctx.db .query("agentRuns") .withIndex("by_type_and_status", (q) => q.eq("type", type).eq("status", status), ) .order("desc") .take(limit); } if (args.type) { const type = args.type; return await ctx.db .query("agentRuns") .withIndex("by_type", (q) => q.eq("type", type)) .order("desc") .take(limit); } if (args.status) { const status = args.status; return await ctx.db .query("agentRuns") .withIndex("by_status", (q) => q.eq("status", status)) .order("desc") .take(limit); } return await ctx.db.query("agentRuns").order("desc").take(limit); }, }); export const appendEvent = mutation({ args: appendEventArgs, handler: async (ctx, args) => { await requireOperator(ctx); return await appendRunEvent(ctx, args); }, }); export const appendEventInternal = internalMutation({ args: appendEventArgs, handler: async (ctx, args) => { return await appendRunEvent(ctx, args); }, }); export const listEvents = query({ args: { runId: v.id("agentRuns"), limit: v.optional(v.number()), }, handler: async (ctx, args) => { await requireOperator(ctx); const limit = normalizeListLimit(args.limit); return await ctx.db .query("agentRunEvents") .withIndex("by_runId_and_createdAt", (q) => q.eq("runId", args.runId)) .order("desc") .take(limit); }, });