Integrate local business workflow and SaaS redesign

This commit is contained in:
2026-06-12 21:08:35 +02:00
parent f00c5a3193
commit 21c7e4c9a4
88 changed files with 2683 additions and 849 deletions

View File

@@ -2,5 +2,5 @@
"guidelinesHash": "62d72acb9afcc18f658d88dd772f34b5b1da5fa60ef0402e57a784d97c458e57",
"agentsMdSectionHash": "5934f676ea9a332e7cd4a4f64aa23b59d926e9faca026c758d4b1f87d2101cc3",
"claudeMdHash": "5934f676ea9a332e7cd4a4f64aa23b59d926e9faca026c758d4b1f87d2101cc3",
"agentSkillsSha": "294d4f05edb5e7b57f3c534b79dd00e8e3d7b60d"
"agentSkillsSha": "7a6fcc6882f344577a34365fdadbd0f8f8c467d7"
}

View File

@@ -205,8 +205,8 @@ const auditGenerationUsage = v.object({
const secretHints = [
"OPENROUTER_API_KEY",
"GOOGLE_PLACES_API_KEY",
"GOOGLE_GEOCODING_API_KEY",
"LOCAL_BUSINESS_DATA_API_KEY",
"RAPIDAPI_KEY",
"PAGESPEED_API_KEY",
"SMTP_PASSWORD",
"SMTP_HOST",

View File

@@ -98,8 +98,8 @@ function sanitizeAndCapString(value: string | undefined, maxBytes: number) {
const secretHints = [
"OPENROUTER_API_KEY",
"GOOGLE_PLACES_API_KEY",
"GOOGLE_GEOCODING_API_KEY",
"LOCAL_BUSINESS_DATA_API_KEY",
"RAPIDAPI_KEY",
"PAGESPEED_API_KEY",
"SMTP_PASSWORD",
"SMTP_HOST",

View File

@@ -17,6 +17,7 @@ const blacklistType = v.union(
v.literal("phone"),
v.literal("company"),
v.literal("google_place_id"),
v.literal("source_business_id"),
);
type BlacklistType =
@@ -24,7 +25,8 @@ type BlacklistType =
| "email"
| "phone"
| "company"
| "google_place_id";
| "google_place_id"
| "source_business_id";
const BLACKLIST_APPLY_BATCH_SIZE = 100;
const BLACKLIST_REVIEW_NOTE_PREFIX =
@@ -51,6 +53,7 @@ type LeadMatchingFieldsPatch = Partial<
| "normalizedCompanyName"
| "normalizedAddress"
| "normalizedGooglePlaceId"
| "normalizedSourceBusinessId"
>
> & {
updatedAt: number;
@@ -138,6 +141,13 @@ function getLeadMatchQuery(
.withIndex("by_normalizedGooglePlaceId", (q) =>
q.eq("normalizedGooglePlaceId", normalizedValue),
);
case "source_business_id":
return () =>
ctx.db
.query("leads")
.withIndex("by_normalizedSourceBusinessId", (q) =>
q.eq("normalizedSourceBusinessId", normalizedValue),
);
default:
return null;
}
@@ -152,6 +162,9 @@ function buildLeadMatchingFieldsPatch(lead: Doc<"leads">) {
const normalizedCompanyName = normalizeText(lead.companyName);
const normalizedAddress = normalizeText(lead.address);
const normalizedGooglePlaceId = normalizeDomain(lead.googlePlaceId);
const normalizedSourceBusinessId = normalizeDomain(
lead.sourceBusinessId ?? lead.googlePlaceId,
);
if (!lead.normalizedEmail && normalizedEmail) {
patch.normalizedEmail = normalizedEmail;
@@ -168,6 +181,9 @@ function buildLeadMatchingFieldsPatch(lead: Doc<"leads">) {
if (!lead.normalizedGooglePlaceId && normalizedGooglePlaceId) {
patch.normalizedGooglePlaceId = normalizedGooglePlaceId;
}
if (!lead.normalizedSourceBusinessId && normalizedSourceBusinessId) {
patch.normalizedSourceBusinessId = normalizedSourceBusinessId;
}
return Object.keys(patch).length > 1 ? patch : null;
}
@@ -200,6 +216,7 @@ function normalizeBlacklistValue(type: BlacklistType, value: string) {
return normalizePhone(trimmed);
case "domain":
case "google_place_id":
case "source_business_id":
return normalizeDomain(trimmed);
case "company":
return normalizeText(trimmed);

View File

@@ -6,6 +6,8 @@ const SECRET_KEY_PATTERNS = [
/credential/i,
/smtp/i,
/openrouter/i,
/rapidapi/i,
/local[_-]?business[_-]?data/i,
/google[_-]?(geocoding|places)?/i,
/pagespeed/i,
/rybbit/i,
@@ -77,6 +79,7 @@ export const BLACKLIST_TYPES = [
"phone",
"company",
"google_place_id",
"source_business_id",
] as const;
export const RUN_TYPES = [
"campaign",
@@ -131,6 +134,7 @@ export const USAGE_EVENT_PROVIDERS = [
"jina",
"pagespeed",
"google_places",
"local_business_data",
] as const;
export const USAGE_EVENT_OPERATIONS = [
"audit_capture",

View File

@@ -1,23 +1,23 @@
import { v } from "convex/values";
import {
GOOGLE_PLACES_FIELD_MASK,
buildGeocodingUrl,
getBlacklistLookupValues,
getBlacklistMatches,
getCandidateEmailValues,
getPlacesSearchSpec,
getUsableContactEmail,
normalizeDomain,
normalizePhone,
normalizeText,
normalizePlacesResponse,
parseGeocodingResponse,
} from "../lib/lead-discovery-google";
import {
LOCAL_BUSINESS_DATA_HOST,
getLocalBusinessSearchSpec,
normalizeLocalBusinessSearchResponse,
} from "../lib/lead-discovery-local-business";
import {
buildLeadDiscoveryLeadRecord,
buildLeadDiscoveryCounters,
getLeadDiscoveryPriority,
shouldScheduleWebsiteEnrichment,
} from "../lib/lead-discovery-run";
import { calculateNextRunAt } from "../lib/campaign-scheduling";
@@ -26,12 +26,21 @@ import { Doc, Id } from "./_generated/dataModel";
import { internalAction, internalMutation } from "./_generated/server";
type CampaignDoc = Doc<"campaigns">;
type DuplicateEmailBackfillPatch = Partial<
Pick<
Doc<"leads">,
"normalizedEmail" | "email" | "emailSource" | "contactPerson" | "contactStatus" | "contactStatusReason"
>
> & {
updatedAt: number;
};
const nullableString = v.union(v.string(), v.null());
const nullableNumber = v.union(v.number(), v.null());
const candidateValidator = v.object({
placeId: v.string(),
sourceBusinessId: v.optional(nullableString),
businessName: v.string(),
address: v.string(),
websiteUrl: nullableString,
@@ -57,7 +66,10 @@ const candidateValidator = v.object({
}),
),
),
sourceProvider: v.literal("google_places"),
sourceProvider: v.union(
v.literal("google_places"),
v.literal("local_business_data"),
),
sourceFetchedAt: v.number(),
});
@@ -98,7 +110,7 @@ async function fetchJson(url: string, init?: RequestInit) {
if (!response.ok) {
const body = await response.text();
throw new Error(
`Google API request failed with HTTP ${response.status}: ${body.slice(0, 500)}`,
`Local Business Data request failed with HTTP ${response.status}: ${body.slice(0, 500)}`,
);
}
@@ -122,89 +134,54 @@ export const processCampaignRun = internalAction({
}
try {
const geocodingApiKey = getRequiredEnv("GOOGLE_GEOCODING_API_KEY");
const placesApiKey = getRequiredEnv("GOOGLE_PLACES_API_KEY");
const localBusinessDataApiKey = getRequiredEnv("LOCAL_BUSINESS_DATA_API_KEY");
const campaign = started.campaign;
const fetchedAt = Date.now();
let latitude = campaign.latitude;
let longitude = campaign.longitude;
if (typeof latitude !== "number" || typeof longitude !== "number") {
const geocodingUrl = buildGeocodingUrl({
postalCode: campaign.postalCode,
apiKey: geocodingApiKey,
});
const geocodingJson = await fetchJson(geocodingUrl);
const geocoding = parseGeocodingResponse(geocodingJson, fetchedAt);
latitude = geocoding.latitude;
longitude = geocoding.longitude;
await ctx.runMutation(internal.leadDiscovery.cacheCampaignGeocode, {
campaignId: campaign._id,
latitude,
longitude,
geocodedAt: geocoding.fetchedAt,
geocodingPlaceId: geocoding.placeId,
geocodingFormattedAddress: geocoding.formattedAddress,
});
await ctx.runMutation(internal.leadDiscovery.appendRunEvent, {
runId: args.runId,
level: "info",
message: "PLZ geocodiert.",
details: [
{ label: "PLZ", value: campaign.postalCode, source: "google_geocoding" },
{
label: "Koordinaten",
value: `${latitude}, ${longitude}`,
source: "google_geocoding",
},
],
});
} else {
await ctx.runMutation(internal.leadDiscovery.appendRunEvent, {
runId: args.runId,
level: "info",
message: "Geocoding-Cache der Kampagne verwendet.",
details: [
{ label: "PLZ", value: campaign.postalCode },
{ label: "Koordinaten", value: `${latitude}, ${longitude}` },
],
});
}
const searchSpec = getPlacesSearchSpec({
const searchSpec = getLocalBusinessSearchSpec({
categoryMode: campaign.categoryMode,
category: campaign.category,
customSearchTerm: campaign.customSearchTerm,
postalCode: campaign.postalCode,
radiusKm: campaign.radiusKm,
latitude,
longitude,
maxNewLeads: campaign.maxNewLeadsPerRun,
});
const placesJson = await fetchJson(
`https://places.googleapis.com/v1/places:${searchSpec.endpoint}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Goog-Api-Key": placesApiKey,
"X-Goog-FieldMask": GOOGLE_PLACES_FIELD_MASK,
},
body: JSON.stringify(searchSpec.body),
const localBusinessJson = await fetchJson(searchSpec.url, {
method: "GET",
headers: {
"X-RapidAPI-Key": localBusinessDataApiKey,
"X-RapidAPI-Host": LOCAL_BUSINESS_DATA_HOST,
},
});
const candidates = normalizeLocalBusinessSearchResponse(
localBusinessJson,
Date.now(),
);
const candidates = normalizePlacesResponse(placesJson, Date.now());
await ctx.runMutation(internal.usageEvents.recordUsageEvent, {
provider: "local_business_data",
operation: "lead_lookup",
runId: args.runId,
estimatedCostUsd: 0,
callCounts: {
requests: 1,
lookups: candidates.length,
},
});
if (candidates.length === 0) {
await ctx.runMutation(internal.leadDiscovery.appendRunEvent, {
runId: args.runId,
level: "warning",
message: "Google Places lieferte keine Ergebnisse.",
message: "Local Business Data lieferte keine Ergebnisse.",
details: [
{ label: "Suchtyp", value: searchSpec.searchType, source: "google_places" },
{ label: "Kategorie", value: getCampaignNiche(campaign), source: "google_places" },
{
label: "Suchquery",
value: searchSpec.query,
source: "local_business_data",
},
{
label: "Kategorie",
value: getCampaignNiche(campaign),
source: "local_business_data",
},
],
});
}
@@ -215,11 +192,6 @@ export const processCampaignRun = internalAction({
skippedDuplicates: number;
skippedBlacklisted: number;
errors: number;
websiteEnrichmentQueue: Array<{
leadId: Id<"leads">;
companyName: string;
website: string;
}>;
} = await ctx.runMutation(internal.leadDiscovery.persistDiscoveredLeads, {
runId: args.runId,
campaignId: campaign._id,
@@ -229,31 +201,6 @@ export const processCampaignRun = internalAction({
candidates,
});
for (const enrichment of result.websiteEnrichmentQueue) {
await ctx.runMutation(internal.websiteEnrichment.queueLeadEnrichment, {
leadId: enrichment.leadId,
parentRunId: args.runId,
});
await ctx.runMutation(internal.leadDiscovery.appendRunEvent, {
runId: args.runId,
level: "info",
message: "Website-Kontaktanreicherung geplant.",
details: [
{
label: "Unternehmen",
value: enrichment.companyName,
source: "google_places",
},
{
label: "Website",
value: enrichment.website,
source: "google_places",
},
],
});
}
await ctx.runMutation(internal.leadDiscovery.finishCampaignRun, {
runId: args.runId,
status: "succeeded",
@@ -423,11 +370,6 @@ export const persistDiscoveredLeads = internalMutation({
let skippedDuplicates = 0;
let skippedBlacklisted = 0;
let errors = 0;
const websiteEnrichmentQueue: Array<{
leadId: Id<"leads">;
companyName: string;
website: string;
}> = [];
for (const candidate of args.candidates) {
if (leadsCreated >= args.maxNewLeads) {
@@ -446,7 +388,7 @@ export const persistDiscoveredLeads = internalMutation({
await ctx.db.insert("agentRunEvents", {
runId: args.runId,
level: "warning",
message: "Google-Places-Ergebnis ohne Unternehmensname übersprungen.",
message: "Lead-Recherche-Ergebnis ohne Unternehmensname übersprungen.",
details: [{ label: "Place ID", value: candidate.placeId }],
createdAt: Date.now(),
});
@@ -454,6 +396,9 @@ export const persistDiscoveredLeads = internalMutation({
}
const normalizedPlaceId = normalizeDomain(candidate.placeId);
const normalizedSourceBusinessId = normalizeDomain(
candidate.sourceBusinessId ?? candidate.placeId,
);
const normalizedDomain = normalizeDomain(candidate.websiteDomain);
const normalizedEmails = getCandidateEmailValues(candidate);
const normalizedPhone = normalizePhone(candidate.phone);
@@ -476,6 +421,15 @@ export const persistDiscoveredLeads = internalMutation({
.take(1)
: [];
const duplicateBySourceBusinessId = normalizedSourceBusinessId
? await ctx.db
.query("leads")
.withIndex("by_normalizedSourceBusinessId", (q) =>
q.eq("normalizedSourceBusinessId", normalizedSourceBusinessId),
)
.take(1)
: [];
const duplicateByEmailRows = [];
for (const email of normalizedEmails) {
const rows = await ctx.db
@@ -487,17 +441,84 @@ export const persistDiscoveredLeads = internalMutation({
if (
duplicateByPlaceId.length > 0 ||
duplicateBySourceBusinessId.length > 0 ||
duplicateByDomain.length > 0 ||
duplicateByEmailRows.length > 0
) {
skippedDuplicates += 1;
const duplicateLeadForEmailBackfill =
duplicateBySourceBusinessId[0] ??
duplicateByPlaceId[0] ??
duplicateByDomain[0] ??
null;
const usableEmail = getUsableContactEmail(candidate);
if (
duplicateLeadForEmailBackfill &&
usableEmail &&
!duplicateLeadForEmailBackfill.email &&
duplicateLeadForEmailBackfill.contactStatus !== "do_not_contact" &&
duplicateLeadForEmailBackfill.blacklistStatus !== "blocked" &&
duplicateLeadForEmailBackfill.priority !== "blocked" &&
duplicateByEmailRows.length === 0
) {
const emailBackfillPatch: DuplicateEmailBackfillPatch = {
normalizedEmail: usableEmail.email,
email: usableEmail.email,
updatedAt: now,
};
if (usableEmail.emailSource !== null) {
emailBackfillPatch.emailSource = usableEmail.emailSource;
}
if (usableEmail.contactPerson !== null) {
emailBackfillPatch.contactPerson = usableEmail.contactPerson;
}
if (duplicateLeadForEmailBackfill.contactStatus === "missing_contact") {
emailBackfillPatch.contactStatus = "new";
emailBackfillPatch.contactStatusReason =
"E-Mail bei erneutem Local-Business-Data-Lauf ergänzt.";
}
await ctx.db.patch(
duplicateLeadForEmailBackfill._id,
emailBackfillPatch,
);
await ctx.db.insert("agentRunEvents", {
runId: args.runId,
level: "info",
message: "E-Mail für bestehenden Lead ergänzt.",
details: [
{
label: "Unternehmen",
value: duplicateLeadForEmailBackfill.companyName,
source: candidate.sourceProvider,
},
{
label: "E-Mail-Quelle",
value: usableEmail.emailSource ?? "local_business_data",
source: candidate.sourceProvider,
},
],
createdAt: Date.now(),
});
}
await ctx.db.insert("agentRunEvents", {
runId: args.runId,
level: "info",
message: "Doppelter Lead übersprungen.",
details: [
{ label: "Unternehmen", value: candidate.businessName, source: "google_places" },
{ label: "Place ID", value: candidate.placeId, source: "google_places" },
{
label: "Unternehmen",
value: candidate.businessName,
source: candidate.sourceProvider,
},
{
label: "Source ID",
value: candidate.sourceBusinessId ?? candidate.placeId,
source: candidate.sourceProvider,
},
],
createdAt: Date.now(),
});
@@ -569,13 +590,16 @@ export const persistDiscoveredLeads = internalMutation({
const priorityResult = getLeadDiscoveryPriority({
isDuplicate: !!probableDuplicateLead,
hasWebsite,
hasWebsiteSignal: false, // plain Google-Places website hint maps to medium priority.
hasWebsiteSignal: false,
});
const isDuplicateCandidate = !!probableDuplicateLead;
if (normalizedPlaceId) {
lead.normalizedGooglePlaceId = normalizedPlaceId;
}
if (normalizedSourceBusinessId) {
lead.normalizedSourceBusinessId = normalizedSourceBusinessId;
}
if (normalizedPhone !== "") {
lead.normalizedPhone = normalizedPhone;
}
@@ -596,20 +620,21 @@ export const persistDiscoveredLeads = internalMutation({
const leadId = await ctx.db.insert("leads", lead);
leadsCreated += 1;
if (shouldScheduleWebsiteEnrichment(lead)) {
websiteEnrichmentQueue.push({
leadId,
companyName: lead.companyName,
website: lead.websiteDomain ?? lead.websiteUrl ?? "unbekannt",
});
}
await ctx.db.insert("agentRunEvents", {
runId: args.runId,
level: "info",
message: "Lead aus Google Places gespeichert.",
message: "Lead aus Local Business Data gespeichert.",
details: [
{ label: "Unternehmen", value: candidate.businessName, source: "google_places" },
{ label: "Place ID", value: candidate.placeId, source: "google_places" },
{
label: "Unternehmen",
value: candidate.businessName,
source: candidate.sourceProvider,
},
{
label: "Source ID",
value: candidate.sourceBusinessId ?? candidate.placeId,
source: candidate.sourceProvider,
},
],
createdAt: Date.now(),
});
@@ -634,7 +659,6 @@ export const persistDiscoveredLeads = internalMutation({
skippedDuplicates,
skippedBlacklisted,
errors,
websiteEnrichmentQueue,
};
},
});

View File

@@ -263,7 +263,10 @@ export const create = mutation({
googleRating: v.optional(v.number()),
googleUserRatingCount: v.optional(v.number()),
googleBusinessStatus: v.optional(v.string()),
sourceProvider: v.optional(v.literal("google_places")),
sourceProvider: v.optional(
v.union(v.literal("google_places"), v.literal("local_business_data")),
),
sourceBusinessId: v.optional(v.string()),
sourceFetchedAt: v.optional(v.number()),
websiteUrl: v.optional(v.string()),
websiteDomain: v.optional(v.string()),
@@ -285,6 +288,7 @@ export const create = mutation({
duplicateOfLeadId: v.optional(v.id("leads")),
blacklistStatus: v.optional(leadBlacklistStatus),
normalizedGooglePlaceId: v.optional(v.string()),
normalizedSourceBusinessId: v.optional(v.string()),
notes: v.optional(v.string()),
},
handler: async (ctx, args) => {
@@ -298,6 +302,7 @@ export const create = mutation({
normalizedCompanyName: args.normalizedCompanyName,
normalizedAddress: args.normalizedAddress,
normalizedGooglePlaceId: args.normalizedGooglePlaceId,
normalizedSourceBusinessId: args.normalizedSourceBusinessId,
priority: args.priority ?? "medium",
contactStatus: args.contactStatus ?? "new",
duplicateStatus: args.duplicateStatus ?? "unchecked",

View File

@@ -1,6 +1,11 @@
import { internal } from "./_generated/api";
import type { Doc, Id } from "./_generated/dataModel";
import { internalMutation } from "./_generated/server";
import {
internalMutation,
mutation,
query,
} from "./_generated/server";
import type { MutationCtx, QueryCtx } from "./_generated/server";
import { v } from "convex/values";
const PAGE_SPEED_COUNTER_TEMPLATE = {
@@ -17,6 +22,13 @@ type PageSpeedLead = Pick<
> & {
websiteUrl: string;
};
type AuditStartState = {
leadId: Id<"leads">;
canStart: boolean;
reason?: string;
activeRunId?: Id<"agentRuns">;
activeRunStatus?: Doc<"agentRuns">["status"];
};
const runStatus = v.union(
v.literal("pending"),
@@ -39,6 +51,231 @@ const pageSpeedErrorType = v.union(
v.literal("unknown"),
);
const requireOperator = async (ctx: MutationCtx | QueryCtx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) {
throw new Error("Nicht autorisiert.");
}
};
async function getActivePageSpeedAuditRun(
ctx: MutationCtx | QueryCtx,
leadId: Id<"leads">,
) {
const existingPending = await ctx.db
.query("agentRuns")
.withIndex("by_type_and_status_and_leadId", (q) =>
q.eq("type", "audit").eq("status", "pending").eq("leadId", leadId),
)
.take(1);
if (existingPending[0]) {
return existingPending[0];
}
const existingRunning = await ctx.db
.query("agentRuns")
.withIndex("by_type_and_status_and_leadId", (q) =>
q.eq("type", "audit").eq("status", "running").eq("leadId", leadId),
)
.take(1);
return existingRunning[0] ?? null;
}
async function getActiveAuditGenerationRun(
ctx: MutationCtx | QueryCtx,
leadId: Id<"leads">,
) {
const existingPending = await ctx.db
.query("agentRuns")
.withIndex("by_type_and_status_and_leadId", (q) =>
q
.eq("type", "audit_generation")
.eq("status", "pending")
.eq("leadId", leadId),
)
.take(1);
if (existingPending[0]) {
return existingPending[0];
}
const existingRunning = await ctx.db
.query("agentRuns")
.withIndex("by_type_and_status_and_leadId", (q) =>
q
.eq("type", "audit_generation")
.eq("status", "running")
.eq("leadId", leadId),
)
.take(1);
return existingRunning[0] ?? null;
}
async function getLeadAuditStartState(
ctx: MutationCtx | QueryCtx,
leadId: Id<"leads">,
): Promise<AuditStartState> {
const lead = await ctx.db.get(leadId);
if (!lead) {
return {
leadId,
canStart: false,
reason: "Lead nicht gefunden.",
};
}
if (
lead.priority === "blocked" ||
lead.priority === "defer" ||
lead.blacklistStatus === "blocked" ||
lead.contactStatus === "do_not_contact"
) {
return {
leadId,
canStart: false,
reason: "Lead ist gesperrt oder zurueckgestellt.",
};
}
if (!lead.websiteUrl) {
return {
leadId,
canStart: false,
reason: "Keine Website hinterlegt.",
};
}
const activeAuditRun =
(await getActivePageSpeedAuditRun(ctx, leadId)) ??
(await getActiveAuditGenerationRun(ctx, leadId));
if (activeAuditRun) {
return {
leadId,
canStart: false,
reason: "Audit laeuft bereits.",
activeRunId: activeAuditRun._id,
activeRunStatus: activeAuditRun.status,
};
}
return {
leadId,
canStart: true,
};
}
async function queueLeadPageSpeedAuditForLead(
ctx: MutationCtx,
args: {
leadId: Id<"leads">;
parentRunId?: Id<"agentRuns">;
triggeredBy: "internal" | "manual";
},
): Promise<Id<"agentRuns"> | null> {
const state = await getLeadAuditStartState(ctx, args.leadId);
if (!state.canStart) {
return state.activeRunId ?? null;
}
const now = Date.now();
const runId = await ctx.db.insert("agentRuns", {
type: "audit",
leadId: args.leadId,
status: "pending",
currentStep: "pagespeed_insights",
counters: PAGE_SPEED_COUNTER_TEMPLATE,
createdAt: now,
updatedAt: now,
});
await ctx.db.insert("agentRunEvents", {
runId,
level: "info",
message:
args.triggeredBy === "manual"
? "Audit-Start wurde manuell angefordert."
: "PageSpeed-Analyse wurde in die Warteschlange gesetzt.",
details: [
{ label: "Lead", value: args.leadId },
...(args.parentRunId ? [{ label: "Parent-Run", value: args.parentRunId }] : []),
],
createdAt: now,
});
await ctx.scheduler.runAfter(
0,
internal.pageSpeedAction.processPageSpeedAudit,
{
runId,
},
);
return runId;
}
export const getLeadAuditStartStates = query({
args: {
leadIds: v.array(v.id("leads")),
},
returns: v.array(
v.object({
leadId: v.id("leads"),
canStart: v.boolean(),
reason: v.optional(v.string()),
activeRunId: v.optional(v.id("agentRuns")),
activeRunStatus: v.optional(runStatus),
}),
),
handler: async (ctx, args): Promise<AuditStartState[]> => {
await requireOperator(ctx);
const states: AuditStartState[] = [];
for (const leadId of args.leadIds.slice(0, 120)) {
states.push(await getLeadAuditStartState(ctx, leadId));
}
return states;
},
});
export const requestLeadAudit = mutation({
args: {
leadId: v.id("leads"),
},
returns: v.object({
runId: v.union(v.id("agentRuns"), v.null()),
message: v.string(),
}),
handler: async (ctx, args): Promise<{ runId: Id<"agentRuns"> | null; message: string }> => {
await requireOperator(ctx);
const state = await getLeadAuditStartState(ctx, args.leadId);
if (!state.canStart) {
return {
runId: state.activeRunId ?? null,
message: state.reason ?? "Audit kann aktuell nicht gestartet werden.",
};
}
const runId = await queueLeadPageSpeedAuditForLead(ctx, {
leadId: args.leadId,
triggeredBy: "manual",
});
return {
runId,
message: "Audit-Start wurde manuell angefordert.",
};
},
});
export const queueLeadPageSpeedAudit = internalMutation({
args: {
leadId: v.id("leads"),
@@ -46,68 +283,11 @@ export const queueLeadPageSpeedAudit = internalMutation({
},
returns: v.union(v.id("agentRuns"), v.null()),
handler: async (ctx, args): Promise<Id<"agentRuns"> | null> => {
const now = Date.now();
const lead = await ctx.db.get(args.leadId);
if (!lead || lead.priority === "blocked" || lead.priority === "defer") {
return null;
}
if (!lead.websiteUrl) {
return null;
}
const existingPending = await ctx.db
.query("agentRuns")
.withIndex("by_type_and_status_and_leadId", (q) =>
q.eq("type", "audit").eq("status", "pending").eq("leadId", args.leadId),
)
.take(1);
const existingRunning = await ctx.db
.query("agentRuns")
.withIndex("by_type_and_status_and_leadId", (q) =>
q.eq("type", "audit").eq("status", "running").eq("leadId", args.leadId),
)
.take(1);
if (existingPending.length > 0) {
return existingPending[0]._id;
}
if (existingRunning.length > 0) {
return existingRunning[0]._id;
}
const runId = await ctx.db.insert("agentRuns", {
type: "audit",
return await queueLeadPageSpeedAuditForLead(ctx, {
leadId: args.leadId,
status: "pending",
currentStep: "pagespeed_insights",
counters: PAGE_SPEED_COUNTER_TEMPLATE,
createdAt: now,
updatedAt: now,
parentRunId: args.parentRunId,
triggeredBy: "internal",
});
await ctx.db.insert("agentRunEvents", {
runId,
level: "info",
message: "PageSpeed-Analyse wurde in die Warteschlange gesetzt.",
details: [
{ label: "Lead", value: args.leadId },
...(args.parentRunId ? [{ label: "Parent-Run", value: args.parentRunId }] : []),
],
createdAt: now,
});
await ctx.scheduler.runAfter(
0,
internal.pageSpeedAction.processPageSpeedAudit,
{
runId,
},
);
return runId;
},
});

View File

@@ -87,6 +87,7 @@ const blacklistType = v.union(
v.literal("phone"),
v.literal("company"),
v.literal("google_place_id"),
v.literal("source_business_id"),
);
const websiteEnrichmentPageKind = v.union(
v.literal("homepage"),
@@ -250,13 +251,17 @@ export default defineSchema({
postalCode: v.optional(v.string()),
googlePlaceId: v.optional(v.string()),
normalizedGooglePlaceId: v.optional(v.string()),
sourceBusinessId: v.optional(v.string()),
normalizedSourceBusinessId: v.optional(v.string()),
googleMapsUrl: v.optional(v.string()),
googlePrimaryType: v.optional(v.string()),
googleTypes: v.optional(v.array(v.string())),
googleRating: v.optional(v.number()),
googleUserRatingCount: v.optional(v.number()),
googleBusinessStatus: v.optional(v.string()),
sourceProvider: v.optional(v.literal("google_places")),
sourceProvider: v.optional(
v.union(v.literal("google_places"), v.literal("local_business_data")),
),
sourceFetchedAt: v.optional(v.number()),
websiteUrl: v.optional(v.string()),
websiteDomain: v.optional(v.string()),
@@ -292,6 +297,7 @@ export default defineSchema({
"normalizedAddress",
])
.index("by_normalizedGooglePlaceId", ["normalizedGooglePlaceId"])
.index("by_normalizedSourceBusinessId", ["normalizedSourceBusinessId"])
.index("by_googlePlaceId", ["googlePlaceId"])
.index("by_websiteDomain", ["websiteDomain"])
.index("by_normalizedCompanyName", ["normalizedCompanyName"])

View File

@@ -951,27 +951,6 @@ async function processLeadEnrichmentWithoutBrowser(
});
}
try {
await ctx.runMutation(internal.pageSpeed.queueLeadPageSpeedAudit, {
leadId: lead._id,
parentRunId: runId,
});
} catch (pageSpeedQueueError) {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId,
level: "warning",
message: "PageSpeed-Analyse konnte nicht in die Warteschlange gesetzt werden.",
details: [
{ label: "Lead", value: lead._id },
{
label: "Fehler",
value: messageFromError(pageSpeedQueueError),
source: "pagespeed_queue",
},
],
});
}
await ctx.runMutation(internal.websiteEnrichment.finishLeadEnrichmentRun, {
runId,
status: "succeeded",
@@ -1012,27 +991,6 @@ export const processLeadEnrichment = internalAction({
const rootUrl = normalizeCrawlUrl(started.lead.websiteUrl);
if (!rootUrl) {
try {
await ctx.runMutation(internal.pageSpeed.queueLeadPageSpeedAudit, {
leadId: started.lead._id,
parentRunId: runId,
});
} catch (pageSpeedQueueError) {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId,
level: "warning",
message: "PageSpeed-Analyse konnte nicht in die Warteschlange gesetzt werden.",
details: [
{ label: "Lead", value: started.lead._id },
{
label: "Fehler",
value: messageFromError(pageSpeedQueueError),
source: "pagespeed_queue",
},
],
});
}
await ctx.runMutation(internal.websiteEnrichment.finishLeadEnrichmentRun, {
runId,
status: "failed",
@@ -1341,27 +1299,6 @@ export const processLeadEnrichment = internalAction({
});
}
try {
await ctx.runMutation(internal.pageSpeed.queueLeadPageSpeedAudit, {
leadId: started.lead._id,
parentRunId: runId,
});
} catch (pageSpeedQueueError) {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId,
level: "warning",
message: "PageSpeed-Analyse konnte nicht in die Warteschlange gesetzt werden.",
details: [
{ label: "Lead", value: started.lead._id },
{
label: "Fehler",
value: messageFromError(pageSpeedQueueError),
source: "pagespeed_queue",
},
],
});
}
await ctx.runMutation(internal.websiteEnrichment.finishLeadEnrichmentRun, {
runId,
status: "succeeded",
@@ -1400,26 +1337,6 @@ export const processLeadEnrichment = internalAction({
});
if (started) {
try {
await ctx.runMutation(internal.pageSpeed.queueLeadPageSpeedAudit, {
leadId: started.lead._id,
parentRunId: runId,
});
} catch (pageSpeedQueueError) {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId,
level: "warning",
message: "PageSpeed-Analyse konnte nicht in die Warteschlange gesetzt werden.",
details: [
{ label: "Lead", value: started.lead._id },
{
label: "Fehler",
value: messageFromError(pageSpeedQueueError),
source: "pagespeed_queue",
},
],
});
}
await ctx.runMutation(internal.websiteEnrichment.patchLeadFromWebsiteEnrichment, {
leadId: started.lead._id,
currentContactStatus: started.lead.contactStatus,