208 lines
4.8 KiB
TypeScript
208 lines
4.8 KiB
TypeScript
import { v } from "convex/values";
|
|
|
|
import {
|
|
RUN_EVENT_LEVELS,
|
|
RUN_STATUSES,
|
|
RUN_TYPES,
|
|
normalizeListLimit,
|
|
} from "./domain";
|
|
import type { Id } from "./_generated/dataModel";
|
|
import { internalMutation, mutation, query } from "./_generated/server";
|
|
import type { MutationCtx, QueryCtx } from "./_generated/server";
|
|
|
|
// Argument validators built from the literal lists exported by ./domain.
// Each validator accepts exactly one of the strings in its source list.

/** Validator accepting any value listed in RUN_TYPES. */
const runTypeLiterals = RUN_TYPES.map((value) => v.literal(value));
const runType = v.union(...runTypeLiterals);

/** Validator accepting any value listed in RUN_STATUSES. */
const runStatusLiterals = RUN_STATUSES.map((value) => v.literal(value));
const runStatus = v.union(...runStatusLiterals);

/** Validator accepting any value listed in RUN_EVENT_LEVELS. */
const eventLevelLiterals = RUN_EVENT_LEVELS.map((value) => v.literal(value));
const eventLevel = v.union(...eventLevelLiterals);
|
|
/** Validator for a single detail entry: a label/value pair with an optional source. */
const eventDetail = v.object({
  label: v.string(),
  value: v.string(),
  source: v.optional(v.string()),
});

/**
 * Argument validators shared by the public and the internal
 * "append event" mutations, so both accept the same payload.
 */
const appendEventArgs = {
  runId: v.id("agentRuns"),
  level: eventLevel,
  message: v.string(),
  details: v.optional(v.array(eventDetail)),
};
|
|
|
|
/** A single labelled detail attached to a run event. */
type RunEventDetail = {
  label: string;
  value: string;
  source?: string;
};

/**
 * Static shape of the payload accepted by `appendRunEvent`.
 * Mirrors the fields validated by `appendEventArgs`.
 */
type AppendEventArgs = {
  runId: Id<"agentRuns">;
  level: (typeof RUN_EVENT_LEVELS)[number];
  message: string;
  details?: RunEventDetail[];
};
|
|
|
|
/**
 * Rejects the call unless the request carries a user identity.
 *
 * Note: only the presence of an identity is checked here; no role or
 * permission lookup is performed despite the "operator" name.
 *
 * @param ctx - Query or mutation context used to read the caller's identity.
 * @throws Error("Nicht autorisiert.") when no identity is available.
 */
async function requireOperator(ctx: MutationCtx | QueryCtx): Promise<void> {
  const caller = await ctx.auth.getUserIdentity();
  if (caller) {
    return;
  }
  throw new Error("Nicht autorisiert.");
}
|
|
|
|
/**
 * Inserts one row into "agentRunEvents" for the given run.
 *
 * Performs no authorization itself; callers decide whether a check is needed.
 *
 * @param ctx - Mutation context providing database access.
 * @param args - Event payload; stored as-is together with a `createdAt` timestamp.
 * @returns The id of the inserted event document.
 */
async function appendRunEvent(ctx: MutationCtx, args: AppendEventArgs) {
  const eventRow = { ...args, createdAt: Date.now() };
  const eventId = await ctx.db.insert("agentRunEvents", eventRow);
  return eventId;
}
|
|
|
|
/**
 * Creates a new agent run.
 *
 * The run starts with all counters at zero and with status "pending" unless
 * the caller supplies one. `createdAt` and `updatedAt` receive the same
 * timestamp.
 *
 * @returns The id of the inserted "agentRuns" document.
 * @throws Error("Nicht autorisiert.") when the caller has no identity.
 */
export const create = mutation({
  args: {
    type: runType,
    campaignId: v.optional(v.id("campaigns")),
    leadId: v.optional(v.id("leads")),
    auditId: v.optional(v.id("audits")),
    status: v.optional(runStatus),
    currentStep: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    await requireOperator(ctx);

    const timestamp = Date.now();
    const { status, ...rest } = args;

    // Every run begins with empty counters.
    const zeroCounters = {
      leadsFound: 0,
      leadsCreated: 0,
      auditsCreated: 0,
      outreachPrepared: 0,
      errors: 0,
    };

    const runId = await ctx.db.insert("agentRuns", {
      ...rest,
      status: status ?? "pending",
      counters: zeroCounters,
      createdAt: timestamp,
      updatedAt: timestamp,
    });
    return runId;
  },
});
|
|
|
|
/**
 * Updates the status of an agent run and maintains its lifecycle timestamps.
 *
 * - `updatedAt` is always refreshed.
 * - `currentStep` / `errorSummary` are only written when provided.
 * - `startedAt` is set when the run enters "running". If the run is already
 *   running and has a `startedAt`, the existing value is kept, so repeated
 *   "running" updates (e.g. to report a new `currentStep`) do not reset the
 *   start time.
 * - `finishedAt` is set when the status is "succeeded", "failed" or "canceled".
 *
 * @returns The id of the patched run.
 * @throws Error("Nicht autorisiert.") when the caller has no identity.
 */
