import { v } from "convex/values"; import { GOOGLE_PLACES_FIELD_MASK, buildGeocodingUrl, getBlacklistLookupValues, getBlacklistMatches, getPlacesSearchSpec, normalizePlacesResponse, parseGeocodingResponse, } from "../lib/lead-discovery-google"; import { buildLeadDiscoveryLeadRecord, buildLeadDiscoveryCounters, } from "../lib/lead-discovery-run"; import { calculateNextRunAt } from "../lib/campaign-scheduling"; import { internal } from "./_generated/api"; import { Doc, Id } from "./_generated/dataModel"; import { internalAction, internalMutation } from "./_generated/server"; type CampaignDoc = Doc<"campaigns">; const nullableString = v.union(v.string(), v.null()); const nullableNumber = v.union(v.number(), v.null()); const candidateValidator = v.object({ placeId: v.string(), businessName: v.string(), address: v.string(), websiteUrl: nullableString, websiteDomain: nullableString, phone: nullableString, rating: nullableNumber, userRatingCount: nullableNumber, businessStatus: nullableString, googleTypes: v.array(v.string()), googlePrimaryType: nullableString, googleMapsUrl: nullableString, sourceProvider: v.literal("google_places"), sourceFetchedAt: v.number(), }); const eventDetailValidator = v.object({ label: v.string(), value: v.string(), source: v.optional(v.string()), }); function getRequiredEnv(key: string) { const value = process.env[key]?.trim(); if (!value) { throw new Error(`${key} ist nicht gesetzt.`); } return value; } function messageFromError(error: unknown) { return error instanceof Error ? error.message : String(error); } function getCampaignNiche(campaign: CampaignDoc) { if ( campaign.categoryMode === "custom" || campaign.category === "Anderes" ) { return campaign.customSearchTerm?.trim() || campaign.category; } return campaign.category; } async function fetchJson(url: string, init?: RequestInit) { const response = await fetch(url, init); if (!response.ok) { const body = await response.text(); throw new Error( `Google API request failed with HTTP ${response.status}: ${body.slice(0, 500)}`, ); } return await response.json(); } export const processCampaignRun = internalAction({ args: { runId: v.id("agentRuns"), }, handler: async (ctx, args) => { const started: { campaign: CampaignDoc; runId: Id<"agentRuns">; } | null = await ctx.runMutation(internal.leadDiscovery.startCampaignRun, { runId: args.runId, }); if (!started) { return null; } try { const geocodingApiKey = getRequiredEnv("GOOGLE_GEOCODING_API_KEY"); const placesApiKey = getRequiredEnv("GOOGLE_PLACES_API_KEY"); const campaign = started.campaign; const fetchedAt = Date.now(); let latitude = campaign.latitude; let longitude = campaign.longitude; if (typeof latitude !== "number" || typeof longitude !== "number") { const geocodingUrl = buildGeocodingUrl({ postalCode: campaign.postalCode, apiKey: geocodingApiKey, }); const geocodingJson = await fetchJson(geocodingUrl); const geocoding = parseGeocodingResponse(geocodingJson, fetchedAt); latitude = geocoding.latitude; longitude = geocoding.longitude; await ctx.runMutation(internal.leadDiscovery.cacheCampaignGeocode, { campaignId: campaign._id, latitude, longitude, geocodedAt: geocoding.fetchedAt, geocodingPlaceId: geocoding.placeId, geocodingFormattedAddress: geocoding.formattedAddress, }); await ctx.runMutation(internal.leadDiscovery.appendRunEvent, { runId: args.runId, level: "info", message: "PLZ geocodiert.", details: [ { label: "PLZ", value: campaign.postalCode, source: "google_geocoding" }, { label: "Koordinaten", value: `${latitude}, ${longitude}`, source: "google_geocoding", }, ], }); } else { await ctx.runMutation(internal.leadDiscovery.appendRunEvent, { runId: args.runId, level: "info", message: "Geocoding-Cache der Kampagne verwendet.", details: [ { label: "PLZ", value: campaign.postalCode }, { label: "Koordinaten", value: `${latitude}, ${longitude}` }, ], }); } const searchSpec = getPlacesSearchSpec({ categoryMode: campaign.categoryMode, category: campaign.category, customSearchTerm: campaign.customSearchTerm, postalCode: campaign.postalCode, radiusKm: campaign.radiusKm, latitude, longitude, }); const placesJson = await fetchJson( `https://places.googleapis.com/v1/places:${searchSpec.endpoint}`, { method: "POST", headers: { "Content-Type": "application/json", "X-Goog-Api-Key": placesApiKey, "X-Goog-FieldMask": GOOGLE_PLACES_FIELD_MASK, }, body: JSON.stringify(searchSpec.body), }, ); const candidates = normalizePlacesResponse(placesJson, Date.now()); if (candidates.length === 0) { await ctx.runMutation(internal.leadDiscovery.appendRunEvent, { runId: args.runId, level: "warning", message: "Google Places lieferte keine Ergebnisse.", details: [ { label: "Suchtyp", value: searchSpec.searchType, source: "google_places" }, { label: "Kategorie", value: getCampaignNiche(campaign), source: "google_places" }, ], }); } const result: { leadsFound: number; leadsCreated: number; skippedDuplicates: number; skippedBlacklisted: number; errors: number; } = await ctx.runMutation(internal.leadDiscovery.persistDiscoveredLeads, { runId: args.runId, campaignId: campaign._id, maxNewLeads: campaign.maxNewLeadsPerRun, niche: getCampaignNiche(campaign), postalCode: campaign.postalCode, candidates, }); await ctx.runMutation(internal.leadDiscovery.finishCampaignRun, { runId: args.runId, status: "succeeded", currentStep: "lead_discovery", counters: buildLeadDiscoveryCounters({ leadsFound: result.leadsFound, leadsCreated: result.leadsCreated, errors: result.errors, }), }); return result; } catch (error) { const errorSummary = messageFromError(error); await ctx.runMutation(internal.leadDiscovery.appendRunEvent, { runId: args.runId, level: "error", message: "Lead-Recherche fehlgeschlagen.", details: [{ label: "Fehler", value: errorSummary }], }); await ctx.runMutation(internal.leadDiscovery.finishCampaignRun, { runId: args.runId, status: "failed", currentStep: "lead_discovery", errorSummary, counters: buildLeadDiscoveryCounters({ leadsFound: 0, leadsCreated: 0, errors: 1, }), }); return null; } }, }); export const startCampaignRun = internalMutation({ args: { runId: v.id("agentRuns"), }, handler: async (ctx, args) => { const now = Date.now(); const run = await ctx.db.get(args.runId); if (!run || !run.campaignId || run.status !== "pending") { return null; } const activeRunning = await ctx.db .query("agentRuns") .withIndex("by_status", (q) => q.eq("status", "running")) .take(1); if (activeRunning.length > 0) { await ctx.db.patch(args.runId, { status: "canceled", currentStep: "lead_discovery", errorSummary: "Ein anderer Agentenlauf ist bereits aktiv.", finishedAt: now, updatedAt: now, }); await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "warning", message: "Lauf nicht gestartet, weil ein anderer Agentenlauf aktiv ist.", createdAt: now, }); return null; } const campaign = await ctx.db.get(run.campaignId); if (!campaign) { await ctx.db.patch(args.runId, { status: "failed", currentStep: "lead_discovery", errorSummary: "Kampagne nicht gefunden.", finishedAt: now, updatedAt: now, }); return null; } await ctx.db.patch(args.runId, { status: "running", currentStep: "lead_discovery", startedAt: now, updatedAt: now, }); await ctx.db.patch(campaign._id, { lastRunAt: now, nextRunAt: calculateNextRunAt({ recurrence: campaign.recurrence, status: campaign.status, lastRunAt: now, now, }), updatedAt: now, }); await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "info", message: "Lead-Recherche gestartet.", details: [ { label: "Kampagne", value: campaign.name }, { label: "PLZ", value: campaign.postalCode }, ], createdAt: now, }); return { runId: args.runId, campaign }; }, }); export const cacheCampaignGeocode = internalMutation({ args: { campaignId: v.id("campaigns"), latitude: v.number(), longitude: v.number(), geocodedAt: v.number(), geocodingPlaceId: v.string(), geocodingFormattedAddress: v.string(), }, handler: async (ctx, args) => { await ctx.db.patch(args.campaignId, { latitude: args.latitude, longitude: args.longitude, geocodedAt: args.geocodedAt, geocodingPlaceId: args.geocodingPlaceId, geocodingFormattedAddress: args.geocodingFormattedAddress, updatedAt: Date.now(), }); }, }); export const appendRunEvent = internalMutation({ args: { runId: v.id("agentRuns"), level: v.union(v.literal("info"), v.literal("warning"), v.literal("error")), message: v.string(), details: v.optional(v.array(eventDetailValidator)), }, handler: async (ctx, args) => { await ctx.db.insert("agentRunEvents", { ...args, createdAt: Date.now(), }); }, }); export const persistDiscoveredLeads = internalMutation({ args: { runId: v.id("agentRuns"), campaignId: v.id("campaigns"), maxNewLeads: v.number(), niche: v.string(), postalCode: v.string(), candidates: v.array(candidateValidator), }, handler: async (ctx, args) => { const now = Date.now(); let leadsCreated = 0; let skippedDuplicates = 0; let skippedBlacklisted = 0; let errors = 0; for (const candidate of args.candidates) { if (leadsCreated >= args.maxNewLeads) { await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "info", message: "Lead-Limit des Laufs erreicht.", details: [{ label: "Limit", value: String(args.maxNewLeads) }], createdAt: Date.now(), }); break; } if (!candidate.businessName.trim()) { errors += 1; await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "warning", message: "Google-Places-Ergebnis ohne Unternehmensname übersprungen.", details: [{ label: "Place ID", value: candidate.placeId }], createdAt: Date.now(), }); continue; } const existingByPlaceId = await ctx.db .query("leads") .withIndex("by_googlePlaceId", (q) => q.eq("googlePlaceId", candidate.placeId), ) .take(1); const candidateDomain = candidate.websiteDomain; const existingByDomain = candidateDomain ? await ctx.db .query("leads") .withIndex("by_websiteDomain", (q) => q.eq("websiteDomain", candidateDomain), ) .take(1) : []; if (existingByPlaceId.length > 0 || existingByDomain.length > 0) { skippedDuplicates += 1; await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "info", message: "Doppelter Lead übersprungen.", details: [ { label: "Unternehmen", value: candidate.businessName, source: "google_places" }, { label: "Place ID", value: candidate.placeId, source: "google_places" }, ], createdAt: Date.now(), }); continue; } const blacklistRows = []; for (const lookup of getBlacklistLookupValues(candidate)) { const rows = await ctx.db .query("blacklistEntries") .withIndex("by_type_and_normalizedValue", (q) => q .eq("type", lookup.type) .eq("normalizedValue", lookup.normalizedValue), ) .take(1); blacklistRows.push(...rows); } const blacklistMatches = getBlacklistMatches(candidate, blacklistRows); if (blacklistMatches.length > 0) { skippedBlacklisted += 1; await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "warning", message: "Gesperrter Lead übersprungen.", details: blacklistMatches.map((match) => ({ label: match.type, value: match.value, source: "blacklist", })), createdAt: Date.now(), }); continue; } const lead = buildLeadDiscoveryLeadRecord({ campaignId: args.campaignId, runId: args.runId, niche: args.niche, postalCode: args.postalCode, candidate, now, }); await ctx.db.insert("leads", lead); leadsCreated += 1; await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "info", message: "Lead aus Google Places gespeichert.", details: [ { label: "Unternehmen", value: candidate.businessName, source: "google_places" }, { label: "Place ID", value: candidate.placeId, source: "google_places" }, ], createdAt: Date.now(), }); } await ctx.db.insert("agentRunEvents", { runId: args.runId, level: "info", message: "Lead-Recherche abgeschlossen.", details: [ { label: "Gefunden", value: String(args.candidates.length) }, { label: "Gespeichert", value: String(leadsCreated) }, { label: "Dubletten übersprungen", value: String(skippedDuplicates) }, { label: "Sperrliste übersprungen", value: String(skippedBlacklisted) }, ], createdAt: Date.now(), }); return { leadsFound: args.candidates.length, leadsCreated, skippedDuplicates, skippedBlacklisted, errors, }; }, }); export const finishCampaignRun = internalMutation({ args: { runId: v.id("agentRuns"), status: v.union(v.literal("succeeded"), v.literal("failed"), v.literal("canceled")), currentStep: v.optional(v.string()), errorSummary: v.optional(v.string()), counters: v.object({ leadsFound: v.number(), leadsCreated: v.number(), auditsCreated: v.number(), outreachPrepared: v.number(), errors: v.number(), }), }, handler: async (ctx, args) => { const patch: { status: typeof args.status; updatedAt: number; finishedAt: number; counters: typeof args.counters; currentStep?: string; errorSummary?: string; } = { status: args.status, updatedAt: Date.now(), finishedAt: Date.now(), counters: args.counters, }; if (args.currentStep !== undefined) { patch.currentStep = args.currentStep; } if (args.errorSummary !== undefined) { patch.errorSummary = args.errorSummary; } await ctx.db.patch(args.runId, patch); }, });