import { v } from "convex/values";

import {
  CAMPAIGN_COUNTRY_CODE,
  CAMPAIGN_COUNTRY_NAME,
  CAMPAIGN_RECURRENCES,
  CAMPAIGN_STATUSES,
} from "../lib/campaign-form";
import {
  calculateNextRunAt,
  getCampaignCurrentRunStatus,
} from "../lib/campaign-scheduling";
import {
  validateCampaignCreateInput,
  validateCampaignUpdateInput,
} from "../lib/campaign-validation";
import { canStartAgentRun, isStalePendingAgentRun } from "../lib/lead-discovery-run";
import { normalizeListLimit } from "./domain";
import { internal } from "./_generated/api";
import { Doc } from "./_generated/dataModel";
import { mutation, query, QueryCtx } from "./_generated/server";

type CampaignDoc = Doc<"campaigns">;

/**
 * Campaign document as returned by the `get` and `list` queries.
 *
 * `lastRunAt` is redefined so that it is always present: it is `null` when
 * neither the campaign nor its latest agent run provides a timestamp.
 */
type CampaignWithRunStatus = Omit<CampaignDoc, "lastRunAt"> & {
  currentRunStatus: string;
  lastRunAt: number | null;
};

// --- Shared argument validators -------------------------------------------

const campaignStatus = v.union(
  ...CAMPAIGN_STATUSES.map((status) => v.literal(status)),
);

const campaignRecurrence = v.union(
  ...CAMPAIGN_RECURRENCES.map((recurrence) => v.literal(recurrence)),
);

const campaignCategoryMode = v.union(v.literal("preset"), v.literal("custom"));

// `undefined` means "not provided"; an explicit `null` is a value of its own.
const optionalNextRunAt = v.optional(v.union(v.number(), v.null()));

const limitArg = v.optional(v.number());

// --- Shared constants -------------------------------------------------------

/** Arguments of `update` that are copied into the patch unchanged when provided. */
const UPDATE_PASSTHROUGH_FIELDS = [
  "name",
  "categoryMode",
  "category",
  "customSearchTerm",
  "postalCode",
  "region",
  "latitude",
  "longitude",
  "radiusKm",
  "maxNewLeadsPerRun",
  "maxAuditsPerRun",
] as const;

/** Message stored on the run and on its event when a stale pending run is canceled. */
const STALE_RUN_CANCELED_MESSAGE =
  "Ausstehender Lauf wurde nach Timeout automatisch abgebrochen.";

// --- Helpers ----------------------------------------------------------------

/**
 * Forwards the given scheduling inputs to `calculateNextRunAt`.
 *
 * @returns Whatever `calculateNextRunAt` returns for these inputs.
 */
function normalizeNextRunAt(args: {
  status: CampaignDoc["status"];
  recurrence: CampaignDoc["recurrence"];
  lastRunAt?: number | null;
  now: number;
}): number | null {
  const { status, recurrence, lastRunAt, now } = args;
  return calculateNextRunAt({ status, recurrence, lastRunAt, now });
}

/**
 * Loads a campaign by id.
 *
 * @throws Error("Kampagne nicht gefunden.") when no such document exists.
 */
async function getCampaignOrThrow(
  ctx: Pick<QueryCtx, "db">,
  id: CampaignDoc["_id"],
): Promise<CampaignDoc> {
  const campaign = await ctx.db.get(id);
  if (!campaign) {
    throw new Error("Kampagne nicht gefunden.");
  }
  return campaign;
}

/**
 * Adds `currentRunStatus` and a non-optional `lastRunAt` to a campaign.
 *
 * Only the most recently updated agent run of the campaign is read (via the
 * `by_campaignId_and_updatedAt` index, descending) and handed to
 * `getCampaignCurrentRunStatus`.
 */
