diff --git a/components/canvas/nodes/ai-image-node.tsx b/components/canvas/nodes/ai-image-node.tsx index 57f9dc7..0bc9978 100644 --- a/components/canvas/nodes/ai-image-node.tsx +++ b/components/canvas/nodes/ai-image-node.tsx @@ -114,7 +114,6 @@ export default function AiImageNode({ } const modelId = nodeData.model ?? DEFAULT_MODEL_ID; - const regenCreditCost = getModel(modelId)?.creditCost ?? 4; await toast.promise( generateImage({ @@ -128,10 +127,10 @@ export default function AiImageNode({ }), { loading: msg.ai.generating.title, - success: msg.ai.generated.title, + success: msg.ai.generationQueued.title, error: msg.ai.generationFailed.title, description: { - success: msg.ai.generatedDesc(regenCreditCost), + success: msg.ai.generationQueuedDesc, error: msg.ai.creditsNotCharged, }, }, diff --git a/components/canvas/nodes/prompt-node.tsx b/components/canvas/nodes/prompt-node.tsx index 635f97a..c59e20a 100644 --- a/components/canvas/nodes/prompt-node.tsx +++ b/components/canvas/nodes/prompt-node.tsx @@ -249,10 +249,10 @@ export default function PromptNode({ }), { loading: msg.ai.generating.title, - success: msg.ai.generated.title, + success: msg.ai.generationQueued.title, error: msg.ai.generationFailed.title, description: { - success: msg.ai.generatedDesc(creditCost), + success: msg.ai.generationQueuedDesc, error: msg.ai.creditsNotCharged, }, }, diff --git a/convex/ai.ts b/convex/ai.ts index b10dabb..7cccc8b 100644 --- a/convex/ai.ts +++ b/convex/ai.ts @@ -1,11 +1,12 @@ import { v } from "convex/values"; -import { action } from "./_generated/server"; +import { action, internalAction, internalMutation } from "./_generated/server"; import { api, internal } from "./_generated/api"; import { generateImageViaOpenRouter, DEFAULT_IMAGE_MODEL, IMAGE_MODELS, } from "./openrouter"; +import type { Id } from "./_generated/dataModel"; const MAX_IMAGE_RETRIES = 2; @@ -156,6 +157,228 @@ async function generateImageWithAutoRetry( throw lastError ?? new Error("Generation failed"); } +export const markNodeExecuting = internalMutation({ + args: { + nodeId: v.id("nodes"), + }, + handler: async (ctx, { nodeId }) => { + await ctx.db.patch(nodeId, { + status: "executing", + retryCount: 0, + statusMessage: undefined, + }); + }, +}); + +export const markNodeRetry = internalMutation({ + args: { + nodeId: v.id("nodes"), + retryCount: v.number(), + maxRetries: v.number(), + failureMessage: v.string(), + }, + handler: async (ctx, { nodeId, retryCount, maxRetries, failureMessage }) => { + const reason = + typeof failureMessage === "string" && failureMessage.trim().length > 0 + ? failureMessage + : "temporärer Fehler"; + await ctx.db.patch(nodeId, { + status: "executing", + retryCount, + statusMessage: `Retry ${retryCount}/${maxRetries} — ${reason}`, + }); + }, +}); + +export const finalizeImageSuccess = internalMutation({ + args: { + nodeId: v.id("nodes"), + prompt: v.string(), + modelId: v.string(), + storageId: v.id("_storage"), + aspectRatio: v.optional(v.string()), + retryCount: v.number(), + }, + handler: async ( + ctx, + { nodeId, prompt, modelId, storageId, aspectRatio, retryCount } + ) => { + const modelConfig = IMAGE_MODELS[modelId]; + if (!modelConfig) { + throw new Error(`Unknown model: ${modelId}`); + } + + const existing = await ctx.db.get(nodeId); + if (!existing) { + throw new Error("Node not found"); + } + + const prev = + existing.data && typeof existing.data === "object" + ? (existing.data as Record) + : {}; + const creditCost = modelConfig.creditCost; + const resolvedAspectRatio = + aspectRatio?.trim() || + (typeof prev.aspectRatio === "string" ? prev.aspectRatio : undefined); + + await ctx.db.patch(nodeId, { + status: "done", + retryCount, + statusMessage: undefined, + data: { + ...prev, + storageId, + prompt, + model: modelId, + modelLabel: modelConfig.name, + modelTier: modelConfig.tier, + generatedAt: Date.now(), + creditCost, + ...(resolvedAspectRatio ? { aspectRatio: resolvedAspectRatio } : {}), + }, + }); + + return { creditCost }; + }, +}); + +export const finalizeImageFailure = internalMutation({ + args: { + nodeId: v.id("nodes"), + retryCount: v.number(), + statusMessage: v.string(), + }, + handler: async (ctx, { nodeId, retryCount, statusMessage }) => { + await ctx.db.patch(nodeId, { + status: "error", + retryCount, + statusMessage, + }); + }, +}); + +export const generateAndStoreImage = internalAction({ + args: { + nodeId: v.id("nodes"), + prompt: v.string(), + referenceStorageId: v.optional(v.id("_storage")), + referenceImageUrl: v.optional(v.string()), + model: v.string(), + aspectRatio: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const apiKey = process.env.OPENROUTER_API_KEY; + if (!apiKey) { + throw new Error("OPENROUTER_API_KEY is not set"); + } + + const retryCount = 0; + let referenceImageUrl = args.referenceImageUrl?.trim() || undefined; + if (args.referenceStorageId) { + referenceImageUrl = + (await ctx.storage.getUrl(args.referenceStorageId)) ?? undefined; + } + + const result = await generateImageWithAutoRetry( + () => + generateImageViaOpenRouter(apiKey, { + prompt: args.prompt, + referenceImageUrl, + model: args.model, + aspectRatio: args.aspectRatio, + }), + async (nextRetryCount, maxRetries, failure) => { + retryCount = nextRetryCount; + await ctx.runMutation(internal.ai.markNodeRetry, { + nodeId: args.nodeId, + retryCount: nextRetryCount, + maxRetries, + failureMessage: failure.message, + }); + } + ); + + const binaryString = atob(result.imageBase64); + const bytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + + const blob = new Blob([bytes], { type: result.mimeType }); + const storageId = await ctx.storage.store(blob); + + return { + storageId: storageId as Id<"_storage">, + retryCount, + }; + }, +}); + +export const processImageGeneration = internalAction({ + args: { + nodeId: v.id("nodes"), + prompt: v.string(), + modelId: v.string(), + referenceStorageId: v.optional(v.id("_storage")), + referenceImageUrl: v.optional(v.string()), + aspectRatio: v.optional(v.string()), + reservationId: v.optional(v.id("creditTransactions")), + shouldDecrementConcurrency: v.boolean(), + }, + handler: async (ctx, args) => { + let retryCount = 0; + + try { + const result = await ctx.runAction(internal.ai.generateAndStoreImage, { + nodeId: args.nodeId, + prompt: args.prompt, + referenceStorageId: args.referenceStorageId, + referenceImageUrl: args.referenceImageUrl, + model: args.modelId, + aspectRatio: args.aspectRatio, + }); + retryCount = result.retryCount; + + const { creditCost } = await ctx.runMutation(internal.ai.finalizeImageSuccess, { + nodeId: args.nodeId, + prompt: args.prompt, + modelId: args.modelId, + storageId: result.storageId, + aspectRatio: args.aspectRatio, + retryCount, + }); + + if (args.reservationId) { + await ctx.runMutation(internal.credits.commitInternal, { + transactionId: args.reservationId, + actualCost: creditCost, + }); + } + } catch (error) { + if (args.reservationId) { + try { + await ctx.runMutation(internal.credits.releaseInternal, { + transactionId: args.reservationId, + }); + } catch { + // Keep node status updates best-effort even if credit release fails. + } + } + + await ctx.runMutation(internal.ai.finalizeImageFailure, { + nodeId: args.nodeId, + retryCount, + statusMessage: formatTerminalStatusMessage(error), + }); + } finally { + if (args.shouldDecrementConcurrency) { + await ctx.runMutation(internal.credits.decrementConcurrency, {}); + } + } + }, +}); + export const generateImage = action({ args: { canvasId: v.id("canvases"), @@ -167,25 +390,19 @@ export const generateImage = action({ aspectRatio: v.optional(v.string()), }, handler: async (ctx, args) => { - // Auth: über requireAuth in runMutation — kein verschachteltes getCurrentUser (ConvexError → generische Client-Fehler). const internalCreditsEnabled = process.env.INTERNAL_CREDITS_ENABLED === "true"; - const apiKey = process.env.OPENROUTER_API_KEY; - if (!apiKey) { - throw new Error("OPENROUTER_API_KEY is not set"); - } - const modelId = args.model ?? DEFAULT_IMAGE_MODEL; const modelConfig = IMAGE_MODELS[modelId]; if (!modelConfig) { throw new Error(`Unknown model: ${modelId}`); } - // Abuse-Check vor allem anderen — immer, unabhängig von Credits await ctx.runMutation(internal.credits.checkAbuseLimits, {}); - const reservationId = internalCreditsEnabled + let usageIncremented = false; + const reservationId: Id<"creditTransactions"> | null = internalCreditsEnabled ? await ctx.runMutation(api.credits.reserve, { estimatedCost: modelConfig.creditCost, description: `Bildgenerierung — ${modelConfig.name}`, @@ -195,114 +412,51 @@ export const generateImage = action({ }) : null; - // Usage-Tracking wenn Credits deaktiviert (reserve übernimmt das bei aktivierten Credits) if (!internalCreditsEnabled) { await ctx.runMutation(internal.credits.incrementUsage, {}); + usageIncremented = true; } - let retryCount = 0; + const retryCount = 0; + let backgroundJobScheduled = false; try { - // Status auf "executing" setzen — im try-Block damit Fehler den catch erreichen - await ctx.runMutation(api.nodes.updateStatus, { + await ctx.runMutation(internal.ai.markNodeExecuting, { nodeId: args.nodeId, - status: "executing", - retryCount: 0, }); - let referenceImageUrl = args.referenceImageUrl?.trim() || undefined; - if (args.referenceStorageId) { - referenceImageUrl = - (await ctx.storage.getUrl(args.referenceStorageId)) ?? undefined; - } - - const result = await generateImageWithAutoRetry( - () => - generateImageViaOpenRouter(apiKey, { - prompt: args.prompt, - referenceImageUrl, - model: modelId, - aspectRatio: args.aspectRatio, - }), - async (nextRetryCount, maxRetries, failure) => { - retryCount = nextRetryCount; - const reason = - typeof failure.message === "string" - ? failure.message - : "temporärer Fehler"; - await ctx.runMutation(api.nodes.updateStatus, { - nodeId: args.nodeId, - status: "executing", - retryCount: nextRetryCount, - statusMessage: `Retry ${nextRetryCount}/${maxRetries} — ${reason}`, - }); - } - ); - - const binaryString = atob(result.imageBase64); - const bytes = new Uint8Array(binaryString.length); - for (let i = 0; i < binaryString.length; i++) { - bytes[i] = binaryString.charCodeAt(i); - } - - const blob = new Blob([bytes], { type: result.mimeType }); - const storageId = await ctx.storage.store(blob); - - const existing = await ctx.runQuery(api.nodes.get, { nodeId: args.nodeId }); - if (!existing) throw new Error("Node not found"); - const prev = (existing.data ?? {}) as Record; - const creditCost = modelConfig.creditCost; - - const aspectRatio = - args.aspectRatio?.trim() || - (typeof prev.aspectRatio === "string" ? prev.aspectRatio : undefined); - - await ctx.runMutation(api.nodes.updateData, { + await ctx.scheduler.runAfter(0, internal.ai.processImageGeneration, { nodeId: args.nodeId, - data: { - ...prev, - storageId, - prompt: args.prompt, - model: modelId, - modelLabel: modelConfig.name, - modelTier: modelConfig.tier, - generatedAt: Date.now(), - creditCost, - ...(aspectRatio ? { aspectRatio } : {}), - }, + prompt: args.prompt, + modelId, + referenceStorageId: args.referenceStorageId, + referenceImageUrl: args.referenceImageUrl, + aspectRatio: args.aspectRatio, + reservationId: reservationId ?? undefined, + shouldDecrementConcurrency: usageIncremented, }); - - await ctx.runMutation(api.nodes.updateStatus, { - nodeId: args.nodeId, - status: "done", - retryCount, - }); - - if (reservationId) { - await ctx.runMutation(api.credits.commit, { - transactionId: reservationId, - actualCost: creditCost, - }); - } + backgroundJobScheduled = true; + return { queued: true as const, nodeId: args.nodeId }; } catch (error) { if (reservationId) { - await ctx.runMutation(api.credits.release, { - transactionId: reservationId, - }); + try { + await ctx.runMutation(api.credits.release, { + transactionId: reservationId, + }); + } catch { + // Prefer returning a clear node error over masking with cleanup failures. + } } - await ctx.runMutation(api.nodes.updateStatus, { + await ctx.runMutation(internal.ai.finalizeImageFailure, { nodeId: args.nodeId, - status: "error", retryCount, statusMessage: formatTerminalStatusMessage(error), }); throw error; } finally { - // Concurrency freigeben wenn Credits deaktiviert - // (commit/release übernehmen das bei aktivierten Credits) - if (!internalCreditsEnabled) { + if (usageIncremented && !backgroundJobScheduled) { await ctx.runMutation(internal.credits.decrementConcurrency, {}); } } diff --git a/convex/credits.ts b/convex/credits.ts index d394d43..7de8bdb 100644 --- a/convex/credits.ts +++ b/convex/credits.ts @@ -394,22 +394,25 @@ export const reserve = mutation({ }); /** - * Reservation committen — nach erfolgreichem KI-Call. - * - * Schreibt die tatsächlichen Kosten ab (können von Reservation abweichen). + * Reservation committen — interne Variante ohne Auth-Kontext. */ -export const commit = mutation({ +export const commitInternal = internalMutation({ args: { transactionId: v.id("creditTransactions"), - actualCost: v.number(), // Tatsächliche Kosten in Cent - openRouterCost: v.optional(v.number()), // Echte API-Kosten + actualCost: v.number(), + openRouterCost: v.optional(v.number()), }, handler: async (ctx, { transactionId, actualCost, openRouterCost }) => { - const user = await requireAuth(ctx); const transaction = await ctx.db.get(transactionId); - if (!transaction || transaction.userId !== user.userId) { + if (!transaction) { throw new Error("Transaction not found"); } + if (transaction.status === "committed") { + return { status: "already_committed" as const }; + } + if (transaction.status === "released") { + return { status: "already_released" as const }; + } if (transaction.status !== "reserved") { throw new Error(`Transaction is ${transaction.status}, expected reserved`); } @@ -419,13 +422,13 @@ export const commit = mutation({ // Balance aktualisieren const balance = await ctx.db .query("creditBalances") - .withIndex("by_user", (q) => q.eq("userId", user.userId)) + .withIndex("by_user", (q) => q.eq("userId", transaction.userId)) .unique(); if (!balance) throw new Error("No credit balance found"); await ctx.db.patch(balance._id, { balance: balance.balance - actualCost, - reserved: balance.reserved - estimatedCost, + reserved: Math.max(0, balance.reserved - estimatedCost), updatedAt: Date.now(), }); @@ -442,7 +445,7 @@ export const commit = mutation({ const dailyUsage = await ctx.db .query("dailyUsage") .withIndex("by_user_date", (q) => - q.eq("userId", user.userId).eq("date", today) + q.eq("userId", transaction.userId).eq("date", today) ) .unique(); if (dailyUsage && dailyUsage.concurrentJobs > 0) { @@ -450,6 +453,94 @@ export const commit = mutation({ concurrentJobs: dailyUsage.concurrentJobs - 1, }); } + + return { status: "committed" as const }; + }, +}); + +/** + * Reservation committen — nach erfolgreichem KI-Call. + * + * Schreibt die tatsächlichen Kosten ab (können von Reservation abweichen). + */ +export const commit = mutation({ + args: { + transactionId: v.id("creditTransactions"), + actualCost: v.number(), + openRouterCost: v.optional(v.number()), + }, + handler: async (ctx, { transactionId, actualCost, openRouterCost }) => { + const user = await requireAuth(ctx); + const transaction = await ctx.db.get(transactionId); + if (!transaction || transaction.userId !== user.userId) { + throw new Error("Transaction not found"); + } + + return await ctx.runMutation(internal.credits.commitInternal, { + transactionId, + actualCost, + openRouterCost, + }); + }, +}); + +/** + * Reservation freigeben — interne Variante ohne Auth-Kontext. + */ +export const releaseInternal = internalMutation({ + args: { + transactionId: v.id("creditTransactions"), + }, + handler: async (ctx, { transactionId }) => { + const transaction = await ctx.db.get(transactionId); + if (!transaction) { + throw new Error("Transaction not found"); + } + if (transaction.status === "released") { + return { status: "already_released" as const }; + } + if (transaction.status === "committed") { + return { status: "already_committed" as const }; + } + if (transaction.status !== "reserved") { + throw new Error(`Transaction is ${transaction.status}, expected reserved`); + } + + const estimatedCost = Math.abs(transaction.amount); + + // Credits freigeben + const balance = await ctx.db + .query("creditBalances") + .withIndex("by_user", (q) => q.eq("userId", transaction.userId)) + .unique(); + if (!balance) throw new Error("No credit balance found"); + + await ctx.db.patch(balance._id, { + reserved: Math.max(0, balance.reserved - estimatedCost), + updatedAt: Date.now(), + }); + + // Transaktion als released markieren + await ctx.db.patch(transactionId, { + status: "released", + }); + + // Concurrent Jobs dekrementieren + const today = new Date().toISOString().split("T")[0]; + const dailyUsage = await ctx.db + .query("dailyUsage") + .withIndex("by_user_date", (q) => + q.eq("userId", transaction.userId).eq("date", today) + ) + .unique(); + if (dailyUsage && dailyUsage.concurrentJobs > 0) { + await ctx.db.patch(dailyUsage._id, { + concurrentJobs: dailyUsage.concurrentJobs - 1, + }); + } + + // Generation Count NICHT zurücksetzen — der Versuch zählt + return { status: "released" as const }; }, }); @@ -468,44 +559,10 @@ export const release = mutation({ if (!transaction || transaction.userId !== user.userId) { throw new Error("Transaction not found"); } - if (transaction.status !== "reserved") { - throw new Error(`Transaction is ${transaction.status}, expected reserved`); - } - const estimatedCost = Math.abs(transaction.amount); - - // Credits freigeben - const balance = await ctx.db - .query("creditBalances") - .withIndex("by_user", (q) => q.eq("userId", user.userId)) - .unique(); - if (!balance) throw new Error("No credit balance found"); - - await ctx.db.patch(balance._id, { - reserved: balance.reserved - estimatedCost, - updatedAt: Date.now(), + return await ctx.runMutation(internal.credits.releaseInternal, { + transactionId, }); - - // Transaktion als released markieren - await ctx.db.patch(transactionId, { - status: "released", - }); - - // Concurrent Jobs dekrementieren - const today = new Date().toISOString().split("T")[0]; - const dailyUsage = await ctx.db - .query("dailyUsage") - .withIndex("by_user_date", (q) => - q.eq("userId", user.userId).eq("date", today) - ) - .unique(); - if (dailyUsage && dailyUsage.concurrentJobs > 0) { - await ctx.db.patch(dailyUsage._id, { - concurrentJobs: dailyUsage.concurrentJobs - 1, - }); - } - - // Generation Count NICHT zurücksetzen — der Versuch zählt }, }); diff --git a/lib/toast-messages.ts b/lib/toast-messages.ts index 77bd8e1..c022aa3 100644 --- a/lib/toast-messages.ts +++ b/lib/toast-messages.ts @@ -70,6 +70,8 @@ export const msg = { generating: { title: "Bild wird generiert…" }, generated: { title: "Bild generiert" }, generatedDesc: (credits: number) => `${credits} Credits verbraucht`, + generationQueued: { title: "Generierung gestartet" }, + generationQueuedDesc: "Das Bild erscheint automatisch, sobald es fertig ist.", generationFailed: { title: "Generierung fehlgeschlagen" }, creditsNotCharged: "Credits wurden nicht abgebucht", insufficientCredits: (needed: number, available: number) => ({