Add campaign scheduling lifecycle jobs

This commit is contained in:
2026-06-05 21:38:34 +02:00
parent 3f148bcec2
commit 70951789d2
7 changed files with 384 additions and 0 deletions

View File

@@ -4,6 +4,8 @@ import { normalizeListLimit } from "./domain";
import { internalMutation, mutation, query } from "./_generated/server";
import type { MutationCtx, QueryCtx } from "./_generated/server";
export const AUDIT_REVIEW_NOTICE_AFTER_MS = 30 * 24 * 60 * 60 * 1000;
const auditStatus = v.union(
v.literal("draft"),
v.literal("approved"),
@@ -352,6 +354,9 @@ export const publishPublicAudit = mutation({
await ctx.db.patch(args.id, {
status: "published",
publishedAt: now,
reviewDueAt: now + AUDIT_REVIEW_NOTICE_AFTER_MS,
lifecycleNotificationAt: undefined,
lifecycleExtendedUntil: undefined,
deactivatedAt: undefined,
updatedAt: now,
});
@@ -373,6 +378,34 @@ export const reapprovePublicAudit = mutation({
await ctx.db.patch(args.id, {
status: "published",
publishedAt: now,
reviewDueAt: now + AUDIT_REVIEW_NOTICE_AFTER_MS,
lifecycleNotificationAt: undefined,
lifecycleExtendedUntil: undefined,
deactivatedAt: undefined,
updatedAt: now,
});
return { slug: audit.slug };
},
});
export const extendPublicAuditLifecycle = mutation({
args: {
id: v.id("audits"),
days: v.number(),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const audit = await ctx.db.get(args.id);
if (!audit) {
throw new Error("Audit wurde nicht gefunden.");
}
const now = Date.now();
await ctx.db.patch(args.id, {
status: "published",
lifecycleExtendedUntil: now + args.days * 24 * 60 * 60 * 1000,
reviewDueAt: now + args.days * 24 * 60 * 60 * 1000,
deactivatedAt: undefined,
updatedAt: now,
});

19
convex/crons.ts Normal file
View File

@@ -0,0 +1,19 @@
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
crons.interval(
"Kampagnen nach Cadence starten",
{ hours: 1 },
internal.scheduledJobs.runDueCampaigns,
);
crons.interval(
"Audit-Lifecycle prüfen",
{ hours: 24 },
internal.scheduledJobs.runAuditLifecycle,
);
export default crons;

199
convex/scheduledJobs.ts Normal file
View File

@@ -0,0 +1,199 @@
import { internal } from "./_generated/api";
import { internalMutation } from "./_generated/server";
import { canStartAgentRun, isStalePendingAgentRun } from "../lib/lead-discovery-run";
export const AUDIT_REVIEW_NOTICE_AFTER_MS = 30 * 24 * 60 * 60 * 1000;
export const AUDIT_AUTO_DEACTIVATE_AFTER_MS = 60 * 24 * 60 * 60 * 1000;
const RUN_COUNTERS_ZERO = {
leadsFound: 0,
leadsCreated: 0,
auditsCreated: 0,
outreachPrepared: 0,
errors: 0,
};
export const runDueCampaigns = internalMutation({
args: {},
handler: async (ctx) => {
const now = Date.now();
const activeRuns = [
...(await ctx.db
.query("agentRuns")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.take(20)),
...(await ctx.db
.query("agentRuns")
.withIndex("by_status", (q) => q.eq("status", "running"))
.take(20)),
];
for (const run of activeRuns.filter((run) => isStalePendingAgentRun(run, now))) {
await ctx.db.patch(run._id, {
status: "canceled",
currentStep: "campaign_cron_stale_pending",
errorSummary: "Ausstehender Lauf wurde nach Timeout automatisch abgebrochen.",
finishedAt: now,
updatedAt: now,
});
await ctx.db.insert("agentRunEvents", {
runId: run._id,
level: "warning",
message: "Ausstehender Lauf wurde nach Timeout automatisch abgebrochen.",
createdAt: now,
});
}
if (!canStartAgentRun(activeRuns, now)) {
const skippedRunId = await ctx.db.insert("agentRuns", {
type: "campaign",
status: "canceled",
currentStep: "campaign_cron_skipped",
errorSummary: "Es läuft bereits ein Agentenlauf.",
counters: RUN_COUNTERS_ZERO,
createdAt: now,
updatedAt: now,
finishedAt: now,
});
await ctx.db.insert("agentRunEvents", {
runId: skippedRunId,
level: "warning",
message: "Es läuft bereits ein Agentenlauf. Kampagnen-Cron wurde übersprungen.",
createdAt: now,
});
return { started: 0, skipped: 1 };
}
const dueCampaigns = await ctx.db
.query("campaigns")
.withIndex("by_status_and_nextRunAt", (q) =>
q.eq("status", "active").lte("nextRunAt", now),
)
.take(1);
const campaign = dueCampaigns[0];
if (!campaign || campaign.recurrence === "manual") {
return { started: 0, skipped: 0 };
}
const runId = await ctx.db.insert("agentRuns", {
type: "campaign",
campaignId: campaign._id,
status: "pending",
currentStep: "campaign_cron_queued",
counters: RUN_COUNTERS_ZERO,
createdAt: now,
updatedAt: now,
});
await ctx.db.insert("agentRunEvents", {
runId,
level: "info",
message: "Kampagnenlauf wurde durch Cadence-Cron geplant.",
details: [{ label: "Kampagne", value: campaign.name }],
createdAt: now,
});
await ctx.scheduler.runAfter(0, internal.leadDiscovery.processCampaignRun, {
runId,
});
return { started: 1, skipped: 0 };
},
});
export const runAuditLifecycle = internalMutation({
args: {},
handler: async (ctx) => {
const now = Date.now();
const runId = await ctx.db.insert("agentRuns", {
type: "lifecycle",
status: "running",
currentStep: "audit_lifecycle",
counters: RUN_COUNTERS_ZERO,
startedAt: now,
createdAt: now,
updatedAt: now,
});
let notifications = 0;
let deactivated = 0;
const publishedAudits = await ctx.db
.query("audits")
.withIndex("by_status", (q) => q.eq("status", "published"))
.take(100);
for (const audit of publishedAudits) {
const publishedAt = audit.publishedAt ?? audit.updatedAt;
const extendedUntil = audit.lifecycleExtendedUntil ?? 0;
const isExtended = extendedUntil > now;
if (!isExtended && now - publishedAt >= AUDIT_AUTO_DEACTIVATE_AFTER_MS) {
await ctx.db.patch(audit._id, {
status: "deactivated",
deactivatedAt: now,
updatedAt: now,
});
await ctx.db.insert("dashboardNotifications", {
auditId: audit._id,
runId,
kind: "audit_auto_deactivated",
title: "Audit automatisch deaktiviert",
message: "Ein veröffentlichtes Audit war älter als 60 Tage und wurde deaktiviert.",
status: "unread",
createdAt: now,
updatedAt: now,
});
deactivated += 1;
continue;
}
if (
!audit.lifecycleNotificationAt &&
now - publishedAt >= AUDIT_REVIEW_NOTICE_AFTER_MS
) {
await ctx.db.patch(audit._id, {
lifecycleNotificationAt: now,
reviewDueAt: now,
updatedAt: now,
});
await ctx.db.insert("dashboardNotifications", {
auditId: audit._id,
runId,
kind: "audit_review_due",
title: "Audit-Aktivität prüfen",
message: "Soll dieses Audit aktiv bleiben? Es ist seit 30 Tagen veröffentlicht.",
status: "unread",
createdAt: now,
updatedAt: now,
});
notifications += 1;
}
}
await ctx.db.patch(runId, {
status: "succeeded",
finishedAt: now,
updatedAt: now,
counters: {
...RUN_COUNTERS_ZERO,
auditsCreated: notifications,
errors: deactivated,
},
});
await ctx.db.insert("agentRunEvents", {
runId,
level: "info",
message: "Audit-Lifecycle geprüft.",
details: [
{ label: "Hinweise", value: String(notifications) },
{ label: "Deaktiviert", value: String(deactivated) },
],
createdAt: now,
});
return { notifications, deactivated };
},
});

View File

@@ -307,6 +307,8 @@ export default defineSchema({
ctaType: v.optional(v.string()),
publishedAt: v.optional(v.number()),
reviewDueAt: v.optional(v.number()),
lifecycleNotificationAt: v.optional(v.number()),
lifecycleExtendedUntil: v.optional(v.number()),
deactivatedAt: v.optional(v.number()),
createdAt: v.number(),
updatedAt: v.number(),
@@ -592,4 +594,18 @@ export default defineSchema({
createdAt: v.number(),
updatedAt: v.number(),
}).index("by_key", ["key"]),
dashboardNotifications: defineTable({
auditId: v.optional(v.id("audits")),
runId: v.optional(v.id("agentRuns")),
kind: v.string(),
title: v.string(),
message: v.string(),
status: v.union(v.literal("unread"), v.literal("acknowledged")),
createdAt: v.number(),
updatedAt: v.number(),
})
.index("by_status_and_createdAt", ["status", "createdAt"])
.index("by_auditId", ["auditId"])
.index("by_runId", ["runId"]),
});