Externalize audit pipeline services

This commit is contained in:
2026-06-07 23:06:31 +02:00
parent 470fb0f348
commit a45b92ea0a
42 changed files with 3141 additions and 247 deletions

View File

@@ -28,6 +28,7 @@ import type * as runs from "../runs.js";
import type * as scheduledJobs from "../scheduledJobs.js";
import type * as settings from "../settings.js";
import type * as storage from "../storage.js";
import type * as usageEvents from "../usageEvents.js";
import type * as websiteEnrichment from "../websiteEnrichment.js";
import type * as websiteEnrichmentAction from "../websiteEnrichmentAction.js";
@@ -58,6 +59,7 @@ declare const fullApi: ApiFromModules<{
scheduledJobs: typeof scheduledJobs;
settings: typeof settings;
storage: typeof storage;
usageEvents: typeof usageEvents;
websiteEnrichment: typeof websiteEnrichment;
websiteEnrichmentAction: typeof websiteEnrichmentAction;
}>;

View File

@@ -89,6 +89,7 @@ type AuditGenerationEvidence = {
technicalChecks: AuditGenerationEvidenceTechnicalCheck[];
screenshots: AuditGenerationEvidenceScreenshot[];
pageSpeedInputs: PageSpeedMinimalAuditResult[];
externalMarkdown?: string;
};
function byteLength(value: string) {
@@ -199,6 +200,8 @@ const secretHints = [
"SMTP_USER",
"BETTER_AUTH_SECRET",
"RYBBIT_API_KEY",
"SCREENSHOTONE_API_KEY",
"JINA_API_KEY",
];
function sanitizeSecretCandidates(value: string | undefined): string | undefined {
@@ -226,7 +229,7 @@ function sanitizeSecretCandidates(value: string | undefined): string | undefined
}
function escapeRegExp(value: string) {
return value.replace(/[.*+?^${}()|[\\]\\]/g, "\\$&");
return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
type StartLeadSnapshot = Pick<
@@ -549,6 +552,35 @@ export const persistAuditGenerationResult = internalMutation({
},
});
export const persistExternalCaptureScreenshot = internalMutation({
args: {
leadId: v.id("leads"),
runId: v.id("agentRuns"),
storageId: v.id("_storage"),
viewport: v.union(v.literal("desktop"), v.literal("mobile")),
sourceUrl: v.string(),
capturedAt: v.number(),
width: v.number(),
height: v.number(),
mimeType: v.string(),
},
returns: v.id("websiteCrawlScreenshots"),
handler: async (ctx, args): Promise<Id<"websiteCrawlScreenshots">> => {
return await ctx.db.insert("websiteCrawlScreenshots", {
leadId: args.leadId,
runId: args.runId,
storageId: args.storageId,
viewport: args.viewport,
sourceUrl: args.sourceUrl,
capturedAt: args.capturedAt,
width: args.width,
height: args.height,
mimeType: args.mimeType,
createdAt: Date.now(),
});
},
});
export const finishAuditGenerationRun = internalMutation({
args: {
runId: v.id("agentRuns"),

View File

@@ -15,8 +15,9 @@ const auditStatus = v.union(
);
const usedSkillsValidator = v.array(
v.object({
id: v.optional(v.string()),
name: v.string(),
category: v.string(),
category: v.optional(v.string()),
version: v.optional(v.string()),
source: v.optional(v.string()),
}),
@@ -179,6 +180,8 @@ export const create = mutation({
ctaType: v.optional(v.string()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const now = Date.now();
const existing = await ctx.db
.query("audits")
@@ -201,6 +204,8 @@ export const create = mutation({
export const getDetail = query({
args: { id: v.id("audits") },
handler: async (ctx, args) => {
await requireOperator(ctx);
const audit = await ctx.db.get(args.id);
if (!audit) {
return null;
@@ -214,6 +219,8 @@ export const getDetail = query({
export const get = query({
args: { id: v.id("audits") },
handler: async (ctx, args) => {
await requireOperator(ctx);
return await ctx.db.get(args.id);
},
});
@@ -302,6 +309,8 @@ export const upsertFromAuditGeneration = internalMutation({
export const getBySlug = query({
args: { slug: v.string() },
handler: async (ctx, args) => {
await requireOperator(ctx);
const audits = await ctx.db
.query("audits")
.withIndex("by_slug", (q) => q.eq("slug", args.slug))
@@ -496,6 +505,8 @@ export const list = query({
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
if (args.leadId) {

View File

@@ -119,6 +119,18 @@ export const PAGE_SPEED_ERROR_TYPES = [
"api_error",
"unknown",
] as const;
export const USAGE_EVENT_PROVIDERS = [
"openrouter",
"screenshotone",
"jina",
"pagespeed",
"google_places",
] as const;
export const USAGE_EVENT_OPERATIONS = [
"audit_capture",
"audit_generation",
"lead_lookup",
] as const;
export type CampaignStatus = (typeof CAMPAIGN_STATUSES)[number];
export type LeadPriority = (typeof LEAD_PRIORITIES)[number];
@@ -143,6 +155,8 @@ export type ScreenshotViewport = (typeof SCREENSHOT_VIEWPORTS)[number];
export type PageSpeedStrategy = (typeof PAGE_SPEED_STRATEGIES)[number];
export type PageSpeedResultStatus = (typeof PAGE_SPEED_RESULT_STATUSES)[number];
export type PageSpeedErrorType = (typeof PAGE_SPEED_ERROR_TYPES)[number];
export type UsageEventProvider = (typeof USAGE_EVENT_PROVIDERS)[number];
export type UsageEventOperation = (typeof USAGE_EVENT_OPERATIONS)[number];
export type SettingsRow = {
key: string;

View File

@@ -3,7 +3,13 @@ import { v } from "convex/values";
import { getUsableContactEmailFromEntries } from "../lib/lead-discovery-google";
import { normalizeListLimit } from "./domain";
import type { Doc, Id } from "./_generated/dataModel";
import { mutation, query } from "./_generated/server";
import {
internalMutation,
internalQuery,
mutation,
query,
} from "./_generated/server";
import type { MutationCtx, QueryCtx } from "./_generated/server";
type LeadDoc = Doc<"leads">;
@@ -37,6 +43,74 @@ type LeadReviewPatch = {
contactPerson?: string;
};
type LeadReviewUpdateArgs = {
id: Id<"leads">;
priority?: LeadDoc["priority"];
priorityReason?: string;
contactStatus?: LeadDoc["contactStatus"];
contactStatusReason?: string;
notes?: string;
duplicateStatus?: LeadDoc["duplicateStatus"];
duplicateReason?: string;
blacklistStatus?: LeadDoc["blacklistStatus"];
blacklistReason?: string;
duplicateOfLeadId?: Id<"leads">;
applyBlacklist?: boolean;
reviewEmail?: string;
reviewEmailSource?: string;
reviewContactPerson?: string;
reviewIsBusinessContactAddress?: boolean;
};
const leadPriority = v.union(
v.literal("high"),
v.literal("medium"),
v.literal("low"),
v.literal("defer"),
v.literal("blocked"),
);
const leadContactStatus = v.union(
v.literal("new"),
v.literal("missing_contact"),
v.literal("audit_ready"),
v.literal("outreach_ready"),
v.literal("contacted"),
v.literal("replied"),
v.literal("do_not_contact"),
);
const leadDuplicateStatus = v.union(
v.literal("unchecked"),
v.literal("unique"),
v.literal("possible_duplicate"),
v.literal("duplicate"),
);
const leadBlacklistStatus = v.union(v.literal("clear"), v.literal("blocked"));
const reviewUpdateArgs = {
id: v.id("leads"),
priority: v.optional(leadPriority),
priorityReason: v.optional(v.string()),
contactStatus: v.optional(leadContactStatus),
contactStatusReason: v.optional(v.string()),
notes: v.optional(v.string()),
duplicateStatus: v.optional(leadDuplicateStatus),
duplicateReason: v.optional(v.string()),
blacklistStatus: v.optional(leadBlacklistStatus),
blacklistReason: v.optional(v.string()),
duplicateOfLeadId: v.optional(v.id("leads")),
applyBlacklist: v.optional(v.boolean()),
reviewEmail: v.optional(v.string()),
reviewEmailSource: v.optional(v.string()),
reviewContactPerson: v.optional(v.string()),
reviewIsBusinessContactAddress: v.optional(v.boolean()),
};
const requireOperator = async (ctx: MutationCtx | QueryCtx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) {
throw new Error("Nicht autorisiert.");
}
};
function buildReviewContactPatch(args: {
email?: string;
emailSource?: string;
@@ -88,6 +162,91 @@ function buildReviewContactPatch(args: {
});
}
async function reviewUpdateLead(ctx: MutationCtx, args: LeadReviewUpdateArgs) {
const lead = await ctx.db.get(args.id);
if (!lead) {
return null;
}
const now = Date.now();
const patch: LeadReviewPatch = {
updatedAt: now,
};
if (args.priority !== undefined) {
patch.priority = args.priority;
}
if (args.priorityReason !== undefined) {
patch.priorityReason = args.priorityReason;
}
if (args.contactStatus !== undefined) {
patch.contactStatus = args.contactStatus;
}
if (args.contactStatusReason !== undefined) {
patch.contactStatusReason = args.contactStatusReason;
}
if (args.notes !== undefined) {
patch.notes = args.notes;
}
if (args.duplicateStatus !== undefined) {
patch.duplicateStatus = args.duplicateStatus;
}
if (args.duplicateReason !== undefined) {
patch.duplicateReason = args.duplicateReason;
}
if (args.duplicateOfLeadId !== undefined) {
patch.duplicateOfLeadId = args.duplicateOfLeadId;
}
if (args.applyBlacklist) {
patch.blacklistStatus = "blocked";
if (args.blacklistReason !== undefined) {
patch.blacklistReason = args.blacklistReason;
} else if (lead.blacklistReason === undefined) {
patch.blacklistReason = "Manuell in der Review als Sperrgrund gesetzt.";
}
if (args.priority === undefined || args.priority !== "blocked") {
patch.priority = "blocked";
}
} else if (args.applyBlacklist === false && args.blacklistStatus !== undefined) {
patch.blacklistStatus = args.blacklistStatus;
patch.blacklistReason = args.blacklistReason;
} else if (args.blacklistStatus !== undefined) {
patch.blacklistStatus = args.blacklistStatus;
patch.blacklistReason = args.blacklistReason;
}
const reviewContactPatch = buildReviewContactPatch({
email: args.reviewEmail,
emailSource: args.reviewEmailSource,
contactPerson: args.reviewContactPerson,
isBusinessContactAddress: args.reviewIsBusinessContactAddress,
explicitContactStatus: args.contactStatus !== undefined,
currentContactStatus: lead.contactStatus,
});
if (reviewContactPatch?.patch) {
Object.assign(patch, reviewContactPatch.patch);
}
if (
reviewContactPatch !== null &&
reviewContactPatch.setContactStatus !== undefined &&
args.contactStatus === undefined
) {
patch.contactStatus = reviewContactPatch.setContactStatus;
}
if (args.blacklistReason !== undefined && patch.blacklistStatus === undefined) {
patch.blacklistStatus = "blocked";
patch.blacklistReason = args.blacklistReason;
}
await ctx.db.patch(args.id, patch);
return args.id;
}
export const create = mutation({
args: {
campaignId: v.optional(v.id("campaigns")),
@@ -116,44 +275,20 @@ export const create = mutation({
email: v.optional(v.string()),
emailSource: v.optional(v.string()),
contactPerson: v.optional(v.string()),
priority: v.optional(
v.union(
v.literal("high"),
v.literal("medium"),
v.literal("low"),
v.literal("defer"),
v.literal("blocked"),
),
),
priority: v.optional(leadPriority),
priorityReason: v.optional(v.string()),
contactStatus: v.optional(
v.union(
v.literal("new"),
v.literal("missing_contact"),
v.literal("audit_ready"),
v.literal("outreach_ready"),
v.literal("contacted"),
v.literal("replied"),
v.literal("do_not_contact"),
),
),
contactStatus: v.optional(leadContactStatus),
contactStatusReason: v.optional(v.string()),
duplicateStatus: v.optional(
v.union(
v.literal("unchecked"),
v.literal("unique"),
v.literal("possible_duplicate"),
v.literal("duplicate"),
),
),
duplicateStatus: v.optional(leadDuplicateStatus),
duplicateReason: v.optional(v.string()),
blacklistReason: v.optional(v.string()),
duplicateOfLeadId: v.optional(v.id("leads")),
blacklistStatus: v.optional(v.union(v.literal("clear"), v.literal("blocked"))),
blacklistStatus: v.optional(leadBlacklistStatus),
normalizedGooglePlaceId: v.optional(v.string()),
notes: v.optional(v.string()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const now = Date.now();
return await ctx.db.insert("leads", {
@@ -174,136 +309,29 @@ export const create = mutation({
});
export const reviewUpdate = mutation({
args: {
id: v.id("leads"),
priority: v.optional(
v.union(
v.literal("high"),
v.literal("medium"),
v.literal("low"),
v.literal("defer"),
v.literal("blocked"),
),
),
priorityReason: v.optional(v.string()),
contactStatus: v.optional(
v.union(
v.literal("new"),
v.literal("missing_contact"),
v.literal("audit_ready"),
v.literal("outreach_ready"),
v.literal("contacted"),
v.literal("replied"),
v.literal("do_not_contact"),
),
),
contactStatusReason: v.optional(v.string()),
notes: v.optional(v.string()),
duplicateStatus: v.optional(
v.union(
v.literal("unchecked"),
v.literal("unique"),
v.literal("possible_duplicate"),
v.literal("duplicate"),
),
),
duplicateReason: v.optional(v.string()),
blacklistStatus: v.optional(v.union(v.literal("clear"), v.literal("blocked"))),
blacklistReason: v.optional(v.string()),
duplicateOfLeadId: v.optional(v.id("leads")),
applyBlacklist: v.optional(v.boolean()),
reviewEmail: v.optional(v.string()),
reviewEmailSource: v.optional(v.string()),
reviewContactPerson: v.optional(v.string()),
reviewIsBusinessContactAddress: v.optional(v.boolean()),
},
args: reviewUpdateArgs,
handler: async (ctx, args) => {
const lead = await ctx.db.get(args.id);
await requireOperator(ctx);
return await reviewUpdateLead(ctx, args);
},
});
if (!lead) {
return null;
}
const now = Date.now();
const patch: LeadReviewPatch = {
updatedAt: now,
};
if (args.priority !== undefined) {
patch.priority = args.priority;
}
if (args.priorityReason !== undefined) {
patch.priorityReason = args.priorityReason;
}
if (args.contactStatus !== undefined) {
patch.contactStatus = args.contactStatus;
}
if (args.contactStatusReason !== undefined) {
patch.contactStatusReason = args.contactStatusReason;
}
if (args.notes !== undefined) {
patch.notes = args.notes;
}
if (args.duplicateStatus !== undefined) {
patch.duplicateStatus = args.duplicateStatus;
}
if (args.duplicateReason !== undefined) {
patch.duplicateReason = args.duplicateReason;
}
if (args.duplicateOfLeadId !== undefined) {
patch.duplicateOfLeadId = args.duplicateOfLeadId;
}
if (args.applyBlacklist) {
patch.blacklistStatus = "blocked";
if (args.blacklistReason !== undefined) {
patch.blacklistReason = args.blacklistReason;
} else if (lead.blacklistReason === undefined) {
patch.blacklistReason = "Manuell in der Review als Sperrgrund gesetzt.";
}
if (args.priority === undefined || args.priority !== "blocked") {
patch.priority = "blocked";
}
} else if (args.applyBlacklist === false && args.blacklistStatus !== undefined) {
patch.blacklistStatus = args.blacklistStatus;
patch.blacklistReason = args.blacklistReason;
} else if (args.blacklistStatus !== undefined) {
patch.blacklistStatus = args.blacklistStatus;
patch.blacklistReason = args.blacklistReason;
}
const reviewContactPatch = buildReviewContactPatch({
email: args.reviewEmail,
emailSource: args.reviewEmailSource,
contactPerson: args.reviewContactPerson,
isBusinessContactAddress: args.reviewIsBusinessContactAddress,
explicitContactStatus: args.contactStatus !== undefined,
currentContactStatus: lead.contactStatus,
});
if (reviewContactPatch?.patch) {
Object.assign(patch, reviewContactPatch.patch);
}
if (
reviewContactPatch !== null &&
reviewContactPatch.setContactStatus !== undefined &&
args.contactStatus === undefined
) {
patch.contactStatus = reviewContactPatch.setContactStatus;
}
if (args.blacklistReason !== undefined && patch.blacklistStatus === undefined) {
patch.blacklistStatus = "blocked";
patch.blacklistReason = args.blacklistReason;
}
await ctx.db.patch(args.id, patch);
return args.id;
export const reviewUpdateInternal = internalMutation({
args: reviewUpdateArgs,
handler: async (ctx, args) => {
return await reviewUpdateLead(ctx, args);
},
});
export const get = query({
args: { id: v.id("leads") },
handler: async (ctx, args) => {
await requireOperator(ctx);
return await ctx.db.get(args.id);
},
});
export const getInternal = internalQuery({
args: { id: v.id("leads") },
handler: async (ctx, args) => {
return await ctx.db.get(args.id);
@@ -313,20 +341,11 @@ export const get = query({
export const list = query({
args: {
campaignId: v.optional(v.id("campaigns")),
contactStatus: v.optional(
v.union(
v.literal("new"),
v.literal("missing_contact"),
v.literal("audit_ready"),
v.literal("outreach_ready"),
v.literal("contacted"),
v.literal("replied"),
v.literal("do_not_contact"),
),
),
contactStatus: v.optional(leadContactStatus),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
if (args.campaignId) {
@@ -360,6 +379,7 @@ export const listFunnel = query({
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
const leads = await ctx.db.query("leads").order("desc").take(limit);

View File

@@ -1,6 +1,6 @@
"use node";
import { api, internal } from "./_generated/api";
import { internal } from "./_generated/api";
import { internalAction } from "./_generated/server";
import type { Id } from "./_generated/dataModel";
import type { ActionCtx } from "./_generated/server";
@@ -122,7 +122,7 @@ async function queueAuditGenerationAfterPageSpeed(
parentRunId: runId,
});
} catch (auditQueueError) {
await ctx.runMutation(api.runs.appendEvent, {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId,
level: "warning",
message: "Audit-Generierung konnte nicht in die Warteschlange gesetzt werden.",
@@ -164,7 +164,7 @@ export const processPageSpeedAudit = internalAction({
errorSummary,
});
await ctx.runMutation(api.runs.appendEvent, {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "error",
message: "PageSpeed-Analyse fehlgeschlagen.",
@@ -210,7 +210,7 @@ export const processPageSpeedAudit = internalAction({
fetchedAt,
});
await ctx.runMutation(api.runs.appendEvent, {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "warning",
message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
@@ -248,7 +248,7 @@ export const processPageSpeedAudit = internalAction({
normalized: toPersistedPageSpeedNormalizedResult(normalized),
});
await ctx.runMutation(api.runs.appendEvent, {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "info",
message: `PageSpeed-Analyse für ${strategy} abgeschlossen.`,
@@ -274,7 +274,7 @@ export const processPageSpeedAudit = internalAction({
fetchedAt,
});
await ctx.runMutation(api.runs.appendEvent, {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "warning",
message: `PageSpeed-Analyse für ${strategy} fehlgeschlagen.`,
@@ -310,7 +310,7 @@ export const processPageSpeedAudit = internalAction({
errorSummary,
});
await ctx.runMutation(api.runs.appendEvent, {
await ctx.runMutation(internal.runs.appendEventInternal, {
runId: args.runId,
level: "error",
message: "PageSpeed-Analyse fehlgeschlagen.",

View File

@@ -6,13 +6,53 @@ import {
RUN_TYPES,
normalizeListLimit,
} from "./domain";
import { mutation, query } from "./_generated/server";
import type { Id } from "./_generated/dataModel";
import { internalMutation, mutation, query } from "./_generated/server";
import type { MutationCtx, QueryCtx } from "./_generated/server";
const runType = v.union(...RUN_TYPES.map((type) => v.literal(type)));
const runStatus = v.union(...RUN_STATUSES.map((status) => v.literal(status)));
const eventLevel = v.union(
...RUN_EVENT_LEVELS.map((level) => v.literal(level)),
);
const appendEventArgs = {
runId: v.id("agentRuns"),
level: eventLevel,
message: v.string(),
details: v.optional(
v.array(
v.object({
label: v.string(),
value: v.string(),
source: v.optional(v.string()),
}),
),
),
};
type AppendEventArgs = {
runId: Id<"agentRuns">;
level: (typeof RUN_EVENT_LEVELS)[number];
message: string;
details?: { label: string; value: string; source?: string }[];
};
const requireOperator = async (ctx: MutationCtx | QueryCtx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) {
throw new Error("Nicht autorisiert.");
}
};
async function appendRunEvent(
ctx: MutationCtx,
args: AppendEventArgs,
) {
return await ctx.db.insert("agentRunEvents", {
...args,
createdAt: Date.now(),
});
}
export const create = mutation({
args: {
@@ -24,6 +64,7 @@ export const create = mutation({
currentStep: v.optional(v.string()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const now = Date.now();
return await ctx.db.insert("agentRuns", {
@@ -50,6 +91,7 @@ export const updateStatus = mutation({
errorSummary: v.optional(v.string()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const now = Date.now();
const patch: {
status: typeof args.status;
@@ -92,6 +134,7 @@ export const list = query({
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
if (args.type && args.status) {
@@ -132,25 +175,17 @@ export const list = query({
});
export const appendEvent = mutation({
args: {
runId: v.id("agentRuns"),
level: eventLevel,
message: v.string(),
details: v.optional(
v.array(
v.object({
label: v.string(),
value: v.string(),
source: v.optional(v.string()),
}),
),
),
},
args: appendEventArgs,
handler: async (ctx, args) => {
return await ctx.db.insert("agentRunEvents", {
...args,
createdAt: Date.now(),
});
await requireOperator(ctx);
return await appendRunEvent(ctx, args);
},
});
export const appendEventInternal = internalMutation({
args: appendEventArgs,
handler: async (ctx, args) => {
return await appendRunEvent(ctx, args);
},
});
@@ -160,6 +195,7 @@ export const listEvents = query({
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
await requireOperator(ctx);
const limit = normalizeListLimit(args.limit);
return await ctx.db

View File

@@ -7,6 +7,8 @@ import {
RUN_EVENT_LEVELS,
RUN_STATUSES,
RUN_TYPES,
USAGE_EVENT_OPERATIONS,
USAGE_EVENT_PROVIDERS,
} from "./domain";
const campaignStatus = v.union(v.literal("active"), v.literal("paused"));
@@ -146,6 +148,12 @@ const pageSpeedErrorType = v.union(
v.literal("api_error"),
v.literal("unknown"),
);
const usageEventProvider = v.union(
...USAGE_EVENT_PROVIDERS.map((provider) => v.literal(provider)),
);
const usageEventOperation = v.union(
...USAGE_EVENT_OPERATIONS.map((operation) => v.literal(operation)),
);
const settingsValue = v.union(v.string(), v.number(), v.boolean(), v.null());
const auditMetricSummary = v.object({
performanceScore: v.optional(v.number()),
@@ -282,8 +290,9 @@ export default defineSchema({
usedSkills: v.optional(
v.array(
v.object({
id: v.optional(v.string()),
name: v.string(),
category: v.string(),
category: v.optional(v.string()),
version: v.optional(v.string()),
source: v.optional(v.string()),
}),
@@ -399,6 +408,39 @@ export default defineSchema({
.index("by_stage", ["stage"])
.index("by_leadId_and_stage", ["leadId", "stage"]),
usageEvents: defineTable({
provider: usageEventProvider,
operation: usageEventOperation,
runId: v.optional(v.id("agentRuns")),
leadId: v.optional(v.id("leads")),
auditId: v.optional(v.id("audits")),
estimatedCostUsd: v.number(),
tokens: v.optional(
v.object({
inputTokens: v.optional(v.number()),
outputTokens: v.optional(v.number()),
promptTokens: v.optional(v.number()),
completionTokens: v.optional(v.number()),
totalTokens: v.optional(v.number()),
cacheReadTokens: v.optional(v.number()),
}),
),
callCounts: v.optional(
v.object({
requests: v.optional(v.number()),
pages: v.optional(v.number()),
screenshots: v.optional(v.number()),
lookups: v.optional(v.number()),
}),
),
createdAt: v.number(),
})
.index("by_runId_and_createdAt", ["runId", "createdAt"])
.index("by_leadId_and_createdAt", ["leadId", "createdAt"])
.index("by_auditId_and_createdAt", ["auditId", "createdAt"])
.index("by_provider_and_createdAt", ["provider", "createdAt"])
.index("by_createdAt", ["createdAt"]),
websiteCrawlPages: defineTable({
leadId: v.id("leads"),
runId: v.optional(v.id("agentRuns")),

223
convex/usageEvents.ts Normal file
View File

@@ -0,0 +1,223 @@
import type { Doc, Id } from "./_generated/dataModel";
import { internalMutation, query } from "./_generated/server";
import type { QueryCtx } from "./_generated/server";
import {
normalizeListLimit,
USAGE_EVENT_OPERATIONS,
USAGE_EVENT_PROVIDERS,
} from "./domain";
import { v } from "convex/values";
const usageEventProvider = v.union(
...USAGE_EVENT_PROVIDERS.map((provider) => v.literal(provider)),
);
const usageEventOperation = v.union(
...USAGE_EVENT_OPERATIONS.map((operation) => v.literal(operation)),
);
const usageEventTokens = v.object({
inputTokens: v.optional(v.number()),
outputTokens: v.optional(v.number()),
promptTokens: v.optional(v.number()),
completionTokens: v.optional(v.number()),
totalTokens: v.optional(v.number()),
cacheReadTokens: v.optional(v.number()),
});
const usageEventCallCounts = v.object({
requests: v.optional(v.number()),
pages: v.optional(v.number()),
screenshots: v.optional(v.number()),
lookups: v.optional(v.number()),
});
const usageEventDoc = v.object({
_id: v.id("usageEvents"),
_creationTime: v.number(),
provider: usageEventProvider,
operation: usageEventOperation,
runId: v.optional(v.id("agentRuns")),
leadId: v.optional(v.id("leads")),
auditId: v.optional(v.id("audits")),
estimatedCostUsd: v.number(),
tokens: v.optional(usageEventTokens),
callCounts: v.optional(usageEventCallCounts),
createdAt: v.number(),
});
type UsageEventTokens = {
inputTokens?: number;
outputTokens?: number;
promptTokens?: number;
completionTokens?: number;
totalTokens?: number;
cacheReadTokens?: number;
};
type UsageEventCallCounts = {
requests?: number;
pages?: number;
screenshots?: number;
lookups?: number;
};
type UsageEventNumberArgs = {
estimatedCostUsd: number;
tokens?: UsageEventTokens;
callCounts?: UsageEventCallCounts;
};
const requireOperator = async (ctx: QueryCtx) => {
const identity = await ctx.auth.getUserIdentity();
if (!identity) {
throw new Error("Nicht autorisiert.");
}
};
function assertFiniteNonNegativeNumber(value: number, fieldName: string) {
if (!Number.isFinite(value) || value < 0) {
throw new Error(`${fieldName} must be a finite non-negative number.`);
}
}
function assertFiniteNonNegativeInteger(
value: number | undefined,
fieldName: string,
) {
if (value === undefined) {
return;
}
if (!Number.isFinite(value) || value < 0 || !Number.isInteger(value)) {
throw new Error(`${fieldName} must be a finite non-negative integer.`);
}
}
function assertValidUsageEventNumbers(args: UsageEventNumberArgs) {
assertFiniteNonNegativeNumber(args.estimatedCostUsd, "estimatedCostUsd");
assertFiniteNonNegativeInteger(args.tokens?.inputTokens, "tokens.inputTokens");
assertFiniteNonNegativeInteger(args.tokens?.outputTokens, "tokens.outputTokens");
assertFiniteNonNegativeInteger(args.tokens?.promptTokens, "tokens.promptTokens");
assertFiniteNonNegativeInteger(args.tokens?.completionTokens, "tokens.completionTokens");
assertFiniteNonNegativeInteger(args.tokens?.totalTokens, "tokens.totalTokens");
assertFiniteNonNegativeInteger(args.tokens?.cacheReadTokens, "tokens.cacheReadTokens");
assertFiniteNonNegativeInteger(args.callCounts?.requests, "callCounts.requests");
assertFiniteNonNegativeInteger(args.callCounts?.pages, "callCounts.pages");
assertFiniteNonNegativeInteger(args.callCounts?.screenshots, "callCounts.screenshots");
assertFiniteNonNegativeInteger(args.callCounts?.lookups, "callCounts.lookups");
}
export const recordUsageEvent = internalMutation({
args: {
provider: usageEventProvider,
operation: usageEventOperation,
runId: v.optional(v.id("agentRuns")),
leadId: v.optional(v.id("leads")),
auditId: v.optional(v.id("audits")),
estimatedCostUsd: v.number(),
tokens: v.optional(usageEventTokens),
callCounts: v.optional(usageEventCallCounts),
createdAt: v.optional(v.number()),
},
returns: v.id("usageEvents"),
handler: async (ctx, args): Promise<Id<"usageEvents">> => {
assertValidUsageEventNumbers(args);
const now = args.createdAt ?? Date.now();
return await ctx.db.insert("usageEvents", {
provider: args.provider,
operation: args.operation,
...(args.runId ? { runId: args.runId } : {}),
...(args.leadId ? { leadId: args.leadId } : {}),
...(args.auditId ? { auditId: args.auditId } : {}),
estimatedCostUsd: args.estimatedCostUsd,
...(args.tokens ? { tokens: args.tokens } : {}),
...(args.callCounts ? { callCounts: args.callCounts } : {}),
createdAt: now,
});
},
});
export const listLatestUsageEvents = query({
args: {
limit: v.optional(v.number()),
},
returns: v.array(usageEventDoc),
handler: async (ctx, args): Promise<Doc<"usageEvents">[]> => {
await requireOperator(ctx);
return await ctx.db
.query("usageEvents")
.withIndex("by_createdAt")
.order("desc")
.take(normalizeListLimit(args.limit));
},
});
export const listUsageEventsByRun = query({
args: {
runId: v.id("agentRuns"),
limit: v.optional(v.number()),
},
returns: v.array(usageEventDoc),
handler: async (ctx, args): Promise<Doc<"usageEvents">[]> => {
await requireOperator(ctx);
return await ctx.db
.query("usageEvents")
.withIndex("by_runId_and_createdAt", (q) => q.eq("runId", args.runId))
.order("desc")
.take(normalizeListLimit(args.limit));
},
});
export const listUsageEventsByLead = query({
args: {
leadId: v.id("leads"),
limit: v.optional(v.number()),
},
returns: v.array(usageEventDoc),
handler: async (ctx, args): Promise<Doc<"usageEvents">[]> => {
await requireOperator(ctx);
return await ctx.db
.query("usageEvents")
.withIndex("by_leadId_and_createdAt", (q) => q.eq("leadId", args.leadId))
.order("desc")
.take(normalizeListLimit(args.limit));
},
});
export const listUsageEventsByAudit = query({
args: {
auditId: v.id("audits"),
limit: v.optional(v.number()),
},
returns: v.array(usageEventDoc),
handler: async (ctx, args): Promise<Doc<"usageEvents">[]> => {
await requireOperator(ctx);
return await ctx.db
.query("usageEvents")
.withIndex("by_auditId_and_createdAt", (q) => q.eq("auditId", args.auditId))
.order("desc")
.take(normalizeListLimit(args.limit));
},
});
export const listUsageEventsByProvider = query({
args: {
provider: usageEventProvider,
limit: v.optional(v.number()),
},
returns: v.array(usageEventDoc),
handler: async (ctx, args): Promise<Doc<"usageEvents">[]> => {
await requireOperator(ctx);
return await ctx.db
.query("usageEvents")
.withIndex("by_provider_and_createdAt", (q) =>
q.eq("provider", args.provider),
)
.order("desc")
.take(normalizeListLimit(args.limit));
},
});