export const updateStatus = mutation({
  args: {
    id: v.id("agentRuns"),
    status: runStatus,
    currentStep: v.optional(v.string()),
    errorSummary: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    await requireOperator(ctx);
    const now = Date.now();

    // Read the current state so an in-progress run keeps its original start
    // time. A missing document is not handled here: the patch below fails
    // for it exactly as it did before this lookup was added.
    const existing = await ctx.db.get(args.id);
    const alreadyStarted =
      existing?.status === "running" && existing.startedAt !== undefined;

    const patch: {
      status: typeof args.status;
      updatedAt: number;
      currentStep?: string;
      errorSummary?: string;
      startedAt?: number;
      finishedAt?: number;
    } = {
      status: args.status,
      updatedAt: now,
    };

    if (args.currentStep !== undefined) {
      patch.currentStep = args.currentStep;
    }
    if (args.errorSummary !== undefined) {
      patch.errorSummary = args.errorSummary;
    }

    // Only stamp the start time on an actual transition into "running".
    if (args.status === "running" && !alreadyStarted) {
      patch.startedAt = now;
    }

    const isTerminal =
      args.status === "succeeded" ||
      args.status === "failed" ||
      args.status === "canceled";
    if (isTerminal) {
      patch.finishedAt = now;
    }

    await ctx.db.patch(args.id, patch);
    return args.id;
  },
});
|
|
|
|
/**
 * Lists agent runs in descending order, optionally filtered by type
 * and/or status.
 *
 * The most specific index matching the supplied filters is used:
 * "by_type_and_status", "by_type", "by_status", or no index when no
 * filter is given. The result size is capped by `normalizeListLimit`
 * (defined in ./domain).
 *
 * @throws Error("Nicht autorisiert.") when the caller has no identity.
 */
export const list = query({
  args: {
    status: v.optional(runStatus),
    type: v.optional(runType),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    await requireOperator(ctx);

    const maxResults = normalizeListLimit(args.limit);
    const { type, status } = args;
    const runs = ctx.db.query("agentRuns");

    // Both filters supplied: use the compound index.
    if (type && status) {
      return await runs
        .withIndex("by_type_and_status", (q) =>
          q.eq("type", type).eq("status", status),
        )
        .order("desc")
        .take(maxResults);
    }

    // Only the run type supplied.
    if (type) {
      return await runs
        .withIndex("by_type", (q) => q.eq("type", type))
        .order("desc")
        .take(maxResults);
    }

    // Only the status supplied.
    if (status) {
      return await runs
        .withIndex("by_status", (q) => q.eq("status", status))
        .order("desc")
        .take(maxResults);
    }

    // No filter: scan the table in its default order, descending.
    return await runs.order("desc").take(maxResults);
  },
});
|
|
|
|
/**
 * Public mutation that appends an event to an agent run.
 *
 * Requires a caller identity, then delegates to `appendRunEvent`.
 *
 * @returns The id of the inserted event document.
 * @throws Error("Nicht autorisiert.") when the caller has no identity.
 */
export const appendEvent = mutation({
  args: appendEventArgs,
  handler: async (ctx, args) => {
    await requireOperator(ctx);

    const eventId = await appendRunEvent(ctx, args);
    return eventId;
  },
});
|
|
|
|
/**
 * Internal mutation that appends an event to an agent run.
 *
 * Accepts the same payload as `appendEvent` but performs no identity
 * check before delegating to `appendRunEvent`.
 *
 * @returns The id of the inserted event document.
 */
export const appendEventInternal = internalMutation({
  args: appendEventArgs,
  handler: async (ctx, args) => {
    const eventId = await appendRunEvent(ctx, args);
    return eventId;
  },
});
|
|
|
|
/**
 * Lists the events of one agent run via the "by_runId_and_createdAt"
 * index, in descending index order.
 *
 * The result size is capped by `normalizeListLimit` (defined in ./domain).
 *
 * @throws Error("Nicht autorisiert.") when the caller has no identity.
 */
export const listEvents = query({
  args: {
    runId: v.id("agentRuns"),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    await requireOperator(ctx);

    const maxResults = normalizeListLimit(args.limit);
    const eventsForRun = ctx.db
      .query("agentRunEvents")
      .withIndex("by_runId_and_createdAt", (q) => q.eq("runId", args.runId));

    return await eventsForRun.order("desc").take(maxResults);
  },
});
|