From 70951789d22def4a26569d21457bcb23816d23d3 Mon Sep 17 00:00:00 2001 From: Matthias Meister Date: Fri, 5 Jun 2026 21:38:34 +0200 Subject: [PATCH] Add campaign scheduling lifecycle jobs --- components/campaigns/campaigns-board.tsx | 59 +++++++ convex/audits.ts | 33 ++++ convex/crons.ts | 19 +++ convex/scheduledJobs.ts | 199 +++++++++++++++++++++++ convex/schema.ts | 16 ++ tests/campaigns-board-layout.test.ts | 10 ++ tests/scheduled-jobs-source.test.ts | 48 ++++++ 7 files changed, 384 insertions(+) create mode 100644 convex/crons.ts create mode 100644 convex/scheduledJobs.ts create mode 100644 tests/scheduled-jobs-source.test.ts diff --git a/components/campaigns/campaigns-board.tsx b/components/campaigns/campaigns-board.tsx index fbad89b..f7f6d9d 100644 --- a/components/campaigns/campaigns-board.tsx +++ b/components/campaigns/campaigns-board.tsx @@ -16,7 +16,9 @@ import { Separator } from "@/components/ui/separator"; import { CampaignFormDialog } from "@/components/campaigns/campaign-form-dialog"; type CampaignsListResult = FunctionReturnType; +type CampaignRunsListResult = FunctionReturnType; type CampaignRow = NonNullable[number]; +type CampaignRunRow = NonNullable[number]; type RecurrenceLabel = Record; type CurrentRunStatusLabel = { @@ -40,6 +42,13 @@ const statusLabel: CurrentRunStatusLabel = { paused: "Pausiert", }; +const stepLabel: Record = { + campaign_cron_queued: "Cron geplant", + campaign_cron_skipped: "Cron übersprungen", + campaign_cron_stale_pending: "Timeout bereinigt", + lead_discovery: "Lead-Recherche", +}; + const dateFormatter = new Intl.DateTimeFormat("de-DE", { dateStyle: "short", timeStyle: "short", @@ -84,6 +93,10 @@ const formatNiche = (campaign: CampaignRow): string => { export function CampaignsBoard() { const campaigns = useQuery(api.campaigns.list, { limit: 100 }); + const recentCampaignRuns = useQuery(api.runs.list, { + limit: 8, + type: "campaign", + }); const createCampaign = useMutation(api.campaigns.create); const updateCampaign = useMutation(api.campaigns.update); const setStatus = useMutation(api.campaigns.setStatus); @@ -130,6 +143,10 @@ export function CampaignsBoard() { return [...campaigns].sort((a, b) => b.createdAt - a.createdAt); }, [campaigns]); + const visibleRuns = useMemo(() => { + return recentCampaignRuns ?? []; + }, [recentCampaignRuns]); + const closeDialog = () => { setEditingCampaign(null); setIsFormOpen(false); @@ -343,6 +360,48 @@ export function CampaignsBoard() { ))} )} + + + + Aktuelle Run-Logs + + Letzte Kampagnenläufe inklusive Cron-Skips und Fehlerhinweisen. + + + + {recentCampaignRuns === undefined ? ( + + ) : visibleRuns.length === 0 ? ( +

Noch keine Kampagnenläufe.

+ ) : ( + visibleRuns.map((run) => ( +
+
+

+ {statusLabel[run.status] ?? run.status} +

+

+ {formatDateTime(run.updatedAt)} +

+
+

+ {stepLabel[run.currentStep ?? ""] ?? run.currentStep ?? "Schritt offen"} +

+ {run.currentStep === "campaign_cron_skipped" ? ( +

+ Cron wurde übersprungen, weil bereits ein Agentenlauf aktiv war. +

+ ) : null} + {run.errorSummary ? ( +

+ {run.errorSummary} +

+ ) : null} +
+ )) + )} +
+
); } diff --git a/convex/audits.ts b/convex/audits.ts index 2c4a44f..c903080 100644 --- a/convex/audits.ts +++ b/convex/audits.ts @@ -4,6 +4,8 @@ import { normalizeListLimit } from "./domain"; import { internalMutation, mutation, query } from "./_generated/server"; import type { MutationCtx, QueryCtx } from "./_generated/server"; +export const AUDIT_REVIEW_NOTICE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; + const auditStatus = v.union( v.literal("draft"), v.literal("approved"), @@ -352,6 +354,9 @@ export const publishPublicAudit = mutation({ await ctx.db.patch(args.id, { status: "published", publishedAt: now, + reviewDueAt: now + AUDIT_REVIEW_NOTICE_AFTER_MS, + lifecycleNotificationAt: undefined, + lifecycleExtendedUntil: undefined, deactivatedAt: undefined, updatedAt: now, }); @@ -373,6 +378,34 @@ export const reapprovePublicAudit = mutation({ await ctx.db.patch(args.id, { status: "published", publishedAt: now, + reviewDueAt: now + AUDIT_REVIEW_NOTICE_AFTER_MS, + lifecycleNotificationAt: undefined, + lifecycleExtendedUntil: undefined, + deactivatedAt: undefined, + updatedAt: now, + }); + + return { slug: audit.slug }; + }, +}); + +export const extendPublicAuditLifecycle = mutation({ + args: { + id: v.id("audits"), + days: v.number(), + }, + handler: async (ctx, args) => { + await requireOperator(ctx); + const audit = await ctx.db.get(args.id); + if (!audit) { + throw new Error("Audit wurde nicht gefunden."); + } + + const now = Date.now(); + await ctx.db.patch(args.id, { + status: "published", + lifecycleExtendedUntil: now + args.days * 24 * 60 * 60 * 1000, + reviewDueAt: now + args.days * 24 * 60 * 60 * 1000, deactivatedAt: undefined, updatedAt: now, }); diff --git a/convex/crons.ts b/convex/crons.ts new file mode 100644 index 0000000..7371150 --- /dev/null +++ b/convex/crons.ts @@ -0,0 +1,19 @@ +import { cronJobs } from "convex/server"; + +import { internal } from "./_generated/api"; + +const crons = cronJobs(); + +crons.interval( + "Kampagnen nach Cadence starten", + { hours: 1 }, + internal.scheduledJobs.runDueCampaigns, +); + +crons.interval( + "Audit-Lifecycle prüfen", + { hours: 24 }, + internal.scheduledJobs.runAuditLifecycle, +); + +export default crons; diff --git a/convex/scheduledJobs.ts b/convex/scheduledJobs.ts new file mode 100644 index 0000000..cb8132e --- /dev/null +++ b/convex/scheduledJobs.ts @@ -0,0 +1,199 @@ +import { internal } from "./_generated/api"; +import { internalMutation } from "./_generated/server"; +import { canStartAgentRun, isStalePendingAgentRun } from "../lib/lead-discovery-run"; + +export const AUDIT_REVIEW_NOTICE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; +export const AUDIT_AUTO_DEACTIVATE_AFTER_MS = 60 * 24 * 60 * 60 * 1000; + +const RUN_COUNTERS_ZERO = { + leadsFound: 0, + leadsCreated: 0, + auditsCreated: 0, + outreachPrepared: 0, + errors: 0, +}; + +export const runDueCampaigns = internalMutation({ + args: {}, + handler: async (ctx) => { + const now = Date.now(); + const activeRuns = [ + ...(await ctx.db + .query("agentRuns") + .withIndex("by_status", (q) => q.eq("status", "pending")) + .take(20)), + ...(await ctx.db + .query("agentRuns") + .withIndex("by_status", (q) => q.eq("status", "running")) + .take(20)), + ]; + + for (const run of activeRuns.filter((run) => isStalePendingAgentRun(run, now))) { + await ctx.db.patch(run._id, { + status: "canceled", + currentStep: "campaign_cron_stale_pending", + errorSummary: "Ausstehender Lauf wurde nach Timeout automatisch abgebrochen.", + finishedAt: now, + updatedAt: now, + }); + await ctx.db.insert("agentRunEvents", { + runId: run._id, + level: "warning", + message: "Ausstehender Lauf wurde nach Timeout automatisch abgebrochen.", + createdAt: now, + }); + } + + if (!canStartAgentRun(activeRuns, now)) { + const skippedRunId = await ctx.db.insert("agentRuns", { + type: "campaign", + status: "canceled", + currentStep: "campaign_cron_skipped", + errorSummary: "Es läuft bereits ein Agentenlauf.", + counters: RUN_COUNTERS_ZERO, + createdAt: now, + updatedAt: now, + finishedAt: now, + }); + + await ctx.db.insert("agentRunEvents", { + runId: skippedRunId, + level: "warning", + message: "Es läuft bereits ein Agentenlauf. Kampagnen-Cron wurde übersprungen.", + createdAt: now, + }); + + return { started: 0, skipped: 1 }; + } + + const dueCampaigns = await ctx.db + .query("campaigns") + .withIndex("by_status_and_nextRunAt", (q) => + q.eq("status", "active").lte("nextRunAt", now), + ) + .take(1); + const campaign = dueCampaigns[0]; + + if (!campaign || campaign.recurrence === "manual") { + return { started: 0, skipped: 0 }; + } + + const runId = await ctx.db.insert("agentRuns", { + type: "campaign", + campaignId: campaign._id, + status: "pending", + currentStep: "campaign_cron_queued", + counters: RUN_COUNTERS_ZERO, + createdAt: now, + updatedAt: now, + }); + + await ctx.db.insert("agentRunEvents", { + runId, + level: "info", + message: "Kampagnenlauf wurde durch Cadence-Cron geplant.", + details: [{ label: "Kampagne", value: campaign.name }], + createdAt: now, + }); + + await ctx.scheduler.runAfter(0, internal.leadDiscovery.processCampaignRun, { + runId, + }); + + return { started: 1, skipped: 0 }; + }, +}); + +export const runAuditLifecycle = internalMutation({ + args: {}, + handler: async (ctx) => { + const now = Date.now(); + const runId = await ctx.db.insert("agentRuns", { + type: "lifecycle", + status: "running", + currentStep: "audit_lifecycle", + counters: RUN_COUNTERS_ZERO, + startedAt: now, + createdAt: now, + updatedAt: now, + }); + let notifications = 0; + let deactivated = 0; + + const publishedAudits = await ctx.db + .query("audits") + .withIndex("by_status", (q) => q.eq("status", "published")) + .take(100); + + for (const audit of publishedAudits) { + const publishedAt = audit.publishedAt ?? audit.updatedAt; + const extendedUntil = audit.lifecycleExtendedUntil ?? 0; + const isExtended = extendedUntil > now; + + if (!isExtended && now - publishedAt >= AUDIT_AUTO_DEACTIVATE_AFTER_MS) { + await ctx.db.patch(audit._id, { + status: "deactivated", + deactivatedAt: now, + updatedAt: now, + }); + await ctx.db.insert("dashboardNotifications", { + auditId: audit._id, + runId, + kind: "audit_auto_deactivated", + title: "Audit automatisch deaktiviert", + message: "Ein veröffentlichtes Audit war älter als 60 Tage und wurde deaktiviert.", + status: "unread", + createdAt: now, + updatedAt: now, + }); + deactivated += 1; + continue; + } + + if ( + !audit.lifecycleNotificationAt && + now - publishedAt >= AUDIT_REVIEW_NOTICE_AFTER_MS + ) { + await ctx.db.patch(audit._id, { + lifecycleNotificationAt: now, + reviewDueAt: now, + updatedAt: now, + }); + await ctx.db.insert("dashboardNotifications", { + auditId: audit._id, + runId, + kind: "audit_review_due", + title: "Audit-Aktivität prüfen", + message: "Soll dieses Audit aktiv bleiben? Es ist seit 30 Tagen veröffentlicht.", + status: "unread", + createdAt: now, + updatedAt: now, + }); + notifications += 1; + } + } + + await ctx.db.patch(runId, { + status: "succeeded", + finishedAt: now, + updatedAt: now, + counters: { + ...RUN_COUNTERS_ZERO, + auditsCreated: notifications, + errors: deactivated, + }, + }); + await ctx.db.insert("agentRunEvents", { + runId, + level: "info", + message: "Audit-Lifecycle geprüft.", + details: [ + { label: "Hinweise", value: String(notifications) }, + { label: "Deaktiviert", value: String(deactivated) }, + ], + createdAt: now, + }); + + return { notifications, deactivated }; + }, +}); diff --git a/convex/schema.ts b/convex/schema.ts index 3edcf9c..13921f6 100644 --- a/convex/schema.ts +++ b/convex/schema.ts @@ -307,6 +307,8 @@ export default defineSchema({ ctaType: v.optional(v.string()), publishedAt: v.optional(v.number()), reviewDueAt: v.optional(v.number()), + lifecycleNotificationAt: v.optional(v.number()), + lifecycleExtendedUntil: v.optional(v.number()), deactivatedAt: v.optional(v.number()), createdAt: v.number(), updatedAt: v.number(), @@ -592,4 +594,18 @@ export default defineSchema({ createdAt: v.number(), updatedAt: v.number(), }).index("by_key", ["key"]), + + dashboardNotifications: defineTable({ + auditId: v.optional(v.id("audits")), + runId: v.optional(v.id("agentRuns")), + kind: v.string(), + title: v.string(), + message: v.string(), + status: v.union(v.literal("unread"), v.literal("acknowledged")), + createdAt: v.number(), + updatedAt: v.number(), + }) + .index("by_status_and_createdAt", ["status", "createdAt"]) + .index("by_auditId", ["auditId"]) + .index("by_runId", ["runId"]), }); diff --git a/tests/campaigns-board-layout.test.ts b/tests/campaigns-board-layout.test.ts index c31a7f8..6a13591 100644 --- a/tests/campaigns-board-layout.test.ts +++ b/tests/campaigns-board-layout.test.ts @@ -28,3 +28,13 @@ test("campaign board renders campaigns as responsive cards", async () => { assert.match(source, /toggleCampaign\(campaign\)/); assert.match(source, /runCampaign\(campaign\)/); }); + +test("campaign board surfaces recent run logs", async () => { + const source = await readFile(campaignsBoardPath, "utf8"); + + assert.match(source, /api\.runs\.list/); + assert.match(source, /type:\s*"campaign"/); + assert.match(source, /Aktuelle Run-Logs/); + assert.match(source, /run\.errorSummary/); + assert.match(source, /campaign_cron_skipped/); +}); diff --git a/tests/scheduled-jobs-source.test.ts b/tests/scheduled-jobs-source.test.ts new file mode 100644 index 0000000..38428b3 --- /dev/null +++ b/tests/scheduled-jobs-source.test.ts @@ -0,0 +1,48 @@ +import assert from "node:assert/strict"; +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; +import test from "node:test"; + +const cronsPath = join(process.cwd(), "convex", "crons.ts"); +const jobsPath = join(process.cwd(), "convex", "scheduledJobs.ts"); +const auditsPath = join(process.cwd(), "convex", "audits.ts"); +const schemaPath = join(process.cwd(), "convex", "schema.ts"); + +const cronsSource = existsSync(cronsPath) ? readFileSync(cronsPath, "utf8") : ""; +const jobsSource = existsSync(jobsPath) ? readFileSync(jobsPath, "utf8") : ""; +const auditsSource = readFileSync(auditsPath, "utf8"); +const schemaSource = readFileSync(schemaPath, "utf8"); + +test("Convex crons register campaign cadence and audit lifecycle jobs", () => { + assert.equal(existsSync(cronsPath), true, "convex/crons.ts should exist."); + assert.match(cronsSource, /cronJobs\(\)/); + assert.match(cronsSource, /internal\.scheduledJobs\.runDueCampaigns/); + assert.match(cronsSource, /internal\.scheduledJobs\.runAuditLifecycle/); +}); + +test("scheduled jobs guard single active agent run and log skipped campaign cadence", () => { + assert.equal(existsSync(jobsPath), true, "convex/scheduledJobs.ts should exist."); + assert.match(jobsSource, /export const runDueCampaigns = internalMutation/); + assert.match(jobsSource, /canStartAgentRun/); + assert.match(jobsSource, /status:\s*"canceled"[\s\S]*currentStep:\s*"campaign_cron_skipped"/); + assert.match(jobsSource, /Es läuft bereits ein Agentenlauf/); + assert.match(jobsSource, /ctx\.scheduler\.runAfter\(0,\s*internal\.leadDiscovery\.processCampaignRun/); +}); + +test("audit lifecycle stores notifications and deactivates old published audits", () => { + assert.match(schemaSource, /dashboardNotifications:\s*defineTable/); + assert.match(schemaSource, /lifecycleNotificationAt:\s*v\.optional\(v\.number\(\)\)/); + assert.match(schemaSource, /lifecycleExtendedUntil:\s*v\.optional\(v\.number\(\)\)/); + assert.match(jobsSource, /export const runAuditLifecycle = internalMutation/); + assert.match(jobsSource, /AUDIT_REVIEW_NOTICE_AFTER_MS/); + assert.match(jobsSource, /AUDIT_AUTO_DEACTIVATE_AFTER_MS/); + assert.match(jobsSource, /status:\s*"deactivated"/); + assert.match(jobsSource, /Soll dieses Audit aktiv bleiben/); +}); + +test("publishing and lifecycle controls set review due dates and allow manual extension", () => { + assert.match(auditsSource, /reviewDueAt:\s*now\s*\+\s*AUDIT_REVIEW_NOTICE_AFTER_MS/); + assert.match(auditsSource, /export const extendPublicAuditLifecycle = mutation/); + assert.match(auditsSource, /lifecycleExtendedUntil:\s*now\s*\+\s*args\.days\s*\*\s*24\s*\*\s*60\s*\*\s*60\s*\*\s*1000/); + assert.match(auditsSource, /status:\s*"published"[\s\S]*deactivatedAt:\s*undefined/); +});