Enable offline delete and reconnect queue sync

This commit is contained in:
Matthias
2026-04-01 10:37:20 +02:00
parent da576c1400
commit b6187210c7
7 changed files with 441 additions and 77 deletions

View File

@@ -57,6 +57,12 @@ export type CanvasSyncOpPayloadByType = {
targetHandle?: string;
clientRequestId: string;
};
removeEdge: {
edgeId: Id<"edges">;
};
batchRemoveNodes: {
nodeIds: Id<"nodes">[];
};
moveNode: { nodeId: Id<"nodes">; positionX: number; positionY: number };
resizeNode: { nodeId: Id<"nodes">; width: number; height: number };
updateData: { nodeId: Id<"nodes">; data: unknown };
@@ -210,6 +216,8 @@ function normalizeOp(raw: unknown): CanvasSyncOp | null {
type !== "createNodeWithEdgeFromSource" &&
type !== "createNodeWithEdgeToTarget" &&
type !== "createEdge" &&
type !== "removeEdge" &&
type !== "batchRemoveNodes" &&
type !== "moveNode" &&
type !== "resizeNode" &&
type !== "updateData"
@@ -393,6 +401,45 @@ function normalizeOp(raw: unknown): CanvasSyncOp | null {
};
}
if (
type === "removeEdge" &&
typeof payload.edgeId === "string"
) {
return {
id,
canvasId,
type,
payload: {
edgeId: payload.edgeId as Id<"edges">,
},
enqueuedAt,
attemptCount,
nextRetryAt,
expiresAt,
lastError,
};
}
if (
type === "batchRemoveNodes" &&
Array.isArray(payload.nodeIds) &&
payload.nodeIds.every((entry) => typeof entry === "string")
) {
return {
id,
canvasId,
type,
payload: {
nodeIds: payload.nodeIds as Id<"nodes">[],
},
enqueuedAt,
attemptCount,
nextRetryAt,
expiresAt,
lastError,
};
}
if (
type === "moveNode" &&
typeof payload.nodeId === "string" &&
@@ -713,6 +760,20 @@ function remapNodeIdInPayload(
return { ...op, payload: next };
}
}
if (op.type === "batchRemoveNodes") {
if (!op.payload.nodeIds.includes(fromNodeId as Id<"nodes">)) {
return op;
}
return {
...op,
payload: {
...op.payload,
nodeIds: op.payload.nodeIds.map((nodeId) =>
nodeId === fromNodeId ? (toNodeId as Id<"nodes">) : nodeId,
),
},
};
}
return op;
}
@@ -747,3 +808,114 @@ export async function remapCanvasSyncNodeId(
await txDone(tx);
return changed;
}
function opTouchesNodeId(op: CanvasSyncOp, nodeIdSet: ReadonlySet<string>): boolean {
if (op.type === "moveNode" || op.type === "resizeNode" || op.type === "updateData") {
return nodeIdSet.has(op.payload.nodeId);
}
if (op.type === "createEdge") {
return (
nodeIdSet.has(op.payload.sourceNodeId) || nodeIdSet.has(op.payload.targetNodeId)
);
}
if (op.type === "createNode") {
return op.payload.parentId !== undefined && nodeIdSet.has(op.payload.parentId);
}
if (op.type === "createNodeWithEdgeFromSource") {
return (
nodeIdSet.has(op.payload.sourceNodeId) ||
(op.payload.parentId !== undefined && nodeIdSet.has(op.payload.parentId))
);
}
if (op.type === "createNodeWithEdgeToTarget") {
return (
nodeIdSet.has(op.payload.targetNodeId) ||
(op.payload.parentId !== undefined && nodeIdSet.has(op.payload.parentId))
);
}
if (op.type === "batchRemoveNodes") {
return op.payload.nodeIds.some((nodeId) => nodeIdSet.has(nodeId));
}
return false;
}
function opHasClientRequestId(op: CanvasSyncOp, clientRequestIdSet: ReadonlySet<string>): boolean {
if (op.type === "createNode") {
return clientRequestIdSet.has(op.payload.clientRequestId);
}
if (op.type === "createNodeWithEdgeFromSource") {
return clientRequestIdSet.has(op.payload.clientRequestId);
}
if (op.type === "createNodeWithEdgeToTarget") {
return clientRequestIdSet.has(op.payload.clientRequestId);
}
if (op.type === "createEdge") {
return clientRequestIdSet.has(op.payload.clientRequestId);
}
return false;
}
function opTouchesEdgeId(op: CanvasSyncOp, edgeIdSet: ReadonlySet<string>): boolean {
if (op.type === "removeEdge") {
return edgeIdSet.has(op.payload.edgeId);
}
return false;
}
async function dropCanvasSyncOpsByPredicate(
canvasId: string,
predicate: (op: CanvasSyncOp) => boolean,
): Promise<string[]> {
const all = await listCanvasSyncOps(canvasId);
const idsToDrop = all.filter(predicate).map((entry) => entry.id);
if (idsToDrop.length === 0) return [];
const idSet = new Set(idsToDrop);
const db = await openDb();
if (!db) {
const fallback = readFallbackOps().filter((entry) => !idSet.has(entry.id));
writeFallbackOps(fallback);
return idsToDrop;
}
const tx = db.transaction(STORE_NAME, "readwrite");
const store = tx.objectStore(STORE_NAME);
for (const id of idsToDrop) {
store.delete(id);
}
await txDone(tx);
return idsToDrop;
}
export async function dropCanvasSyncOpsByNodeIds(
canvasId: string,
nodeIds: string[],
): Promise<string[]> {
if (nodeIds.length === 0) return [];
const nodeIdSet = new Set(nodeIds);
return await dropCanvasSyncOpsByPredicate(canvasId, (op) =>
opTouchesNodeId(op, nodeIdSet),
);
}
export async function dropCanvasSyncOpsByClientRequestIds(
canvasId: string,
clientRequestIds: string[],
): Promise<string[]> {
if (clientRequestIds.length === 0) return [];
const idSet = new Set(clientRequestIds);
return await dropCanvasSyncOpsByPredicate(canvasId, (op) =>
opHasClientRequestId(op, idSet),
);
}
export async function dropCanvasSyncOpsByEdgeIds(
canvasId: string,
edgeIds: string[],
): Promise<string[]> {
if (edgeIds.length === 0) return [];
const edgeIdSet = new Set(edgeIds);
return await dropCanvasSyncOpsByPredicate(canvasId, (op) =>
opTouchesEdgeId(op, edgeIdSet),
);
}