Merge branch 'refactor/convex-ai-modularization'
This commit is contained in:
479
convex/ai.ts
479
convex/ai.ts
@@ -1,4 +1,4 @@
|
|||||||
import { v, ConvexError } from "convex/values";
|
import { v } from "convex/values";
|
||||||
import {
|
import {
|
||||||
action,
|
action,
|
||||||
internalAction,
|
internalAction,
|
||||||
@@ -19,6 +19,18 @@ import {
|
|||||||
FreepikApiError,
|
FreepikApiError,
|
||||||
getVideoTaskStatus,
|
getVideoTaskStatus,
|
||||||
} from "./freepik";
|
} from "./freepik";
|
||||||
|
import {
|
||||||
|
categorizeError,
|
||||||
|
errorMessage,
|
||||||
|
formatTerminalStatusMessage,
|
||||||
|
getErrorCode,
|
||||||
|
getErrorSource,
|
||||||
|
getProviderStatus,
|
||||||
|
getVideoPollDelayMs,
|
||||||
|
isVideoPollTimedOut,
|
||||||
|
} from "./ai_errors";
|
||||||
|
import { getNodeDataRecord } from "./ai_node_data";
|
||||||
|
import { generateImageWithAutoRetry } from "./ai_retry";
|
||||||
import { getVideoModel, isVideoModelId } from "../lib/ai-video-models";
|
import { getVideoModel, isVideoModelId } from "../lib/ai-video-models";
|
||||||
import {
|
import {
|
||||||
shouldLogVideoPollAttempt,
|
shouldLogVideoPollAttempt,
|
||||||
@@ -31,234 +43,42 @@ const MAX_IMAGE_RETRIES = 2;
|
|||||||
const MAX_VIDEO_POLL_ATTEMPTS = 30;
|
const MAX_VIDEO_POLL_ATTEMPTS = 30;
|
||||||
const MAX_VIDEO_POLL_TOTAL_MS = 10 * 60 * 1000;
|
const MAX_VIDEO_POLL_TOTAL_MS = 10 * 60 * 1000;
|
||||||
|
|
||||||
type ErrorCategory =
|
async function releaseInternalReservationBestEffort(
|
||||||
| "credits"
|
reservationId: Id<"creditTransactions"> | null | undefined,
|
||||||
| "policy"
|
releaseFn: (transactionId: Id<"creditTransactions">) => Promise<unknown>
|
||||||
| "timeout"
|
|
||||||
| "transient"
|
|
||||||
| "provider"
|
|
||||||
| "unknown";
|
|
||||||
|
|
||||||
interface ErrorData {
|
|
||||||
code?: string;
|
|
||||||
[key: string]: unknown;
|
|
||||||
}
|
|
||||||
|
|
||||||
function getErrorCode(error: unknown): string | undefined {
|
|
||||||
if (error instanceof ConvexError) {
|
|
||||||
const data = error.data as ErrorData;
|
|
||||||
return data?.code;
|
|
||||||
}
|
|
||||||
if (error instanceof FreepikApiError) {
|
|
||||||
return error.code;
|
|
||||||
}
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
function getErrorSource(error: unknown): string | undefined {
|
|
||||||
if (error instanceof FreepikApiError) {
|
|
||||||
return error.source;
|
|
||||||
}
|
|
||||||
if (error && typeof error === "object") {
|
|
||||||
const source = (error as { source?: unknown }).source;
|
|
||||||
return typeof source === "string" ? source : undefined;
|
|
||||||
}
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
function getProviderStatus(error: unknown): number | null {
|
|
||||||
if (error instanceof FreepikApiError) {
|
|
||||||
return typeof error.status === "number" ? error.status : null;
|
|
||||||
}
|
|
||||||
if (error && typeof error === "object") {
|
|
||||||
const status = (error as { status?: unknown }).status;
|
|
||||||
if (typeof status === "number" && Number.isFinite(status)) {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
function errorMessage(error: unknown): string {
|
|
||||||
if (error instanceof Error) return error.message;
|
|
||||||
return String(error ?? "Generation failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseOpenRouterStatus(message: string): number | null {
|
|
||||||
const match = message.match(/OpenRouter API error\s+(\d+)/i);
|
|
||||||
if (!match) return null;
|
|
||||||
const parsed = Number(match[1]);
|
|
||||||
return Number.isFinite(parsed) ? parsed : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
function categorizeError(error: unknown): {
|
|
||||||
category: ErrorCategory;
|
|
||||||
retryable: boolean;
|
|
||||||
} {
|
|
||||||
const code = getErrorCode(error);
|
|
||||||
const source = getErrorSource(error);
|
|
||||||
const message = errorMessage(error);
|
|
||||||
const lower = message.toLowerCase();
|
|
||||||
const status = getProviderStatus(error) ?? parseOpenRouterStatus(message);
|
|
||||||
|
|
||||||
if (source === "freepik") {
|
|
||||||
if (code === "model_unavailable") {
|
|
||||||
return {
|
|
||||||
category: "provider",
|
|
||||||
retryable: status === 503,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if (code === "timeout") {
|
|
||||||
return { category: "timeout", retryable: true };
|
|
||||||
}
|
|
||||||
if (code === "transient") {
|
|
||||||
return { category: "transient", retryable: true };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
code === "CREDITS_TEST_DISABLED" ||
|
|
||||||
code === "CREDITS_INVALID_AMOUNT" ||
|
|
||||||
code === "CREDITS_BALANCE_NOT_FOUND" ||
|
|
||||||
code === "CREDITS_DAILY_CAP_REACHED" ||
|
|
||||||
code === "CREDITS_CONCURRENCY_LIMIT"
|
|
||||||
) {
|
) {
|
||||||
return { category: "credits", retryable: false };
|
if (!reservationId) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
|
||||||
code === "OPENROUTER_MODEL_REFUSAL" ||
|
|
||||||
lower.includes("content policy") ||
|
|
||||||
lower.includes("policy") ||
|
|
||||||
lower.includes("moderation") ||
|
|
||||||
lower.includes("safety") ||
|
|
||||||
lower.includes("refusal") ||
|
|
||||||
lower.includes("policy_violation")
|
|
||||||
) {
|
|
||||||
return { category: "policy", retryable: false };
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status !== null) {
|
|
||||||
if (status >= 500 || status === 408 || status === 429 || status === 499) {
|
|
||||||
return { category: "provider", retryable: true };
|
|
||||||
}
|
|
||||||
if (status >= 400 && status < 500) {
|
|
||||||
return { category: "provider", retryable: false };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
lower.includes("timeout") ||
|
|
||||||
lower.includes("timed out") ||
|
|
||||||
lower.includes("deadline") ||
|
|
||||||
lower.includes("abort") ||
|
|
||||||
lower.includes("etimedout")
|
|
||||||
) {
|
|
||||||
return { category: "timeout", retryable: true };
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
lower.includes("fetch failed") ||
|
|
||||||
lower.includes("network") ||
|
|
||||||
lower.includes("connection") ||
|
|
||||||
lower.includes("econnreset") ||
|
|
||||||
lower.includes("temporarily unavailable") ||
|
|
||||||
lower.includes("service unavailable") ||
|
|
||||||
lower.includes("rate limit") ||
|
|
||||||
lower.includes("overloaded")
|
|
||||||
) {
|
|
||||||
return { category: "transient", retryable: true };
|
|
||||||
}
|
|
||||||
|
|
||||||
return { category: "unknown", retryable: false };
|
|
||||||
}
|
|
||||||
|
|
||||||
function formatTerminalStatusMessage(error: unknown): string {
|
|
||||||
const code = getErrorCode(error);
|
|
||||||
if (code) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
const message = errorMessage(error).trim() || "Generation failed";
|
|
||||||
const { category } = categorizeError(error);
|
|
||||||
|
|
||||||
const prefixByCategory: Record<Exclude<ErrorCategory, "unknown">, string> = {
|
|
||||||
credits: "Credits",
|
|
||||||
policy: "Policy",
|
|
||||||
timeout: "Timeout",
|
|
||||||
transient: "Netzwerk",
|
|
||||||
provider: "Provider",
|
|
||||||
};
|
|
||||||
|
|
||||||
if (category === "unknown") {
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
|
|
||||||
const prefix = prefixByCategory[category];
|
|
||||||
if (message.toLowerCase().startsWith(prefix.toLowerCase())) {
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
|
|
||||||
return `${prefix}: ${message}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
function wait(ms: number) {
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
setTimeout(resolve, ms);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async function generateImageWithAutoRetry(
|
|
||||||
operation: () => Promise<Awaited<ReturnType<typeof generateImageViaOpenRouter>>>,
|
|
||||||
onRetry: (
|
|
||||||
retryCount: number,
|
|
||||||
maxRetries: number,
|
|
||||||
failure: { message: string; category: ErrorCategory }
|
|
||||||
) => Promise<void>
|
|
||||||
) {
|
|
||||||
let lastError: unknown = null;
|
|
||||||
const startedAt = Date.now();
|
|
||||||
|
|
||||||
for (let attempt = 0; attempt <= MAX_IMAGE_RETRIES; attempt++) {
|
|
||||||
const attemptStartedAt = Date.now();
|
|
||||||
try {
|
try {
|
||||||
const result = await operation();
|
await releaseFn(reservationId);
|
||||||
console.info("[generateImageWithAutoRetry] success", {
|
} catch {
|
||||||
attempts: attempt + 1,
|
// Keep node status updates best-effort even if credit release fails.
|
||||||
totalDurationMs: Date.now() - startedAt,
|
|
||||||
lastAttemptDurationMs: Date.now() - attemptStartedAt,
|
|
||||||
});
|
|
||||||
return result;
|
|
||||||
} catch (error) {
|
|
||||||
lastError = error;
|
|
||||||
const { retryable, category } = categorizeError(error);
|
|
||||||
const retryCount = attempt + 1;
|
|
||||||
const hasRemainingRetry = retryCount <= MAX_IMAGE_RETRIES;
|
|
||||||
|
|
||||||
console.warn("[generateImageWithAutoRetry] attempt failed", {
|
|
||||||
attempt: retryCount,
|
|
||||||
maxAttempts: MAX_IMAGE_RETRIES + 1,
|
|
||||||
retryable,
|
|
||||||
hasRemainingRetry,
|
|
||||||
category,
|
|
||||||
attemptDurationMs: Date.now() - attemptStartedAt,
|
|
||||||
totalDurationMs: Date.now() - startedAt,
|
|
||||||
message: errorMessage(error),
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!retryable || !hasRemainingRetry) {
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
|
|
||||||
await onRetry(retryCount, MAX_IMAGE_RETRIES, {
|
|
||||||
message: errorMessage(error),
|
|
||||||
category,
|
|
||||||
});
|
|
||||||
await wait(Math.min(1500, 400 * retryCount));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
throw lastError ?? new Error("Generation failed");
|
async function releasePublicReservationBestEffort(
|
||||||
|
reservationId: Id<"creditTransactions"> | null | undefined,
|
||||||
|
releaseFn: (transactionId: Id<"creditTransactions">) => Promise<unknown>
|
||||||
|
) {
|
||||||
|
if (!reservationId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await releaseFn(reservationId);
|
||||||
|
} catch {
|
||||||
|
// Prefer returning a clear node error over masking with cleanup failures.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function decrementConcurrencyIfNeeded(
|
||||||
|
shouldDecrement: boolean,
|
||||||
|
decrementFn: () => Promise<unknown>
|
||||||
|
) {
|
||||||
|
if (!shouldDecrement) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await decrementFn();
|
||||||
}
|
}
|
||||||
|
|
||||||
export const markNodeExecuting = internalMutation({
|
export const markNodeExecuting = internalMutation({
|
||||||
@@ -317,10 +137,7 @@ export const finalizeImageSuccess = internalMutation({
|
|||||||
throw new Error("Node not found");
|
throw new Error("Node not found");
|
||||||
}
|
}
|
||||||
|
|
||||||
const prev =
|
const prev = getNodeDataRecord(existing.data);
|
||||||
existing.data && typeof existing.data === "object"
|
|
||||||
? (existing.data as Record<string, unknown>)
|
|
||||||
: {};
|
|
||||||
const creditCost = modelConfig.creditCost;
|
const creditCost = modelConfig.creditCost;
|
||||||
const resolvedAspectRatio =
|
const resolvedAspectRatio =
|
||||||
aspectRatio?.trim() ||
|
aspectRatio?.trim() ||
|
||||||
@@ -411,7 +228,8 @@ export const generateAndStoreImage = internalAction({
|
|||||||
maxRetries,
|
maxRetries,
|
||||||
failureMessage: failure.message,
|
failureMessage: failure.message,
|
||||||
});
|
});
|
||||||
}
|
},
|
||||||
|
MAX_IMAGE_RETRIES
|
||||||
);
|
);
|
||||||
|
|
||||||
const decodeStartedAt = Date.now();
|
const decodeStartedAt = Date.now();
|
||||||
@@ -521,15 +339,13 @@ export const processImageGeneration = internalAction({
|
|||||||
message: errorMessage(error),
|
message: errorMessage(error),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (args.reservationId) {
|
await releaseInternalReservationBestEffort(
|
||||||
try {
|
args.reservationId,
|
||||||
await ctx.runMutation(internal.credits.releaseInternal, {
|
(transactionId) =>
|
||||||
transactionId: args.reservationId,
|
ctx.runMutation(internal.credits.releaseInternal, {
|
||||||
});
|
transactionId,
|
||||||
} catch {
|
})
|
||||||
// Keep node status updates best-effort even if credit release fails.
|
);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeImageFailure, {
|
await ctx.runMutation(internal.ai.finalizeImageFailure, {
|
||||||
nodeId: args.nodeId,
|
nodeId: args.nodeId,
|
||||||
@@ -537,11 +353,11 @@ export const processImageGeneration = internalAction({
|
|||||||
statusMessage: formatTerminalStatusMessage(error),
|
statusMessage: formatTerminalStatusMessage(error),
|
||||||
});
|
});
|
||||||
} finally {
|
} finally {
|
||||||
if (args.shouldDecrementConcurrency) {
|
await decrementConcurrencyIfNeeded(args.shouldDecrementConcurrency, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId: args.userId,
|
userId: args.userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
|
|
||||||
console.info("[processImageGeneration] finished", {
|
console.info("[processImageGeneration] finished", {
|
||||||
nodeId: args.nodeId,
|
nodeId: args.nodeId,
|
||||||
@@ -660,15 +476,11 @@ export const generateImage = action({
|
|||||||
message: errorMessage(error),
|
message: errorMessage(error),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (reservationId) {
|
await releasePublicReservationBestEffort(reservationId, (transactionId) =>
|
||||||
try {
|
ctx.runMutation(api.credits.release, {
|
||||||
await ctx.runMutation(api.credits.release, {
|
transactionId,
|
||||||
transactionId: reservationId,
|
})
|
||||||
});
|
);
|
||||||
} catch {
|
|
||||||
// Prefer returning a clear node error over masking with cleanup failures.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeImageFailure, {
|
await ctx.runMutation(internal.ai.finalizeImageFailure, {
|
||||||
nodeId: verifiedNodeId,
|
nodeId: verifiedNodeId,
|
||||||
@@ -678,11 +490,13 @@ export const generateImage = action({
|
|||||||
|
|
||||||
throw error;
|
throw error;
|
||||||
} finally {
|
} finally {
|
||||||
if (usageIncremented && !backgroundJobScheduled) {
|
await decrementConcurrencyIfNeeded(
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
usageIncremented && !backgroundJobScheduled,
|
||||||
|
() =>
|
||||||
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId,
|
userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
@@ -703,10 +517,7 @@ export const setVideoTaskInfo = internalMutation({
|
|||||||
throw new Error("Node not found");
|
throw new Error("Node not found");
|
||||||
}
|
}
|
||||||
|
|
||||||
const prev =
|
const prev = getNodeDataRecord(node.data);
|
||||||
node.data && typeof node.data === "object"
|
|
||||||
? (node.data as Record<string, unknown>)
|
|
||||||
: {};
|
|
||||||
|
|
||||||
await ctx.db.patch(nodeId, {
|
await ctx.db.patch(nodeId, {
|
||||||
data: {
|
data: {
|
||||||
@@ -757,10 +568,7 @@ export const finalizeVideoSuccess = internalMutation({
|
|||||||
throw new Error("Node not found");
|
throw new Error("Node not found");
|
||||||
}
|
}
|
||||||
|
|
||||||
const prev =
|
const prev = getNodeDataRecord(existing.data);
|
||||||
existing.data && typeof existing.data === "object"
|
|
||||||
? (existing.data as Record<string, unknown>)
|
|
||||||
: {};
|
|
||||||
|
|
||||||
await ctx.db.patch(nodeId, {
|
await ctx.db.patch(nodeId, {
|
||||||
status: "done",
|
status: "done",
|
||||||
@@ -792,10 +600,7 @@ export const finalizeVideoFailure = internalMutation({
|
|||||||
if (!existing) {
|
if (!existing) {
|
||||||
throw new Error("Node not found");
|
throw new Error("Node not found");
|
||||||
}
|
}
|
||||||
const prev =
|
const prev = getNodeDataRecord(existing.data);
|
||||||
existing.data && typeof existing.data === "object"
|
|
||||||
? (existing.data as Record<string, unknown>)
|
|
||||||
: {};
|
|
||||||
|
|
||||||
await ctx.db.patch(nodeId, {
|
await ctx.db.patch(nodeId, {
|
||||||
status: "error",
|
status: "error",
|
||||||
@@ -854,7 +659,7 @@ export const processVideoGeneration = internalAction({
|
|||||||
taskId: task_id,
|
taskId: task_id,
|
||||||
});
|
});
|
||||||
|
|
||||||
await ctx.scheduler.runAfter(5000, internal.ai.pollVideoTask, {
|
await ctx.scheduler.runAfter(getVideoPollDelayMs(1), internal.ai.pollVideoTask, {
|
||||||
taskId: task_id,
|
taskId: task_id,
|
||||||
outputNodeId: args.outputNodeId,
|
outputNodeId: args.outputNodeId,
|
||||||
prompt: args.prompt,
|
prompt: args.prompt,
|
||||||
@@ -878,15 +683,13 @@ export const processVideoGeneration = internalAction({
|
|||||||
freepikBody: error instanceof FreepikApiError ? error.body : undefined,
|
freepikBody: error instanceof FreepikApiError ? error.body : undefined,
|
||||||
});
|
});
|
||||||
|
|
||||||
if (args.reservationId) {
|
await releaseInternalReservationBestEffort(
|
||||||
try {
|
args.reservationId,
|
||||||
await ctx.runMutation(internal.credits.releaseInternal, {
|
(transactionId) =>
|
||||||
transactionId: args.reservationId,
|
ctx.runMutation(internal.credits.releaseInternal, {
|
||||||
});
|
transactionId,
|
||||||
} catch {
|
})
|
||||||
// Keep node failure updates best-effort even if release fails.
|
);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
||||||
nodeId: args.outputNodeId,
|
nodeId: args.outputNodeId,
|
||||||
@@ -894,11 +697,11 @@ export const processVideoGeneration = internalAction({
|
|||||||
statusMessage: formatTerminalStatusMessage(error),
|
statusMessage: formatTerminalStatusMessage(error),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (args.shouldDecrementConcurrency) {
|
await decrementConcurrencyIfNeeded(args.shouldDecrementConcurrency, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId: args.userId,
|
userId: args.userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
@@ -919,16 +722,21 @@ export const pollVideoTask = internalAction({
|
|||||||
},
|
},
|
||||||
handler: async (ctx, args) => {
|
handler: async (ctx, args) => {
|
||||||
const elapsedMs = Date.now() - args.startedAtMs;
|
const elapsedMs = Date.now() - args.startedAtMs;
|
||||||
if (args.attempt > MAX_VIDEO_POLL_ATTEMPTS || elapsedMs > MAX_VIDEO_POLL_TOTAL_MS) {
|
if (
|
||||||
if (args.reservationId) {
|
isVideoPollTimedOut({
|
||||||
try {
|
attempt: args.attempt,
|
||||||
await ctx.runMutation(internal.credits.releaseInternal, {
|
maxAttempts: MAX_VIDEO_POLL_ATTEMPTS,
|
||||||
transactionId: args.reservationId,
|
elapsedMs,
|
||||||
});
|
maxTotalMs: MAX_VIDEO_POLL_TOTAL_MS,
|
||||||
} catch {
|
})
|
||||||
// Keep node status updates best-effort.
|
) {
|
||||||
}
|
await releaseInternalReservationBestEffort(
|
||||||
}
|
args.reservationId,
|
||||||
|
(transactionId) =>
|
||||||
|
ctx.runMutation(internal.credits.releaseInternal, {
|
||||||
|
transactionId,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
||||||
nodeId: args.outputNodeId,
|
nodeId: args.outputNodeId,
|
||||||
@@ -936,11 +744,11 @@ export const pollVideoTask = internalAction({
|
|||||||
statusMessage: "Timeout: Video generation exceeded maximum polling time",
|
statusMessage: "Timeout: Video generation exceeded maximum polling time",
|
||||||
});
|
});
|
||||||
|
|
||||||
if (args.shouldDecrementConcurrency) {
|
await decrementConcurrencyIfNeeded(args.shouldDecrementConcurrency, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId: args.userId,
|
userId: args.userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -978,15 +786,13 @@ export const pollVideoTask = internalAction({
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (status.status === "FAILED") {
|
if (status.status === "FAILED") {
|
||||||
if (args.reservationId) {
|
await releaseInternalReservationBestEffort(
|
||||||
try {
|
args.reservationId,
|
||||||
await ctx.runMutation(internal.credits.releaseInternal, {
|
(transactionId) =>
|
||||||
transactionId: args.reservationId,
|
ctx.runMutation(internal.credits.releaseInternal, {
|
||||||
});
|
transactionId,
|
||||||
} catch {
|
})
|
||||||
// Keep node status updates best-effort.
|
);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
||||||
nodeId: args.outputNodeId,
|
nodeId: args.outputNodeId,
|
||||||
@@ -994,11 +800,11 @@ export const pollVideoTask = internalAction({
|
|||||||
statusMessage: status.error?.trim() || "Provider: Video generation failed",
|
statusMessage: status.error?.trim() || "Provider: Video generation failed",
|
||||||
});
|
});
|
||||||
|
|
||||||
if (args.shouldDecrementConcurrency) {
|
await decrementConcurrencyIfNeeded(args.shouldDecrementConcurrency, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId: args.userId,
|
userId: args.userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1028,11 +834,11 @@ export const pollVideoTask = internalAction({
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (args.shouldDecrementConcurrency) {
|
await decrementConcurrencyIfNeeded(args.shouldDecrementConcurrency, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId: args.userId,
|
userId: args.userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -1058,8 +864,7 @@ export const pollVideoTask = internalAction({
|
|||||||
failureMessage: errorMessage(error),
|
failureMessage: errorMessage(error),
|
||||||
});
|
});
|
||||||
|
|
||||||
const retryDelayMs =
|
const retryDelayMs = getVideoPollDelayMs(args.attempt);
|
||||||
args.attempt <= 5 ? 5000 : args.attempt <= 15 ? 10000 : 20000;
|
|
||||||
await ctx.scheduler.runAfter(retryDelayMs, internal.ai.pollVideoTask, {
|
await ctx.scheduler.runAfter(retryDelayMs, internal.ai.pollVideoTask, {
|
||||||
...args,
|
...args,
|
||||||
attempt: args.attempt + 1,
|
attempt: args.attempt + 1,
|
||||||
@@ -1067,15 +872,13 @@ export const pollVideoTask = internalAction({
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (args.reservationId) {
|
await releaseInternalReservationBestEffort(
|
||||||
try {
|
args.reservationId,
|
||||||
await ctx.runMutation(internal.credits.releaseInternal, {
|
(transactionId) =>
|
||||||
transactionId: args.reservationId,
|
ctx.runMutation(internal.credits.releaseInternal, {
|
||||||
});
|
transactionId,
|
||||||
} catch {
|
})
|
||||||
// Keep node status updates best-effort.
|
);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
||||||
nodeId: args.outputNodeId,
|
nodeId: args.outputNodeId,
|
||||||
@@ -1083,15 +886,15 @@ export const pollVideoTask = internalAction({
|
|||||||
statusMessage: formatTerminalStatusMessage(error),
|
statusMessage: formatTerminalStatusMessage(error),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (args.shouldDecrementConcurrency) {
|
await decrementConcurrencyIfNeeded(args.shouldDecrementConcurrency, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId: args.userId,
|
userId: args.userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const delayMs = args.attempt <= 5 ? 5000 : args.attempt <= 15 ? 10000 : 20000;
|
const delayMs = getVideoPollDelayMs(args.attempt);
|
||||||
await ctx.scheduler.runAfter(delayMs, internal.ai.pollVideoTask, {
|
await ctx.scheduler.runAfter(delayMs, internal.ai.pollVideoTask, {
|
||||||
...args,
|
...args,
|
||||||
attempt: args.attempt + 1,
|
attempt: args.attempt + 1,
|
||||||
@@ -1210,15 +1013,11 @@ export const generateVideo = action({
|
|||||||
|
|
||||||
return { queued: true, outputNodeId: args.outputNodeId };
|
return { queued: true, outputNodeId: args.outputNodeId };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (reservationId) {
|
await releasePublicReservationBestEffort(reservationId, (transactionId) =>
|
||||||
try {
|
ctx.runMutation(api.credits.release, {
|
||||||
await ctx.runMutation(api.credits.release, {
|
transactionId,
|
||||||
transactionId: reservationId,
|
})
|
||||||
});
|
);
|
||||||
} catch {
|
|
||||||
// Prefer returning a clear node error over masking with cleanup failures.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
await ctx.runMutation(internal.ai.finalizeVideoFailure, {
|
||||||
nodeId: args.outputNodeId,
|
nodeId: args.outputNodeId,
|
||||||
@@ -1226,11 +1025,11 @@ export const generateVideo = action({
|
|||||||
statusMessage: formatTerminalStatusMessage(error),
|
statusMessage: formatTerminalStatusMessage(error),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (usageIncremented) {
|
await decrementConcurrencyIfNeeded(usageIncremented, () =>
|
||||||
await ctx.runMutation(internal.credits.decrementConcurrency, {
|
ctx.runMutation(internal.credits.decrementConcurrency, {
|
||||||
userId,
|
userId,
|
||||||
});
|
})
|
||||||
}
|
);
|
||||||
|
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|||||||
192
convex/ai_errors.ts
Normal file
192
convex/ai_errors.ts
Normal file
@@ -0,0 +1,192 @@
|
|||||||
|
import { ConvexError } from "convex/values";
|
||||||
|
import { FreepikApiError } from "./freepik";
|
||||||
|
|
||||||
|
export type ErrorCategory =
|
||||||
|
| "credits"
|
||||||
|
| "policy"
|
||||||
|
| "timeout"
|
||||||
|
| "transient"
|
||||||
|
| "provider"
|
||||||
|
| "unknown";
|
||||||
|
|
||||||
|
interface ErrorData {
|
||||||
|
code?: string;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getErrorCode(error: unknown): string | undefined {
|
||||||
|
if (error instanceof ConvexError) {
|
||||||
|
const data = error.data as ErrorData;
|
||||||
|
return data?.code;
|
||||||
|
}
|
||||||
|
if (error instanceof FreepikApiError) {
|
||||||
|
return error.code;
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getErrorSource(error: unknown): string | undefined {
|
||||||
|
if (error instanceof FreepikApiError) {
|
||||||
|
return error.source;
|
||||||
|
}
|
||||||
|
if (error && typeof error === "object") {
|
||||||
|
const source = (error as { source?: unknown }).source;
|
||||||
|
return typeof source === "string" ? source : undefined;
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getProviderStatus(error: unknown): number | null {
|
||||||
|
if (error instanceof FreepikApiError) {
|
||||||
|
return typeof error.status === "number" ? error.status : null;
|
||||||
|
}
|
||||||
|
if (error && typeof error === "object") {
|
||||||
|
const status = (error as { status?: unknown }).status;
|
||||||
|
if (typeof status === "number" && Number.isFinite(status)) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function errorMessage(error: unknown): string {
|
||||||
|
if (error instanceof Error) return error.message;
|
||||||
|
return String(error ?? "Generation failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseOpenRouterStatus(message: string): number | null {
|
||||||
|
const match = message.match(/OpenRouter API error\s+(\d+)/i);
|
||||||
|
if (!match) return null;
|
||||||
|
const parsed = Number(match[1]);
|
||||||
|
return Number.isFinite(parsed) ? parsed : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function categorizeError(error: unknown): {
|
||||||
|
category: ErrorCategory;
|
||||||
|
retryable: boolean;
|
||||||
|
} {
|
||||||
|
const code = getErrorCode(error);
|
||||||
|
const source = getErrorSource(error);
|
||||||
|
const message = errorMessage(error);
|
||||||
|
const lower = message.toLowerCase();
|
||||||
|
const status = getProviderStatus(error) ?? parseOpenRouterStatus(message);
|
||||||
|
|
||||||
|
if (source === "freepik") {
|
||||||
|
if (code === "model_unavailable") {
|
||||||
|
return {
|
||||||
|
category: "provider",
|
||||||
|
retryable: status === 503,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (code === "timeout") {
|
||||||
|
return { category: "timeout", retryable: true };
|
||||||
|
}
|
||||||
|
if (code === "transient") {
|
||||||
|
return { category: "transient", retryable: true };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
code === "CREDITS_TEST_DISABLED" ||
|
||||||
|
code === "CREDITS_INVALID_AMOUNT" ||
|
||||||
|
code === "CREDITS_BALANCE_NOT_FOUND" ||
|
||||||
|
code === "CREDITS_DAILY_CAP_REACHED" ||
|
||||||
|
code === "CREDITS_CONCURRENCY_LIMIT"
|
||||||
|
) {
|
||||||
|
return { category: "credits", retryable: false };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
code === "OPENROUTER_MODEL_REFUSAL" ||
|
||||||
|
lower.includes("content policy") ||
|
||||||
|
lower.includes("policy") ||
|
||||||
|
lower.includes("moderation") ||
|
||||||
|
lower.includes("safety") ||
|
||||||
|
lower.includes("refusal") ||
|
||||||
|
lower.includes("policy_violation")
|
||||||
|
) {
|
||||||
|
return { category: "policy", retryable: false };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status !== null) {
|
||||||
|
if (status >= 500 || status === 408 || status === 429 || status === 499) {
|
||||||
|
return { category: "provider", retryable: true };
|
||||||
|
}
|
||||||
|
if (status >= 400 && status < 500) {
|
||||||
|
return { category: "provider", retryable: false };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
lower.includes("timeout") ||
|
||||||
|
lower.includes("timed out") ||
|
||||||
|
lower.includes("deadline") ||
|
||||||
|
lower.includes("abort") ||
|
||||||
|
lower.includes("etimedout")
|
||||||
|
) {
|
||||||
|
return { category: "timeout", retryable: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
lower.includes("fetch failed") ||
|
||||||
|
lower.includes("network") ||
|
||||||
|
lower.includes("connection") ||
|
||||||
|
lower.includes("econnreset") ||
|
||||||
|
lower.includes("temporarily unavailable") ||
|
||||||
|
lower.includes("service unavailable") ||
|
||||||
|
lower.includes("rate limit") ||
|
||||||
|
lower.includes("overloaded")
|
||||||
|
) {
|
||||||
|
return { category: "transient", retryable: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
return { category: "unknown", retryable: false };
|
||||||
|
}
|
||||||
|
|
||||||
|
export function formatTerminalStatusMessage(error: unknown): string {
|
||||||
|
const code = getErrorCode(error);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = errorMessage(error).trim() || "Generation failed";
|
||||||
|
const { category } = categorizeError(error);
|
||||||
|
|
||||||
|
const prefixByCategory: Record<Exclude<ErrorCategory, "unknown">, string> = {
|
||||||
|
credits: "Credits",
|
||||||
|
policy: "Policy",
|
||||||
|
timeout: "Timeout",
|
||||||
|
transient: "Netzwerk",
|
||||||
|
provider: "Provider",
|
||||||
|
};
|
||||||
|
|
||||||
|
if (category === "unknown") {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
const prefix = prefixByCategory[category];
|
||||||
|
if (message.toLowerCase().startsWith(prefix.toLowerCase())) {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
return `${prefix}: ${message}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getVideoPollDelayMs(attempt: number): number {
|
||||||
|
if (attempt <= 5) {
|
||||||
|
return 5000;
|
||||||
|
}
|
||||||
|
if (attempt <= 15) {
|
||||||
|
return 10000;
|
||||||
|
}
|
||||||
|
return 20000;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isVideoPollTimedOut(args: {
|
||||||
|
attempt: number;
|
||||||
|
maxAttempts: number;
|
||||||
|
elapsedMs: number;
|
||||||
|
maxTotalMs: number;
|
||||||
|
}): boolean {
|
||||||
|
return args.attempt > args.maxAttempts || args.elapsedMs > args.maxTotalMs;
|
||||||
|
}
|
||||||
6
convex/ai_node_data.ts
Normal file
6
convex/ai_node_data.ts
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
export function getNodeDataRecord(data: unknown): Record<string, unknown> {
|
||||||
|
if (data && typeof data === "object") {
|
||||||
|
return data as Record<string, unknown>;
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
61
convex/ai_retry.ts
Normal file
61
convex/ai_retry.ts
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import { categorizeError, errorMessage, type ErrorCategory } from "./ai_errors";
|
||||||
|
|
||||||
|
function wait(ms: number) {
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
setTimeout(resolve, ms);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function generateImageWithAutoRetry<T>(
|
||||||
|
operation: () => Promise<T>,
|
||||||
|
onRetry: (
|
||||||
|
retryCount: number,
|
||||||
|
maxRetries: number,
|
||||||
|
failure: { message: string; category: ErrorCategory }
|
||||||
|
) => Promise<void>,
|
||||||
|
maxRetries: number
|
||||||
|
): Promise<T> {
|
||||||
|
let lastError: unknown = null;
|
||||||
|
const startedAt = Date.now();
|
||||||
|
|
||||||
|
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||||
|
const attemptStartedAt = Date.now();
|
||||||
|
try {
|
||||||
|
const result = await operation();
|
||||||
|
console.info("[generateImageWithAutoRetry] success", {
|
||||||
|
attempts: attempt + 1,
|
||||||
|
totalDurationMs: Date.now() - startedAt,
|
||||||
|
lastAttemptDurationMs: Date.now() - attemptStartedAt,
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
} catch (error) {
|
||||||
|
lastError = error;
|
||||||
|
const { retryable, category } = categorizeError(error);
|
||||||
|
const retryCount = attempt + 1;
|
||||||
|
const hasRemainingRetry = retryCount <= maxRetries;
|
||||||
|
|
||||||
|
console.warn("[generateImageWithAutoRetry] attempt failed", {
|
||||||
|
attempt: retryCount,
|
||||||
|
maxAttempts: maxRetries + 1,
|
||||||
|
retryable,
|
||||||
|
hasRemainingRetry,
|
||||||
|
category,
|
||||||
|
attemptDurationMs: Date.now() - attemptStartedAt,
|
||||||
|
totalDurationMs: Date.now() - startedAt,
|
||||||
|
message: errorMessage(error),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!retryable || !hasRemainingRetry) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
await onRetry(retryCount, maxRetries, {
|
||||||
|
message: errorMessage(error),
|
||||||
|
category,
|
||||||
|
});
|
||||||
|
await wait(Math.min(1500, 400 * retryCount));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw lastError ?? new Error("Generation failed");
|
||||||
|
}
|
||||||
68
tests/convex/ai-errors.test.ts
Normal file
68
tests/convex/ai-errors.test.ts
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import { FreepikApiError } from "@/convex/freepik";
|
||||||
|
import {
|
||||||
|
categorizeError,
|
||||||
|
formatTerminalStatusMessage,
|
||||||
|
getVideoPollDelayMs,
|
||||||
|
isVideoPollTimedOut,
|
||||||
|
} from "@/convex/ai_errors";
|
||||||
|
|
||||||
|
describe("ai error helpers", () => {
|
||||||
|
it("marks provider 503 failures as retryable", () => {
|
||||||
|
const result = categorizeError(new Error("OpenRouter API error 503"));
|
||||||
|
expect(result).toEqual({ category: "provider", retryable: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("maps Freepik timeout to timeout category", () => {
|
||||||
|
const error = new FreepikApiError({
|
||||||
|
code: "timeout",
|
||||||
|
message: "Task polling timeout",
|
||||||
|
retryable: true,
|
||||||
|
status: 504,
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = categorizeError(error);
|
||||||
|
expect(result).toEqual({ category: "timeout", retryable: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("formats terminal status with translated transient prefix", () => {
|
||||||
|
expect(formatTerminalStatusMessage(new Error("network disconnected"))).toBe(
|
||||||
|
"Netzwerk: network disconnected",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses staged poll delays", () => {
|
||||||
|
expect(getVideoPollDelayMs(1)).toBe(5000);
|
||||||
|
expect(getVideoPollDelayMs(9)).toBe(10000);
|
||||||
|
expect(getVideoPollDelayMs(20)).toBe(20000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("detects poll timeout by attempts and elapsed time", () => {
|
||||||
|
expect(
|
||||||
|
isVideoPollTimedOut({
|
||||||
|
attempt: 31,
|
||||||
|
maxAttempts: 30,
|
||||||
|
elapsedMs: 1000,
|
||||||
|
maxTotalMs: 600000,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
|
||||||
|
expect(
|
||||||
|
isVideoPollTimedOut({
|
||||||
|
attempt: 10,
|
||||||
|
maxAttempts: 30,
|
||||||
|
elapsedMs: 700000,
|
||||||
|
maxTotalMs: 600000,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
|
||||||
|
expect(
|
||||||
|
isVideoPollTimedOut({
|
||||||
|
attempt: 10,
|
||||||
|
maxAttempts: 30,
|
||||||
|
elapsedMs: 200000,
|
||||||
|
maxTotalMs: 600000,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user