Refactor Convex image generation into queued background sub-processes

This commit is contained in:
2026-03-31 20:39:44 +02:00
parent 3ac8857025
commit df73c389a0
5 changed files with 359 additions and 147 deletions

View File

@@ -114,7 +114,6 @@ export default function AiImageNode({
} }
const modelId = nodeData.model ?? DEFAULT_MODEL_ID; const modelId = nodeData.model ?? DEFAULT_MODEL_ID;
const regenCreditCost = getModel(modelId)?.creditCost ?? 4;
await toast.promise( await toast.promise(
generateImage({ generateImage({
@@ -128,10 +127,10 @@ export default function AiImageNode({
}), }),
{ {
loading: msg.ai.generating.title, loading: msg.ai.generating.title,
success: msg.ai.generated.title, success: msg.ai.generationQueued.title,
error: msg.ai.generationFailed.title, error: msg.ai.generationFailed.title,
description: { description: {
success: msg.ai.generatedDesc(regenCreditCost), success: msg.ai.generationQueuedDesc,
error: msg.ai.creditsNotCharged, error: msg.ai.creditsNotCharged,
}, },
}, },

View File

@@ -249,10 +249,10 @@ export default function PromptNode({
}), }),
{ {
loading: msg.ai.generating.title, loading: msg.ai.generating.title,
success: msg.ai.generated.title, success: msg.ai.generationQueued.title,
error: msg.ai.generationFailed.title, error: msg.ai.generationFailed.title,
description: { description: {
success: msg.ai.generatedDesc(creditCost), success: msg.ai.generationQueuedDesc,
error: msg.ai.creditsNotCharged, error: msg.ai.creditsNotCharged,
}, },
}, },

View File

@@ -1,11 +1,12 @@
import { v } from "convex/values"; import { v } from "convex/values";
import { action } from "./_generated/server"; import { action, internalAction, internalMutation } from "./_generated/server";
import { api, internal } from "./_generated/api"; import { api, internal } from "./_generated/api";
import { import {
generateImageViaOpenRouter, generateImageViaOpenRouter,
DEFAULT_IMAGE_MODEL, DEFAULT_IMAGE_MODEL,
IMAGE_MODELS, IMAGE_MODELS,
} from "./openrouter"; } from "./openrouter";
import type { Id } from "./_generated/dataModel";
const MAX_IMAGE_RETRIES = 2; const MAX_IMAGE_RETRIES = 2;
@@ -156,60 +157,123 @@ async function generateImageWithAutoRetry(
throw lastError ?? new Error("Generation failed"); throw lastError ?? new Error("Generation failed");
} }
export const generateImage = action({ export const markNodeExecuting = internalMutation({
args: {
nodeId: v.id("nodes"),
},
handler: async (ctx, { nodeId }) => {
await ctx.db.patch(nodeId, {
status: "executing",
retryCount: 0,
statusMessage: undefined,
});
},
});
export const markNodeRetry = internalMutation({
args: {
nodeId: v.id("nodes"),
retryCount: v.number(),
maxRetries: v.number(),
failureMessage: v.string(),
},
handler: async (ctx, { nodeId, retryCount, maxRetries, failureMessage }) => {
const reason =
typeof failureMessage === "string" && failureMessage.trim().length > 0
? failureMessage
: "temporärer Fehler";
await ctx.db.patch(nodeId, {
status: "executing",
retryCount,
statusMessage: `Retry ${retryCount}/${maxRetries}${reason}`,
});
},
});
export const finalizeImageSuccess = internalMutation({
args: { args: {
canvasId: v.id("canvases"),
nodeId: v.id("nodes"), nodeId: v.id("nodes"),
prompt: v.string(), prompt: v.string(),
referenceStorageId: v.optional(v.id("_storage")), modelId: v.string(),
referenceImageUrl: v.optional(v.string()), storageId: v.id("_storage"),
model: v.optional(v.string()),
aspectRatio: v.optional(v.string()), aspectRatio: v.optional(v.string()),
retryCount: v.number(),
}, },
handler: async (ctx, args) => { handler: async (
// Auth: über requireAuth in runMutation — kein verschachteltes getCurrentUser (ConvexError → generische Client-Fehler). ctx,
const internalCreditsEnabled = { nodeId, prompt, modelId, storageId, aspectRatio, retryCount }
process.env.INTERNAL_CREDITS_ENABLED === "true"; ) => {
const apiKey = process.env.OPENROUTER_API_KEY;
if (!apiKey) {
throw new Error("OPENROUTER_API_KEY is not set");
}
const modelId = args.model ?? DEFAULT_IMAGE_MODEL;
const modelConfig = IMAGE_MODELS[modelId]; const modelConfig = IMAGE_MODELS[modelId];
if (!modelConfig) { if (!modelConfig) {
throw new Error(`Unknown model: ${modelId}`); throw new Error(`Unknown model: ${modelId}`);
} }
// Abuse-Check vor allem anderen — immer, unabhängig von Credits const existing = await ctx.db.get(nodeId);
await ctx.runMutation(internal.credits.checkAbuseLimits, {}); if (!existing) {
throw new Error("Node not found");
const reservationId = internalCreditsEnabled
? await ctx.runMutation(api.credits.reserve, {
estimatedCost: modelConfig.creditCost,
description: `Bildgenerierung — ${modelConfig.name}`,
model: modelId,
nodeId: args.nodeId,
canvasId: args.canvasId,
})
: null;
// Usage-Tracking wenn Credits deaktiviert (reserve übernimmt das bei aktivierten Credits)
if (!internalCreditsEnabled) {
await ctx.runMutation(internal.credits.incrementUsage, {});
} }
let retryCount = 0; const prev =
existing.data && typeof existing.data === "object"
? (existing.data as Record<string, unknown>)
: {};
const creditCost = modelConfig.creditCost;
const resolvedAspectRatio =
aspectRatio?.trim() ||
(typeof prev.aspectRatio === "string" ? prev.aspectRatio : undefined);
try { await ctx.db.patch(nodeId, {
// Status auf "executing" setzen — im try-Block damit Fehler den catch erreichen status: "done",
await ctx.runMutation(api.nodes.updateStatus, { retryCount,
nodeId: args.nodeId, statusMessage: undefined,
status: "executing", data: {
retryCount: 0, ...prev,
storageId,
prompt,
model: modelId,
modelLabel: modelConfig.name,
modelTier: modelConfig.tier,
generatedAt: Date.now(),
creditCost,
...(resolvedAspectRatio ? { aspectRatio: resolvedAspectRatio } : {}),
},
}); });
return { creditCost };
},
});
export const finalizeImageFailure = internalMutation({
args: {
nodeId: v.id("nodes"),
retryCount: v.number(),
statusMessage: v.string(),
},
handler: async (ctx, { nodeId, retryCount, statusMessage }) => {
await ctx.db.patch(nodeId, {
status: "error",
retryCount,
statusMessage,
});
},
});
export const generateAndStoreImage = internalAction({
args: {
nodeId: v.id("nodes"),
prompt: v.string(),
referenceStorageId: v.optional(v.id("_storage")),
referenceImageUrl: v.optional(v.string()),
model: v.string(),
aspectRatio: v.optional(v.string()),
},
handler: async (ctx, args) => {
const apiKey = process.env.OPENROUTER_API_KEY;
if (!apiKey) {
throw new Error("OPENROUTER_API_KEY is not set");
}
const retryCount = 0;
let referenceImageUrl = args.referenceImageUrl?.trim() || undefined; let referenceImageUrl = args.referenceImageUrl?.trim() || undefined;
if (args.referenceStorageId) { if (args.referenceStorageId) {
referenceImageUrl = referenceImageUrl =
@@ -221,20 +285,16 @@ export const generateImage = action({
generateImageViaOpenRouter(apiKey, { generateImageViaOpenRouter(apiKey, {
prompt: args.prompt, prompt: args.prompt,
referenceImageUrl, referenceImageUrl,
model: modelId, model: args.model,
aspectRatio: args.aspectRatio, aspectRatio: args.aspectRatio,
}), }),
async (nextRetryCount, maxRetries, failure) => { async (nextRetryCount, maxRetries, failure) => {
retryCount = nextRetryCount; retryCount = nextRetryCount;
const reason = await ctx.runMutation(internal.ai.markNodeRetry, {
typeof failure.message === "string"
? failure.message
: "temporärer Fehler";
await ctx.runMutation(api.nodes.updateStatus, {
nodeId: args.nodeId, nodeId: args.nodeId,
status: "executing",
retryCount: nextRetryCount, retryCount: nextRetryCount,
statusMessage: `Retry ${nextRetryCount}/${maxRetries}${reason}`, maxRetries,
failureMessage: failure.message,
}); });
} }
); );
@@ -248,61 +308,155 @@ export const generateImage = action({
const blob = new Blob([bytes], { type: result.mimeType }); const blob = new Blob([bytes], { type: result.mimeType });
const storageId = await ctx.storage.store(blob); const storageId = await ctx.storage.store(blob);
const existing = await ctx.runQuery(api.nodes.get, { nodeId: args.nodeId }); return {
if (!existing) throw new Error("Node not found"); storageId: storageId as Id<"_storage">,
const prev = (existing.data ?? {}) as Record<string, unknown>; retryCount,
const creditCost = modelConfig.creditCost; };
const aspectRatio =
args.aspectRatio?.trim() ||
(typeof prev.aspectRatio === "string" ? prev.aspectRatio : undefined);
await ctx.runMutation(api.nodes.updateData, {
nodeId: args.nodeId,
data: {
...prev,
storageId,
prompt: args.prompt,
model: modelId,
modelLabel: modelConfig.name,
modelTier: modelConfig.tier,
generatedAt: Date.now(),
creditCost,
...(aspectRatio ? { aspectRatio } : {}),
}, },
}); });
await ctx.runMutation(api.nodes.updateStatus, { export const processImageGeneration = internalAction({
args: {
nodeId: v.id("nodes"),
prompt: v.string(),
modelId: v.string(),
referenceStorageId: v.optional(v.id("_storage")),
referenceImageUrl: v.optional(v.string()),
aspectRatio: v.optional(v.string()),
reservationId: v.optional(v.id("creditTransactions")),
shouldDecrementConcurrency: v.boolean(),
},
handler: async (ctx, args) => {
let retryCount = 0;
try {
const result = await ctx.runAction(internal.ai.generateAndStoreImage, {
nodeId: args.nodeId, nodeId: args.nodeId,
status: "done", prompt: args.prompt,
referenceStorageId: args.referenceStorageId,
referenceImageUrl: args.referenceImageUrl,
model: args.modelId,
aspectRatio: args.aspectRatio,
});
retryCount = result.retryCount;
const { creditCost } = await ctx.runMutation(internal.ai.finalizeImageSuccess, {
nodeId: args.nodeId,
prompt: args.prompt,
modelId: args.modelId,
storageId: result.storageId,
aspectRatio: args.aspectRatio,
retryCount, retryCount,
}); });
if (reservationId) { if (args.reservationId) {
await ctx.runMutation(api.credits.commit, { await ctx.runMutation(internal.credits.commitInternal, {
transactionId: reservationId, transactionId: args.reservationId,
actualCost: creditCost, actualCost: creditCost,
}); });
} }
} catch (error) {
if (args.reservationId) {
try {
await ctx.runMutation(internal.credits.releaseInternal, {
transactionId: args.reservationId,
});
} catch {
// Keep node status updates best-effort even if credit release fails.
}
}
await ctx.runMutation(internal.ai.finalizeImageFailure, {
nodeId: args.nodeId,
retryCount,
statusMessage: formatTerminalStatusMessage(error),
});
} finally {
if (args.shouldDecrementConcurrency) {
await ctx.runMutation(internal.credits.decrementConcurrency, {});
}
}
},
});
export const generateImage = action({
args: {
canvasId: v.id("canvases"),
nodeId: v.id("nodes"),
prompt: v.string(),
referenceStorageId: v.optional(v.id("_storage")),
referenceImageUrl: v.optional(v.string()),
model: v.optional(v.string()),
aspectRatio: v.optional(v.string()),
},
handler: async (ctx, args) => {
const internalCreditsEnabled =
process.env.INTERNAL_CREDITS_ENABLED === "true";
const modelId = args.model ?? DEFAULT_IMAGE_MODEL;
const modelConfig = IMAGE_MODELS[modelId];
if (!modelConfig) {
throw new Error(`Unknown model: ${modelId}`);
}
await ctx.runMutation(internal.credits.checkAbuseLimits, {});
let usageIncremented = false;
const reservationId: Id<"creditTransactions"> | null = internalCreditsEnabled
? await ctx.runMutation(api.credits.reserve, {
estimatedCost: modelConfig.creditCost,
description: `Bildgenerierung — ${modelConfig.name}`,
model: modelId,
nodeId: args.nodeId,
canvasId: args.canvasId,
})
: null;
if (!internalCreditsEnabled) {
await ctx.runMutation(internal.credits.incrementUsage, {});
usageIncremented = true;
}
const retryCount = 0;
let backgroundJobScheduled = false;
try {
await ctx.runMutation(internal.ai.markNodeExecuting, {
nodeId: args.nodeId,
});
await ctx.scheduler.runAfter(0, internal.ai.processImageGeneration, {
nodeId: args.nodeId,
prompt: args.prompt,
modelId,
referenceStorageId: args.referenceStorageId,
referenceImageUrl: args.referenceImageUrl,
aspectRatio: args.aspectRatio,
reservationId: reservationId ?? undefined,
shouldDecrementConcurrency: usageIncremented,
});
backgroundJobScheduled = true;
return { queued: true as const, nodeId: args.nodeId };
} catch (error) { } catch (error) {
if (reservationId) { if (reservationId) {
try {
await ctx.runMutation(api.credits.release, { await ctx.runMutation(api.credits.release, {
transactionId: reservationId, transactionId: reservationId,
}); });
} catch {
// Prefer returning a clear node error over masking with cleanup failures.
}
} }
await ctx.runMutation(api.nodes.updateStatus, { await ctx.runMutation(internal.ai.finalizeImageFailure, {
nodeId: args.nodeId, nodeId: args.nodeId,
status: "error",
retryCount, retryCount,
statusMessage: formatTerminalStatusMessage(error), statusMessage: formatTerminalStatusMessage(error),
}); });
throw error; throw error;
} finally { } finally {
// Concurrency freigeben wenn Credits deaktiviert if (usageIncremented && !backgroundJobScheduled) {
// (commit/release übernehmen das bei aktivierten Credits)
if (!internalCreditsEnabled) {
await ctx.runMutation(internal.credits.decrementConcurrency, {}); await ctx.runMutation(internal.credits.decrementConcurrency, {});
} }
} }

View File

@@ -394,22 +394,25 @@ export const reserve = mutation({
}); });
/** /**
* Reservation committen — nach erfolgreichem KI-Call. * Reservation committen — interne Variante ohne Auth-Kontext.
*
* Schreibt die tatsächlichen Kosten ab (können von Reservation abweichen).
*/ */
export const commit = mutation({ export const commitInternal = internalMutation({
args: { args: {
transactionId: v.id("creditTransactions"), transactionId: v.id("creditTransactions"),
actualCost: v.number(), // Tatsächliche Kosten in Cent actualCost: v.number(),
openRouterCost: v.optional(v.number()), // Echte API-Kosten openRouterCost: v.optional(v.number()),
}, },
handler: async (ctx, { transactionId, actualCost, openRouterCost }) => { handler: async (ctx, { transactionId, actualCost, openRouterCost }) => {
const user = await requireAuth(ctx);
const transaction = await ctx.db.get(transactionId); const transaction = await ctx.db.get(transactionId);
if (!transaction || transaction.userId !== user.userId) { if (!transaction) {
throw new Error("Transaction not found"); throw new Error("Transaction not found");
} }
if (transaction.status === "committed") {
return { status: "already_committed" as const };
}
if (transaction.status === "released") {
return { status: "already_released" as const };
}
if (transaction.status !== "reserved") { if (transaction.status !== "reserved") {
throw new Error(`Transaction is ${transaction.status}, expected reserved`); throw new Error(`Transaction is ${transaction.status}, expected reserved`);
} }
@@ -419,13 +422,13 @@ export const commit = mutation({
// Balance aktualisieren // Balance aktualisieren
const balance = await ctx.db const balance = await ctx.db
.query("creditBalances") .query("creditBalances")
.withIndex("by_user", (q) => q.eq("userId", user.userId)) .withIndex("by_user", (q) => q.eq("userId", transaction.userId))
.unique(); .unique();
if (!balance) throw new Error("No credit balance found"); if (!balance) throw new Error("No credit balance found");
await ctx.db.patch(balance._id, { await ctx.db.patch(balance._id, {
balance: balance.balance - actualCost, balance: balance.balance - actualCost,
reserved: balance.reserved - estimatedCost, reserved: Math.max(0, balance.reserved - estimatedCost),
updatedAt: Date.now(), updatedAt: Date.now(),
}); });
@@ -442,7 +445,7 @@ export const commit = mutation({
const dailyUsage = await ctx.db const dailyUsage = await ctx.db
.query("dailyUsage") .query("dailyUsage")
.withIndex("by_user_date", (q) => .withIndex("by_user_date", (q) =>
q.eq("userId", user.userId).eq("date", today) q.eq("userId", transaction.userId).eq("date", today)
) )
.unique(); .unique();
if (dailyUsage && dailyUsage.concurrentJobs > 0) { if (dailyUsage && dailyUsage.concurrentJobs > 0) {
@@ -450,6 +453,94 @@ export const commit = mutation({
concurrentJobs: dailyUsage.concurrentJobs - 1, concurrentJobs: dailyUsage.concurrentJobs - 1,
}); });
} }
return { status: "committed" as const };
},
});
/**
* Reservation committen — nach erfolgreichem KI-Call.
*
* Schreibt die tatsächlichen Kosten ab (können von Reservation abweichen).
*/
export const commit = mutation({
args: {
transactionId: v.id("creditTransactions"),
actualCost: v.number(),
openRouterCost: v.optional(v.number()),
},
handler: async (ctx, { transactionId, actualCost, openRouterCost }) => {
const user = await requireAuth(ctx);
const transaction = await ctx.db.get(transactionId);
if (!transaction || transaction.userId !== user.userId) {
throw new Error("Transaction not found");
}
return await ctx.runMutation(internal.credits.commitInternal, {
transactionId,
actualCost,
openRouterCost,
});
},
});
/**
* Reservation freigeben — interne Variante ohne Auth-Kontext.
*/
export const releaseInternal = internalMutation({
args: {
transactionId: v.id("creditTransactions"),
},
handler: async (ctx, { transactionId }) => {
const transaction = await ctx.db.get(transactionId);
if (!transaction) {
throw new Error("Transaction not found");
}
if (transaction.status === "released") {
return { status: "already_released" as const };
}
if (transaction.status === "committed") {
return { status: "already_committed" as const };
}
if (transaction.status !== "reserved") {
throw new Error(`Transaction is ${transaction.status}, expected reserved`);
}
const estimatedCost = Math.abs(transaction.amount);
// Credits freigeben
const balance = await ctx.db
.query("creditBalances")
.withIndex("by_user", (q) => q.eq("userId", transaction.userId))
.unique();
if (!balance) throw new Error("No credit balance found");
await ctx.db.patch(balance._id, {
reserved: Math.max(0, balance.reserved - estimatedCost),
updatedAt: Date.now(),
});
// Transaktion als released markieren
await ctx.db.patch(transactionId, {
status: "released",
});
// Concurrent Jobs dekrementieren
const today = new Date().toISOString().split("T")[0];
const dailyUsage = await ctx.db
.query("dailyUsage")
.withIndex("by_user_date", (q) =>
q.eq("userId", transaction.userId).eq("date", today)
)
.unique();
if (dailyUsage && dailyUsage.concurrentJobs > 0) {
await ctx.db.patch(dailyUsage._id, {
concurrentJobs: dailyUsage.concurrentJobs - 1,
});
}
// Generation Count NICHT zurücksetzen — der Versuch zählt
return { status: "released" as const };
}, },
}); });
@@ -468,44 +559,10 @@ export const release = mutation({
if (!transaction || transaction.userId !== user.userId) { if (!transaction || transaction.userId !== user.userId) {
throw new Error("Transaction not found"); throw new Error("Transaction not found");
} }
if (transaction.status !== "reserved") {
throw new Error(`Transaction is ${transaction.status}, expected reserved`);
}
const estimatedCost = Math.abs(transaction.amount); return await ctx.runMutation(internal.credits.releaseInternal, {
transactionId,
// Credits freigeben
const balance = await ctx.db
.query("creditBalances")
.withIndex("by_user", (q) => q.eq("userId", user.userId))
.unique();
if (!balance) throw new Error("No credit balance found");
await ctx.db.patch(balance._id, {
reserved: balance.reserved - estimatedCost,
updatedAt: Date.now(),
}); });
// Transaktion als released markieren
await ctx.db.patch(transactionId, {
status: "released",
});
// Concurrent Jobs dekrementieren
const today = new Date().toISOString().split("T")[0];
const dailyUsage = await ctx.db
.query("dailyUsage")
.withIndex("by_user_date", (q) =>
q.eq("userId", user.userId).eq("date", today)
)
.unique();
if (dailyUsage && dailyUsage.concurrentJobs > 0) {
await ctx.db.patch(dailyUsage._id, {
concurrentJobs: dailyUsage.concurrentJobs - 1,
});
}
// Generation Count NICHT zurücksetzen — der Versuch zählt
}, },
}); });

View File

@@ -70,6 +70,8 @@ export const msg = {
generating: { title: "Bild wird generiert…" }, generating: { title: "Bild wird generiert…" },
generated: { title: "Bild generiert" }, generated: { title: "Bild generiert" },
generatedDesc: (credits: number) => `${credits} Credits verbraucht`, generatedDesc: (credits: number) => `${credits} Credits verbraucht`,
generationQueued: { title: "Generierung gestartet" },
generationQueuedDesc: "Das Bild erscheint automatisch, sobald es fertig ist.",
generationFailed: { title: "Generierung fehlgeschlagen" }, generationFailed: { title: "Generierung fehlgeschlagen" },
creditsNotCharged: "Credits wurden nicht abgebucht", creditsNotCharged: "Credits wurden nicht abgebucht",
insufficientCredits: (needed: number, available: number) => ({ insufficientCredits: (needed: number, available: number) => ({