async function enrichCampaignWithRunStatus(
  ctx: QueryCtx,
  campaign: CampaignDoc,
): Promise<CampaignWithRunStatus> {
  const [latestRun = null] = await ctx.db
    .query("agentRuns")
    .withIndex("by_campaignId_and_updatedAt", (q) =>
      q.eq("campaignId", campaign._id),
    )
    .order("desc")
    .take(1);

  const currentRunStatus = getCampaignCurrentRunStatus({
    campaignStatus: campaign.status,
    agentRuns: latestRun ? [latestRun] : [],
  });

  return {
    ...campaign,
    currentRunStatus,
    // Prefer the timestamp stored on the campaign, then the latest run's
    // `updatedAt`, and finally `null`.
    lastRunAt: campaign.lastRunAt ?? latestRun?.updatedAt ?? null,
  };
}

// --- Mutations --------------------------------------------------------------

/**
 * Creates a campaign and returns the id of the inserted document.
 *
 * - `status` defaults to `"paused"` when omitted.
 * - `recurrence`, `status`, `countryCode` and `country` are stored as returned
 *   by `validateCampaignCreateInput`; all other fields are stored as given.
 * - `nextRunAt` is computed only when the argument is omitted; an explicit
 *   value (including `null`) is stored as-is.
 */
export const create = mutation({
  args: {
    name: v.string(),
    categoryMode: campaignCategoryMode,
    category: v.string(),
    customSearchTerm: v.optional(v.string()),
    postalCode: v.string(),
    region: v.optional(v.string()),
    latitude: v.optional(v.number()),
    longitude: v.optional(v.number()),
    radiusKm: v.number(),
    maxNewLeadsPerRun: v.number(),
    maxAuditsPerRun: v.number(),
    recurrence: campaignRecurrence,
    status: v.optional(campaignStatus),
    countryCode: v.optional(v.literal(CAMPAIGN_COUNTRY_CODE)),
    country: v.optional(v.literal(CAMPAIGN_COUNTRY_NAME)),
    nextRunAt: optionalNextRunAt,
  },
  handler: async (ctx, args) => {
    const now = Date.now();

    const sanitized = validateCampaignCreateInput({
      status: args.status ?? "paused",
      recurrence: args.recurrence,
      postalCode: args.postalCode,
      radiusKm: args.radiusKm,
      maxNewLeadsPerRun: args.maxNewLeadsPerRun,
      maxAuditsPerRun: args.maxAuditsPerRun,
      countryCode: args.countryCode,
      country: args.country,
    });

    const nextRunAt =
      args.nextRunAt !== undefined
        ? args.nextRunAt
        : normalizeNextRunAt({
            status: sanitized.status,
            recurrence: sanitized.recurrence,
            now,
          });

    return await ctx.db.insert("campaigns", {
      name: args.name,
      categoryMode: args.categoryMode,
      category: args.category,
      customSearchTerm: args.customSearchTerm,
      postalCode: args.postalCode,
      region: args.region,
      latitude: args.latitude,
      longitude: args.longitude,
      radiusKm: args.radiusKm,
      maxNewLeadsPerRun: args.maxNewLeadsPerRun,
      maxAuditsPerRun: args.maxAuditsPerRun,
      recurrence: sanitized.recurrence,
      status: sanitized.status,
      countryCode: sanitized.countryCode,
      country: sanitized.country,
      nextRunAt,
      createdAt: now,
      updatedAt: now,
    });
  },
});

/**
 * Partially updates a campaign and returns its id.
 *
 * - Only provided fields are patched, except `updatedAt`, `countryCode` and
 *   `country`, which are always written (the latter two from the result of
 *   `validateCampaignUpdateInput`).
 * - An explicit `nextRunAt` (including `null`) is stored as-is. Otherwise
 *   `nextRunAt` is recomputed only when `status` or `recurrence` is provided
 *   and differs from the stored value.
 *
 * @throws Error("Kampagne nicht gefunden.") when the campaign does not exist.
 */
