355 lines
9.5 KiB
TypeScript
355 lines
9.5 KiB
TypeScript
import { v } from "convex/values";
|
|
import {
|
|
CAMPAIGN_COUNTRY_CODE,
|
|
CAMPAIGN_COUNTRY_NAME,
|
|
CAMPAIGN_RECURRENCES,
|
|
CAMPAIGN_STATUSES,
|
|
} from "../lib/campaign-form";
|
|
import {
|
|
calculateNextRunAt,
|
|
getCampaignCurrentRunStatus,
|
|
} from "../lib/campaign-scheduling";
|
|
import {
|
|
validateCampaignCreateInput,
|
|
validateCampaignUpdateInput,
|
|
} from "../lib/campaign-validation";
|
|
|
|
import { normalizeListLimit } from "./domain";
|
|
import { Doc } from "./_generated/dataModel";
|
|
import { mutation, query, QueryCtx } from "./_generated/server";
|
|
|
|
type CampaignDoc = Doc<"campaigns">;
|
|
|
|
type CampaignWithRunStatus = Omit<CampaignDoc, "lastRunAt"> & {
|
|
currentRunStatus: string;
|
|
lastRunAt: number | null;
|
|
};
|
|
|
|
const campaignStatus = v.union(
|
|
...CAMPAIGN_STATUSES.map((status) => v.literal(status)),
|
|
);
|
|
const campaignRecurrence = v.union(
|
|
...CAMPAIGN_RECURRENCES.map((recurrence) => v.literal(recurrence)),
|
|
);
|
|
const optionalNextRunAt = v.optional(v.union(v.number(), v.null()));
|
|
const limitArg = v.optional(v.number());
|
|
|
|
function normalizeNextRunAt(args: {
|
|
status: CampaignDoc["status"];
|
|
recurrence: CampaignDoc["recurrence"];
|
|
lastRunAt?: number | null;
|
|
now: number;
|
|
}): number | null {
|
|
return calculateNextRunAt({
|
|
status: args.status,
|
|
recurrence: args.recurrence,
|
|
lastRunAt: args.lastRunAt,
|
|
now: args.now,
|
|
});
|
|
}
|
|
|
|
async function enrichCampaignWithRunStatus(
|
|
ctx: QueryCtx,
|
|
campaign: CampaignDoc,
|
|
): Promise<CampaignWithRunStatus> {
|
|
const latestRun = await ctx.db
|
|
.query("agentRuns")
|
|
.withIndex("by_campaignId_and_updatedAt", (q) =>
|
|
q.eq("campaignId", campaign._id),
|
|
)
|
|
.order("desc")
|
|
.take(1);
|
|
|
|
const run = latestRun.at(0) ?? null;
|
|
|
|
return {
|
|
...campaign,
|
|
currentRunStatus: getCampaignCurrentRunStatus({
|
|
campaignStatus: campaign.status,
|
|
agentRuns: run ? [run] : [],
|
|
}),
|
|
lastRunAt: campaign.lastRunAt ?? run?.updatedAt ?? null,
|
|
};
|
|
}
|
|
|
|
export const create = mutation({
|
|
args: {
|
|
name: v.string(),
|
|
categoryMode: v.union(v.literal("preset"), v.literal("custom")),
|
|
category: v.string(),
|
|
customSearchTerm: v.optional(v.string()),
|
|
postalCode: v.string(),
|
|
region: v.optional(v.string()),
|
|
latitude: v.optional(v.number()),
|
|
longitude: v.optional(v.number()),
|
|
radiusKm: v.number(),
|
|
maxNewLeadsPerRun: v.number(),
|
|
maxAuditsPerRun: v.number(),
|
|
recurrence: campaignRecurrence,
|
|
status: v.optional(campaignStatus),
|
|
countryCode: v.optional(v.literal(CAMPAIGN_COUNTRY_CODE)),
|
|
country: v.optional(v.literal(CAMPAIGN_COUNTRY_NAME)),
|
|
nextRunAt: optionalNextRunAt,
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const now = Date.now();
|
|
const status = args.status ?? "paused";
|
|
|
|
const sanitized = validateCampaignCreateInput({
|
|
status,
|
|
recurrence: args.recurrence,
|
|
postalCode: args.postalCode,
|
|
radiusKm: args.radiusKm,
|
|
maxNewLeadsPerRun: args.maxNewLeadsPerRun,
|
|
maxAuditsPerRun: args.maxAuditsPerRun,
|
|
countryCode: args.countryCode,
|
|
country: args.country,
|
|
});
|
|
|
|
return await ctx.db.insert("campaigns", {
|
|
name: args.name,
|
|
categoryMode: args.categoryMode,
|
|
category: args.category,
|
|
customSearchTerm: args.customSearchTerm,
|
|
postalCode: args.postalCode,
|
|
region: args.region,
|
|
latitude: args.latitude,
|
|
longitude: args.longitude,
|
|
radiusKm: args.radiusKm,
|
|
maxNewLeadsPerRun: args.maxNewLeadsPerRun,
|
|
maxAuditsPerRun: args.maxAuditsPerRun,
|
|
recurrence: sanitized.recurrence,
|
|
status: sanitized.status,
|
|
countryCode: sanitized.countryCode,
|
|
country: sanitized.country,
|
|
nextRunAt:
|
|
args.nextRunAt === undefined
|
|
? normalizeNextRunAt({
|
|
status: sanitized.status,
|
|
recurrence: sanitized.recurrence,
|
|
now,
|
|
})
|
|
: args.nextRunAt,
|
|
createdAt: now,
|
|
updatedAt: now,
|
|
});
|
|
},
|
|
});
|
|
|
|
export const update = mutation({
|
|
args: {
|
|
id: v.id("campaigns"),
|
|
name: v.optional(v.string()),
|
|
categoryMode: v.optional(v.union(v.literal("preset"), v.literal("custom"))),
|
|
category: v.optional(v.string()),
|
|
customSearchTerm: v.optional(v.string()),
|
|
postalCode: v.optional(v.string()),
|
|
region: v.optional(v.string()),
|
|
latitude: v.optional(v.number()),
|
|
longitude: v.optional(v.number()),
|
|
radiusKm: v.optional(v.number()),
|
|
maxNewLeadsPerRun: v.optional(v.number()),
|
|
maxAuditsPerRun: v.optional(v.number()),
|
|
recurrence: v.optional(campaignRecurrence),
|
|
status: v.optional(campaignStatus),
|
|
countryCode: v.optional(v.literal(CAMPAIGN_COUNTRY_CODE)),
|
|
country: v.optional(v.literal(CAMPAIGN_COUNTRY_NAME)),
|
|
nextRunAt: optionalNextRunAt,
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const now = Date.now();
|
|
const campaign = await ctx.db.get(args.id);
|
|
|
|
if (!campaign) {
|
|
throw new Error("Kampagne nicht gefunden.");
|
|
}
|
|
|
|
const sanitized = validateCampaignUpdateInput({
|
|
postalCode: args.postalCode,
|
|
radiusKm: args.radiusKm,
|
|
maxNewLeadsPerRun: args.maxNewLeadsPerRun,
|
|
maxAuditsPerRun: args.maxAuditsPerRun,
|
|
recurrence: args.recurrence,
|
|
status: args.status,
|
|
countryCode: args.countryCode,
|
|
country: args.country,
|
|
});
|
|
|
|
const patch: Record<string, unknown> = {
|
|
updatedAt: now,
|
|
countryCode: sanitized.countryCode,
|
|
country: sanitized.country,
|
|
};
|
|
|
|
if (args.name !== undefined) {
|
|
patch.name = args.name;
|
|
}
|
|
if (args.categoryMode !== undefined) {
|
|
patch.categoryMode = args.categoryMode;
|
|
}
|
|
if (args.category !== undefined) {
|
|
patch.category = args.category;
|
|
}
|
|
if (args.customSearchTerm !== undefined) {
|
|
patch.customSearchTerm = args.customSearchTerm;
|
|
}
|
|
if (args.postalCode !== undefined) {
|
|
patch.postalCode = args.postalCode;
|
|
}
|
|
if (args.region !== undefined) {
|
|
patch.region = args.region;
|
|
}
|
|
if (args.latitude !== undefined) {
|
|
patch.latitude = args.latitude;
|
|
}
|
|
if (args.longitude !== undefined) {
|
|
patch.longitude = args.longitude;
|
|
}
|
|
if (args.radiusKm !== undefined) {
|
|
patch.radiusKm = args.radiusKm;
|
|
}
|
|
if (args.maxNewLeadsPerRun !== undefined) {
|
|
patch.maxNewLeadsPerRun = args.maxNewLeadsPerRun;
|
|
}
|
|
if (args.maxAuditsPerRun !== undefined) {
|
|
patch.maxAuditsPerRun = args.maxAuditsPerRun;
|
|
}
|
|
if (args.recurrence !== undefined) {
|
|
patch.recurrence = sanitized.recurrence;
|
|
}
|
|
if (args.status !== undefined) {
|
|
patch.status = sanitized.status;
|
|
}
|
|
if (args.nextRunAt !== undefined) {
|
|
patch.nextRunAt = args.nextRunAt;
|
|
} else if (
|
|
(args.status !== undefined && args.status !== campaign.status)
|
|
|| (args.recurrence !== undefined && args.recurrence !== campaign.recurrence)
|
|
) {
|
|
const nextStatus = args.status ?? campaign.status;
|
|
const nextRecurrence = args.recurrence ?? campaign.recurrence;
|
|
|
|
patch.nextRunAt = normalizeNextRunAt({
|
|
status: nextStatus,
|
|
recurrence: nextRecurrence,
|
|
lastRunAt: campaign.lastRunAt,
|
|
now,
|
|
});
|
|
}
|
|
|
|
await ctx.db.patch(args.id, patch);
|
|
return args.id;
|
|
},
|
|
});
|
|
|
|
export const setStatus = mutation({
|
|
args: {
|
|
id: v.id("campaigns"),
|
|
status: campaignStatus,
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const now = Date.now();
|
|
const campaign = await ctx.db.get(args.id);
|
|
|
|
if (!campaign) {
|
|
throw new Error("Kampagne nicht gefunden.");
|
|
}
|
|
|
|
await ctx.db.patch(args.id, {
|
|
status: args.status,
|
|
nextRunAt:
|
|
args.status === "paused"
|
|
? null
|
|
: calculateNextRunAt({
|
|
recurrence: campaign.recurrence,
|
|
status: args.status,
|
|
lastRunAt: campaign.lastRunAt,
|
|
now,
|
|
}),
|
|
updatedAt: now,
|
|
});
|
|
return args.id;
|
|
},
|
|
});
|
|
|
|
export const requestRun = mutation({
|
|
args: {
|
|
id: v.id("campaigns"),
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const now = Date.now();
|
|
const campaign = await ctx.db.get(args.id);
|
|
|
|
if (!campaign) {
|
|
throw new Error("Kampagne nicht gefunden.");
|
|
}
|
|
|
|
const runId = await ctx.db.insert("agentRuns", {
|
|
type: "campaign",
|
|
campaignId: args.id,
|
|
status: "pending",
|
|
counters: {
|
|
leadsFound: 0,
|
|
leadsCreated: 0,
|
|
auditsCreated: 0,
|
|
outreachPrepared: 0,
|
|
errors: 0,
|
|
},
|
|
createdAt: now,
|
|
updatedAt: now,
|
|
});
|
|
|
|
const nextRunAt = calculateNextRunAt({
|
|
recurrence: campaign.recurrence,
|
|
status: campaign.status,
|
|
lastRunAt: now,
|
|
now,
|
|
});
|
|
|
|
await ctx.db.patch(args.id, {
|
|
lastRunAt: now,
|
|
nextRunAt,
|
|
updatedAt: now,
|
|
});
|
|
|
|
return runId;
|
|
},
|
|
});
|
|
|
|
export const get = query({
|
|
args: { id: v.id("campaigns") },
|
|
handler: async (ctx, args) => {
|
|
const campaign = await ctx.db.get(args.id);
|
|
|
|
if (!campaign) {
|
|
return null;
|
|
}
|
|
|
|
return await enrichCampaignWithRunStatus(ctx, campaign);
|
|
},
|
|
});
|
|
|
|
export const list = query({
|
|
args: {
|
|
status: v.optional(v.union(v.literal("active"), v.literal("paused"))),
|
|
limit: limitArg,
|
|
},
|
|
handler: async (ctx, args) => {
|
|
const limit = normalizeListLimit(args.limit);
|
|
|
|
let campaigns;
|
|
if (args.status !== undefined) {
|
|
const status = args.status;
|
|
campaigns = await ctx.db
|
|
.query("campaigns")
|
|
.withIndex("by_status", (q) => q.eq("status", status))
|
|
.order("desc")
|
|
.take(limit);
|
|
} else {
|
|
campaigns = await ctx.db.query("campaigns").order("desc").take(limit);
|
|
}
|
|
|
|
return await Promise.all(campaigns.map((campaign) => enrichCampaignWithRunStatus(ctx, campaign)));
|
|
},
|
|
});
|