Files
pitchfast/v2_elemente/campaigns Kopie.ts

287 lines
9.2 KiB
TypeScript

/**
* convex/campaigns.ts
*
* Orchestrierung eines Kampagnenlaufs: Places-Suche → Dedup → Lead-Anlage.
*
* Aufbau (Convex-Pattern):
* - Mutations/Queries: deterministisch, DB-Zugriff, KEIN fetch.
* - Actions: dürfen fetch (Google Places) + Secrets entschlüsseln, aber
* greifen auf die DB nur über runQuery/runMutation zu.
*
* Mandanten-Invariante: jeder DB-Zugriff ist orgId-gescoped.
*/
import { v } from "convex/values";
import {
mutation,
internalAction,
internalMutation,
internalQuery,
} from "./_generated/server";
import { internal } from "./_generated/api";
import type { Id } from "./_generated/dataModel";
import { searchPlacesPage, leadLookupCostUsd } from "./lib/googlePlaces";
import { decryptSecret } from "./lib/crypto";
// ---- Auth-Helfer -------------------------------------------------------------
/** Org des Aufrufers ermitteln und sicherstellen, dass die Kampagne dazugehört. */
async function requireOwnedCampaign(
ctx: { auth: any; db: any },
campaignId: Id<"campaigns">,
) {
const identity = await ctx.auth.getUserIdentity();
if (!identity) throw new Error("Nicht authentifiziert");
const user = await ctx.db
.query("users")
.withIndex("by_auth", (q: any) => q.eq("authRef", identity.subject))
.first();
if (!user) throw new Error("Kein Nutzer-/Org-Kontext");
const campaign = await ctx.db.get(campaignId);
if (!campaign || campaign.orgId !== user.orgId) {
throw new Error("Kampagne nicht gefunden oder nicht berechtigt");
}
return { orgId: user.orgId as Id<"organizations">, campaign };
}
// ---- Öffentlicher Trigger ("Jetzt ausführen") --------------------------------
export const triggerCampaignRun = mutation({
args: { campaignId: v.id("campaigns") },
handler: async (ctx, { campaignId }) => {
const { orgId } = await requireOwnedCampaign(ctx, campaignId);
// Nur EIN aktiver Lauf pro Org gleichzeitig.
const running = await ctx.db
.query("runLogs")
.withIndex("by_org", (q) => q.eq("orgId", orgId))
.filter((q) => q.eq(q.field("status"), "running"))
.first();
if (running) throw new Error("Es läuft bereits ein Kampagnenlauf");
const runLogId = await ctx.db.insert("runLogs", {
orgId,
campaignId,
status: "running",
startedAt: Date.now(),
});
await ctx.scheduler.runAfter(0, internal.campaigns.runCampaign, {
orgId,
campaignId,
runLogId,
});
return runLogId;
},
});
// ---- Interne Helfer (DB) -----------------------------------------------------
export const getCampaignForRun = internalQuery({
args: { orgId: v.id("organizations"), campaignId: v.id("campaigns") },
handler: async (ctx, { orgId, campaignId }) => {
const campaign = await ctx.db.get(campaignId);
if (!campaign || campaign.orgId !== orgId) return null;
const org = await ctx.db.get(orgId);
const keyEntry = await ctx.db
.query("apiKeyVault")
.withIndex("by_org_provider", (q) =>
q.eq("orgId", orgId).eq("provider", "google"),
)
.first();
return {
campaign,
killSwitch: org?.killSwitch ?? false,
budgetCapUsdMonthly: org?.budgetCapUsdMonthly,
googleKeyMode: keyEntry?.mode ?? null,
googleKeyCipher: keyEntry?.encryptedKey ?? null,
};
},
});
/** Monatsverbrauch summieren und gegen das Budget prüfen. */
export const checkMonthlyBudget = internalQuery({
args: { orgId: v.id("organizations"), capUsd: v.optional(v.number()) },
handler: async (ctx, { orgId, capUsd }) => {
if (capUsd === undefined) return { ok: true, spentUsd: 0 };
const monthStart = new Date();
monthStart.setDate(1);
monthStart.setHours(0, 0, 0, 0);
const events = await ctx.db
.query("usageEvents")
.withIndex("by_org", (q) => q.eq("orgId", orgId))
.filter((q) => q.gte(q.field("_creationTime"), monthStart.getTime()))
.collect();
const spentUsd = events.reduce((s, e) => s + e.costEstimateUsd, 0);
return { ok: spentUsd < capUsd, spentUsd };
},
});
export const finishRunLog = internalMutation({
args: {
runLogId: v.id("runLogs"),
campaignId: v.id("campaigns"),
status: v.union(
v.literal("completed"),
v.literal("failed"),
v.literal("canceled"),
),
leadsFound: v.optional(v.number()),
googleError: v.optional(v.string()),
},
handler: async (ctx, a) => {
await ctx.db.patch(a.runLogId, {
status: a.status,
leadsFound: a.leadsFound,
finishedAt: Date.now(),
...(a.googleError ? { errorByService: { google: a.googleError } } : {}),
});
await ctx.db.patch(a.campaignId, { lastRunAt: Date.now() });
},
});
// ---- Die eigentliche Recherche-Action ----------------------------------------
export const runCampaign = internalAction({
args: {
orgId: v.id("organizations"),
campaignId: v.id("campaigns"),
runLogId: v.id("runLogs"),
},
handler: async (ctx, { orgId, campaignId, runLogId }) => {
let leadsFound = 0;
try {
const ctxData = await ctx.runQuery(internal.campaigns.getCampaignForRun, {
orgId,
campaignId,
});
if (!ctxData) throw new Error("Kampagne nicht gefunden");
const { campaign, killSwitch, budgetCapUsdMonthly } = ctxData;
if (killSwitch) throw new Error("Kill-Switch aktiv — Lauf abgebrochen");
if (!campaign.coordinates) throw new Error("Kampagne ohne Koordinaten");
// Budget-Cap prüfen
const budget = await ctx.runQuery(internal.campaigns.checkMonthlyBudget, {
orgId,
capUsd: budgetCapUsdMonthly,
});
if (!budget.ok) throw new Error("Monats-Budget erreicht");
// BYO-Key entschlüsseln (Managed-Modus würde hier den Plattform-Key nutzen)
if (!ctxData.googleKeyCipher) {
throw new Error("Kein Google-API-Key hinterlegt");
}
const apiKey = await decryptSecret(ctxData.googleKeyCipher);
const costPerLead = leadLookupCostUsd();
let pageToken: string | undefined = undefined;
// Seitenweise suchen, bis das Limit erreicht ist oder Google nichts mehr liefert.
while (leadsFound < campaign.maxLeadsPerRun) {
const page = await searchPlacesPage({
apiKey,
textQuery: campaign.niche,
lat: campaign.coordinates.lat,
lng: campaign.coordinates.lng,
radiusMeters: campaign.radiusMeters,
pageToken,
});
for (const place of page.places) {
if (leadsFound >= campaign.maxLeadsPerRun) break;
const result = await ctx.runMutation(
internal.leads.upsertLeadFromPlace,
{
orgId,
campaignId,
niche: campaign.niche,
place: {
placeId: place.placeId,
name: place.name,
address: place.address,
phone: place.phone,
website: place.website,
rating: place.rating,
},
leadCostUsd: costPerLead,
},
);
if (result.status === "inserted") leadsFound++;
// duplicate/blocked werden übersprungen.
}
if (!page.nextPageToken) break;
pageToken = page.nextPageToken;
}
await ctx.runMutation(internal.campaigns.finishRunLog, {
runLogId,
campaignId,
status: "completed",
leadsFound,
});
// Höchstpriorisierte neue Leads an die Audit-Pipeline übergeben
// (eigenständig eingeplant, damit der Recherche-Lauf sauber abschließt).
if (campaign.maxAuditsPerRun > 0 && leadsFound > 0) {
await ctx.scheduler.runAfter(0, internal.audits.runAuditBatch, {
orgId,
campaignId,
limit: campaign.maxAuditsPerRun,
});
}
} catch (err) {
await ctx.runMutation(internal.campaigns.finishRunLog, {
runLogId,
campaignId,
status: "failed",
leadsFound,
googleError: err instanceof Error ? err.message : String(err),
});
}
},
});
// ---- Cron-Dispatcher ---------------------------------------------------------
/** Findet fällige Kampagnen und stößt je einen Lauf an (von crons.ts aufgerufen). */
export const runDueCampaigns = internalMutation({
args: {},
handler: async (ctx) => {
const now = Date.now();
const due = await ctx.db
.query("campaigns")
.withIndex("by_next_run", (q) => q.lte("nextRunAt", now))
.filter((q) => q.eq(q.field("status"), "active"))
.take(25);
for (const campaign of due) {
// Doppelläufe vermeiden
const running = await ctx.db
.query("runLogs")
.withIndex("by_org", (q) => q.eq("orgId", campaign.orgId))
.filter((q) => q.eq(q.field("status"), "running"))
.first();
if (running) continue;
const runLogId = await ctx.db.insert("runLogs", {
orgId: campaign.orgId,
campaignId: campaign._id,
status: "running",
startedAt: now,
});
await ctx.scheduler.runAfter(0, internal.campaigns.runCampaign, {
orgId: campaign.orgId,
campaignId: campaign._id,
runLogId,
});
// nextRunAt neu setzen (vereinfachte Wochen-Kadenz; rrule später)
await ctx.db.patch(campaign._id, {
nextRunAt: now + 7 * 24 * 60 * 60 * 1000,
});
}
},
});