export const update = mutation({
  args: {
    id: v.id("campaigns"),
    name: v.optional(v.string()),
    categoryMode: v.optional(campaignCategoryMode),
    category: v.optional(v.string()),
    customSearchTerm: v.optional(v.string()),
    postalCode: v.optional(v.string()),
    region: v.optional(v.string()),
    latitude: v.optional(v.number()),
    longitude: v.optional(v.number()),
    radiusKm: v.optional(v.number()),
    maxNewLeadsPerRun: v.optional(v.number()),
    maxAuditsPerRun: v.optional(v.number()),
    recurrence: v.optional(campaignRecurrence),
    status: v.optional(campaignStatus),
    countryCode: v.optional(v.literal(CAMPAIGN_COUNTRY_CODE)),
    country: v.optional(v.literal(CAMPAIGN_COUNTRY_NAME)),
    nextRunAt: optionalNextRunAt,
  },
  handler: async (ctx, args) => {
    const now = Date.now();
    const campaign = await getCampaignOrThrow(ctx, args.id);

    const sanitized = validateCampaignUpdateInput({
      postalCode: args.postalCode,
      radiusKm: args.radiusKm,
      maxNewLeadsPerRun: args.maxNewLeadsPerRun,
      maxAuditsPerRun: args.maxAuditsPerRun,
      recurrence: args.recurrence,
      status: args.status,
      countryCode: args.countryCode,
      country: args.country,
    });

    const patch: Record<string, unknown> = {
      updatedAt: now,
      countryCode: sanitized.countryCode,
      country: sanitized.country,
    };

    for (const field of UPDATE_PASSTHROUGH_FIELDS) {
      const value = args[field];
      if (value !== undefined) {
        patch[field] = value;
      }
    }

    if (args.recurrence !== undefined) {
      patch.recurrence = sanitized.recurrence;
    }
    if (args.status !== undefined) {
      patch.status = sanitized.status;
    }

    const statusChanged =
      args.status !== undefined && args.status !== campaign.status;
    const recurrenceChanged =
      args.recurrence !== undefined && args.recurrence !== campaign.recurrence;

    if (args.nextRunAt !== undefined) {
      patch.nextRunAt = args.nextRunAt;
    } else if (statusChanged || recurrenceChanged) {
      // Reschedule from the effective (new or stored) status and recurrence.
      patch.nextRunAt = normalizeNextRunAt({
        status: args.status ?? campaign.status,
        recurrence: args.recurrence ?? campaign.recurrence,
        lastRunAt: campaign.lastRunAt,
        now,
      });
    }

    await ctx.db.patch(args.id, patch);
    return args.id;
  },
});

/**
 * Sets a campaign's status and reschedules it; returns the campaign id.
 *
 * Pausing clears `nextRunAt` (`null`); any other status recomputes it from the
 * stored recurrence and `lastRunAt`.
 *
 * @throws Error("Kampagne nicht gefunden.") when the campaign does not exist.
 */
export const setStatus = mutation({
  args: {
    id: v.id("campaigns"),
    status: campaignStatus,
  },
  handler: async (ctx, args) => {
    const now = Date.now();
    const campaign = await getCampaignOrThrow(ctx, args.id);

    const nextRunAt =
      args.status === "paused"
        ? null
        : normalizeNextRunAt({
            status: args.status,
            recurrence: campaign.recurrence,
            lastRunAt: campaign.lastRunAt,
            now,
          });

    await ctx.db.patch(args.id, {
      status: args.status,
      nextRunAt,
      updatedAt: now,
    });
    return args.id;
  },
});

