/** * convex/campaigns.ts * * Orchestrierung eines Kampagnenlaufs: Places-Suche → Dedup → Lead-Anlage. * * Aufbau (Convex-Pattern): * - Mutations/Queries: deterministisch, DB-Zugriff, KEIN fetch. * - Actions: dürfen fetch (Google Places) + Secrets entschlüsseln, aber * greifen auf die DB nur über runQuery/runMutation zu. * * Mandanten-Invariante: jeder DB-Zugriff ist orgId-gescoped. */ import { v } from "convex/values"; import { mutation, internalAction, internalMutation, internalQuery, } from "./_generated/server"; import { internal } from "./_generated/api"; import type { Id } from "./_generated/dataModel"; import { searchPlacesPage, leadLookupCostUsd } from "./lib/googlePlaces"; import { decryptSecret } from "./lib/crypto"; // ---- Auth-Helfer ------------------------------------------------------------- /** Org des Aufrufers ermitteln und sicherstellen, dass die Kampagne dazugehört. */ async function requireOwnedCampaign( ctx: { auth: any; db: any }, campaignId: Id<"campaigns">, ) { const identity = await ctx.auth.getUserIdentity(); if (!identity) throw new Error("Nicht authentifiziert"); const user = await ctx.db .query("users") .withIndex("by_auth", (q: any) => q.eq("authRef", identity.subject)) .first(); if (!user) throw new Error("Kein Nutzer-/Org-Kontext"); const campaign = await ctx.db.get(campaignId); if (!campaign || campaign.orgId !== user.orgId) { throw new Error("Kampagne nicht gefunden oder nicht berechtigt"); } return { orgId: user.orgId as Id<"organizations">, campaign }; } // ---- Öffentlicher Trigger ("Jetzt ausführen") -------------------------------- export const triggerCampaignRun = mutation({ args: { campaignId: v.id("campaigns") }, handler: async (ctx, { campaignId }) => { const { orgId } = await requireOwnedCampaign(ctx, campaignId); // Nur EIN aktiver Lauf pro Org gleichzeitig. const running = await ctx.db .query("runLogs") .withIndex("by_org", (q) => q.eq("orgId", orgId)) .filter((q) => q.eq(q.field("status"), "running")) .first(); if (running) throw new Error("Es läuft bereits ein Kampagnenlauf"); const runLogId = await ctx.db.insert("runLogs", { orgId, campaignId, status: "running", startedAt: Date.now(), }); await ctx.scheduler.runAfter(0, internal.campaigns.runCampaign, { orgId, campaignId, runLogId, }); return runLogId; }, }); // ---- Interne Helfer (DB) ----------------------------------------------------- export const getCampaignForRun = internalQuery({ args: { orgId: v.id("organizations"), campaignId: v.id("campaigns") }, handler: async (ctx, { orgId, campaignId }) => { const campaign = await ctx.db.get(campaignId); if (!campaign || campaign.orgId !== orgId) return null; const org = await ctx.db.get(orgId); const keyEntry = await ctx.db .query("apiKeyVault") .withIndex("by_org_provider", (q) => q.eq("orgId", orgId).eq("provider", "google"), ) .first(); return { campaign, killSwitch: org?.killSwitch ?? false, budgetCapUsdMonthly: org?.budgetCapUsdMonthly, googleKeyMode: keyEntry?.mode ?? null, googleKeyCipher: keyEntry?.encryptedKey ?? null, }; }, }); /** Monatsverbrauch summieren und gegen das Budget prüfen. */ export const checkMonthlyBudget = internalQuery({ args: { orgId: v.id("organizations"), capUsd: v.optional(v.number()) }, handler: async (ctx, { orgId, capUsd }) => { if (capUsd === undefined) return { ok: true, spentUsd: 0 }; const monthStart = new Date(); monthStart.setDate(1); monthStart.setHours(0, 0, 0, 0); const events = await ctx.db .query("usageEvents") .withIndex("by_org", (q) => q.eq("orgId", orgId)) .filter((q) => q.gte(q.field("_creationTime"), monthStart.getTime())) .collect(); const spentUsd = events.reduce((s, e) => s + e.costEstimateUsd, 0); return { ok: spentUsd < capUsd, spentUsd }; }, }); export const finishRunLog = internalMutation({ args: { runLogId: v.id("runLogs"), campaignId: v.id("campaigns"), status: v.union( v.literal("completed"), v.literal("failed"), v.literal("canceled"), ), leadsFound: v.optional(v.number()), googleError: v.optional(v.string()), }, handler: async (ctx, a) => { await ctx.db.patch(a.runLogId, { status: a.status, leadsFound: a.leadsFound, finishedAt: Date.now(), ...(a.googleError ? { errorByService: { google: a.googleError } } : {}), }); await ctx.db.patch(a.campaignId, { lastRunAt: Date.now() }); }, }); // ---- Die eigentliche Recherche-Action ---------------------------------------- export const runCampaign = internalAction({ args: { orgId: v.id("organizations"), campaignId: v.id("campaigns"), runLogId: v.id("runLogs"), }, handler: async (ctx, { orgId, campaignId, runLogId }) => { let leadsFound = 0; try { const ctxData = await ctx.runQuery(internal.campaigns.getCampaignForRun, { orgId, campaignId, }); if (!ctxData) throw new Error("Kampagne nicht gefunden"); const { campaign, killSwitch, budgetCapUsdMonthly } = ctxData; if (killSwitch) throw new Error("Kill-Switch aktiv — Lauf abgebrochen"); if (!campaign.coordinates) throw new Error("Kampagne ohne Koordinaten"); // Budget-Cap prüfen const budget = await ctx.runQuery(internal.campaigns.checkMonthlyBudget, { orgId, capUsd: budgetCapUsdMonthly, }); if (!budget.ok) throw new Error("Monats-Budget erreicht"); // BYO-Key entschlüsseln (Managed-Modus würde hier den Plattform-Key nutzen) if (!ctxData.googleKeyCipher) { throw new Error("Kein Google-API-Key hinterlegt"); } const apiKey = await decryptSecret(ctxData.googleKeyCipher); const costPerLead = leadLookupCostUsd(); let pageToken: string | undefined = undefined; // Seitenweise suchen, bis das Limit erreicht ist oder Google nichts mehr liefert. while (leadsFound < campaign.maxLeadsPerRun) { const page = await searchPlacesPage({ apiKey, textQuery: campaign.niche, lat: campaign.coordinates.lat, lng: campaign.coordinates.lng, radiusMeters: campaign.radiusMeters, pageToken, }); for (const place of page.places) { if (leadsFound >= campaign.maxLeadsPerRun) break; const result = await ctx.runMutation( internal.leads.upsertLeadFromPlace, { orgId, campaignId, niche: campaign.niche, place: { placeId: place.placeId, name: place.name, address: place.address, phone: place.phone, website: place.website, rating: place.rating, }, leadCostUsd: costPerLead, }, ); if (result.status === "inserted") leadsFound++; // duplicate/blocked werden übersprungen. } if (!page.nextPageToken) break; pageToken = page.nextPageToken; } await ctx.runMutation(internal.campaigns.finishRunLog, { runLogId, campaignId, status: "completed", leadsFound, }); // Höchstpriorisierte neue Leads an die Audit-Pipeline übergeben // (eigenständig eingeplant, damit der Recherche-Lauf sauber abschließt). if (campaign.maxAuditsPerRun > 0 && leadsFound > 0) { await ctx.scheduler.runAfter(0, internal.audits.runAuditBatch, { orgId, campaignId, limit: campaign.maxAuditsPerRun, }); } } catch (err) { await ctx.runMutation(internal.campaigns.finishRunLog, { runLogId, campaignId, status: "failed", leadsFound, googleError: err instanceof Error ? err.message : String(err), }); } }, }); // ---- Cron-Dispatcher --------------------------------------------------------- /** Findet fällige Kampagnen und stößt je einen Lauf an (von crons.ts aufgerufen). */ export const runDueCampaigns = internalMutation({ args: {}, handler: async (ctx) => { const now = Date.now(); const due = await ctx.db .query("campaigns") .withIndex("by_next_run", (q) => q.lte("nextRunAt", now)) .filter((q) => q.eq(q.field("status"), "active")) .take(25); for (const campaign of due) { // Doppelläufe vermeiden const running = await ctx.db .query("runLogs") .withIndex("by_org", (q) => q.eq("orgId", campaign.orgId)) .filter((q) => q.eq(q.field("status"), "running")) .first(); if (running) continue; const runLogId = await ctx.db.insert("runLogs", { orgId: campaign.orgId, campaignId: campaign._id, status: "running", startedAt: now, }); await ctx.scheduler.runAfter(0, internal.campaigns.runCampaign, { orgId: campaign.orgId, campaignId: campaign._id, runLogId, }); // nextRunAt neu setzen (vereinfachte Wochen-Kadenz; rrule später) await ctx.db.patch(campaign._id, { nextRunAt: now + 7 * 24 * 60 * 60 * 1000, }); } }, });