/**
 * Requests an immediate agent run for a campaign and returns the new run id.
 *
 * Steps:
 * 1. Read up to 20 `pending` runs and up to 1 `running` run (across all
 *    campaigns — the `by_status` lookup is not filtered by campaign).
 * 2. Cancel every run that `isStalePendingAgentRun` flags and log a warning
 *    event for it.
 * 3. Refuse to start when `canStartAgentRun` rejects the collected runs.
 * 4. Insert a `pending` run with zeroed counters and schedule
 *    `internal.leadDiscovery.processCampaignRun` with zero delay.
 *
 * @throws Error("Kampagne nicht gefunden.") when the campaign does not exist.
 * @throws Error("Es läuft bereits ein Agentenlauf.") when a run is already active.
 */
export const requestRun = mutation({
  args: {
    id: v.id("campaigns"),
  },
  handler: async (ctx, args) => {
    const now = Date.now();
    await getCampaignOrThrow(ctx, args.id);

    const pendingRuns = await ctx.db
      .query("agentRuns")
      .withIndex("by_status", (q) => q.eq("status", "pending"))
      .take(20);
    const runningRuns = await ctx.db
      .query("agentRuns")
      .withIndex("by_status", (q) => q.eq("status", "running"))
      .take(1);
    const possiblyActiveRuns = [...pendingRuns, ...runningRuns];

    // NOTE(review): Convex mutations are transactional, so if the guard below
    // throws, these cancellations appear to be rolled back as well — confirm
    // that this is intended.
    for (const staleRun of possiblyActiveRuns) {
      if (!isStalePendingAgentRun(staleRun, now)) {
        continue;
      }
      await ctx.db.patch(staleRun._id, {
        status: "canceled",
        currentStep: "lead_discovery",
        errorSummary: STALE_RUN_CANCELED_MESSAGE,
        finishedAt: now,
        updatedAt: now,
      });
      await ctx.db.insert("agentRunEvents", {
        runId: staleRun._id,
        level: "warning",
        message: STALE_RUN_CANCELED_MESSAGE,
        details: [{ label: "Alter Status", value: "pending" }],
        createdAt: now,
      });
    }

    // The unfiltered list (as read above) is passed on purpose; the helper
    // receives `now` as well — presumably to discount stale runs itself.
    if (!canStartAgentRun(possiblyActiveRuns, now)) {
      throw new Error("Es läuft bereits ein Agentenlauf.");
    }

    const runId = await ctx.db.insert("agentRuns", {
      type: "campaign",
      campaignId: args.id,
      status: "pending",
      counters: {
        leadsFound: 0,
        leadsCreated: 0,
        auditsCreated: 0,
        outreachPrepared: 0,
        errors: 0,
      },
      createdAt: now,
      updatedAt: now,
    });

    await ctx.scheduler.runAfter(0, internal.leadDiscovery.processCampaignRun, {
      runId,
    });

    return runId;
  },
});

// --- Queries ----------------------------------------------------------------

/**
 * Returns one campaign enriched with run status, or `null` when it does not
 * exist.
 */
export const get = query({
  args: { id: v.id("campaigns") },
  handler: async (ctx, args) => {
    const campaign = await ctx.db.get(args.id);
    return campaign ? await enrichCampaignWithRunStatus(ctx, campaign) : null;
  },
});

/**
 * Lists campaigns in descending order, each enriched with run status.
 *
 * When `status` is given, the `by_status` index is used to filter; the number
 * of results is capped by `normalizeListLimit(limit)`.
 */
export const list = query({
  args: {
    status: v.optional(v.union(v.literal("active"), v.literal("paused"))),
    limit: limitArg,
  },
  handler: async (ctx, args) => {
    const limit = normalizeListLimit(args.limit);
    const { status } = args;

    const campaigns =
      status !== undefined
        ? await ctx.db
            .query("campaigns")
            .withIndex("by_status", (q) => q.eq("status", status))
            .order("desc")
            .take(limit)
        : await ctx.db.query("campaigns").order("desc").take(limit);

    return await Promise.all(
      campaigns.map((campaign) => enrichCampaignWithRunStatus(ctx, campaign)),
    );
  },
});