Implement local-first canvas sync and fix drag edge stability
This commit is contained in:
@@ -151,6 +151,7 @@ export function enqueueCanvasOp(
|
||||
enqueuedAt: op.enqueuedAt ?? Date.now(),
|
||||
};
|
||||
const payload = readOpsPayload(canvasId);
|
||||
payload.ops = payload.ops.filter((candidate) => candidate.id !== entry.id);
|
||||
payload.ops.push(entry);
|
||||
payload.updatedAt = Date.now();
|
||||
writePayload(opsKey(canvasId), payload);
|
||||
@@ -166,6 +167,17 @@ export function resolveCanvasOp(canvasId: string, opId: string): void {
|
||||
writePayload(opsKey(canvasId), payload);
|
||||
}
|
||||
|
||||
export function resolveCanvasOps(canvasId: string, opIds: string[]): void {
|
||||
if (opIds.length === 0) return;
|
||||
const idSet = new Set(opIds);
|
||||
const payload = readOpsPayload(canvasId);
|
||||
const nextOps = payload.ops.filter((op) => !idSet.has(op.id));
|
||||
if (nextOps.length === payload.ops.length) return;
|
||||
payload.ops = nextOps;
|
||||
payload.updatedAt = Date.now();
|
||||
writePayload(opsKey(canvasId), payload);
|
||||
}
|
||||
|
||||
export function readCanvasOps(canvasId: string): CanvasPendingOp[] {
|
||||
return readOpsPayload(canvasId).ops;
|
||||
}
|
||||
|
||||
420
lib/canvas-op-queue.ts
Normal file
420
lib/canvas-op-queue.ts
Normal file
@@ -0,0 +1,420 @@
|
||||
import type { Id } from "@/convex/_generated/dataModel";
|
||||
|
||||
const DB_NAME = "lemonspace.canvas.sync";
|
||||
const DB_VERSION = 1;
|
||||
const STORE_NAME = "ops";
|
||||
const FALLBACK_STORAGE_KEY = "lemonspace.canvas:sync-fallback:v1";
|
||||
export const CANVAS_SYNC_RETENTION_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
export type CanvasSyncOpPayloadByType = {
|
||||
moveNode: { nodeId: Id<"nodes">; positionX: number; positionY: number };
|
||||
resizeNode: { nodeId: Id<"nodes">; width: number; height: number };
|
||||
updateData: { nodeId: Id<"nodes">; data: unknown };
|
||||
};
|
||||
|
||||
export type CanvasSyncOpType = keyof CanvasSyncOpPayloadByType;
|
||||
|
||||
type CanvasSyncOpBase = {
|
||||
id: string;
|
||||
canvasId: string;
|
||||
enqueuedAt: number;
|
||||
attemptCount: number;
|
||||
nextRetryAt: number;
|
||||
expiresAt: number;
|
||||
lastError?: string;
|
||||
};
|
||||
|
||||
export type CanvasSyncOp = {
|
||||
[TType in CanvasSyncOpType]: CanvasSyncOpBase & {
|
||||
type: TType;
|
||||
payload: CanvasSyncOpPayloadByType[TType];
|
||||
};
|
||||
}[CanvasSyncOpType];
|
||||
|
||||
type CanvasSyncOpFor<TType extends CanvasSyncOpType> = Extract<
|
||||
CanvasSyncOp,
|
||||
{ type: TType }
|
||||
>;
|
||||
|
||||
type JsonRecord = Record<string, unknown>;
|
||||
|
||||
type EnqueueInput<TType extends CanvasSyncOpType> = {
|
||||
id: string;
|
||||
canvasId: string;
|
||||
type: TType;
|
||||
payload: CanvasSyncOpPayloadByType[TType];
|
||||
now?: number;
|
||||
};
|
||||
|
||||
let dbPromise: Promise<IDBDatabase | null> | null = null;
|
||||
|
||||
function isRecord(value: unknown): value is JsonRecord {
|
||||
return typeof value === "object" && value !== null;
|
||||
}
|
||||
|
||||
function getLocalStorage(): Storage | null {
|
||||
if (typeof window === "undefined") return null;
|
||||
try {
|
||||
return window.localStorage;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function safeParse(raw: string | null): unknown {
|
||||
if (!raw) return null;
|
||||
try {
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function readFallbackOps(): CanvasSyncOp[] {
|
||||
const storage = getLocalStorage();
|
||||
if (!storage) return [];
|
||||
const parsed = safeParse(storage.getItem(FALLBACK_STORAGE_KEY));
|
||||
if (!Array.isArray(parsed)) return [];
|
||||
return parsed
|
||||
.filter((entry): entry is JsonRecord => isRecord(entry))
|
||||
.map(normalizeOp)
|
||||
.filter((entry): entry is CanvasSyncOp => entry !== null);
|
||||
}
|
||||
|
||||
function writeFallbackOps(ops: CanvasSyncOp[]): void {
|
||||
const storage = getLocalStorage();
|
||||
if (!storage) return;
|
||||
try {
|
||||
storage.setItem(FALLBACK_STORAGE_KEY, JSON.stringify(ops));
|
||||
} catch {
|
||||
// Ignore storage quota failures in fallback layer.
|
||||
}
|
||||
}
|
||||
|
||||
function openDb(): Promise<IDBDatabase | null> {
|
||||
if (typeof window === "undefined" || typeof indexedDB === "undefined") {
|
||||
return Promise.resolve(null);
|
||||
}
|
||||
if (dbPromise) return dbPromise;
|
||||
|
||||
dbPromise = new Promise((resolve) => {
|
||||
const request = indexedDB.open(DB_NAME, DB_VERSION);
|
||||
|
||||
request.onupgradeneeded = () => {
|
||||
const db = request.result;
|
||||
if (db.objectStoreNames.contains(STORE_NAME)) return;
|
||||
const store = db.createObjectStore(STORE_NAME, { keyPath: "id" });
|
||||
store.createIndex("by_canvas", "canvasId", { unique: false });
|
||||
store.createIndex("by_nextRetryAt", "nextRetryAt", { unique: false });
|
||||
};
|
||||
|
||||
request.onsuccess = () => {
|
||||
const db = request.result;
|
||||
db.onversionchange = () => {
|
||||
db.close();
|
||||
dbPromise = null;
|
||||
};
|
||||
resolve(db);
|
||||
};
|
||||
|
||||
request.onerror = () => {
|
||||
resolve(null);
|
||||
};
|
||||
});
|
||||
|
||||
return dbPromise;
|
||||
}
|
||||
|
||||
function txDone(tx: IDBTransaction): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
tx.oncomplete = () => resolve();
|
||||
tx.onerror = () => reject(tx.error ?? new Error("IndexedDB transaction failed"));
|
||||
tx.onabort = () => reject(tx.error ?? new Error("IndexedDB transaction aborted"));
|
||||
});
|
||||
}
|
||||
|
||||
function reqToPromise<T>(req: IDBRequest<T>): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
req.onsuccess = () => resolve(req.result);
|
||||
req.onerror = () => reject(req.error ?? new Error("IndexedDB request failed"));
|
||||
});
|
||||
}
|
||||
|
||||
function getNodeIdFromOp(op: CanvasSyncOp): string {
|
||||
const payload = op.payload as { nodeId?: string };
|
||||
return typeof payload.nodeId === "string" ? payload.nodeId : "";
|
||||
}
|
||||
|
||||
function normalizeOp(raw: unknown): CanvasSyncOp | null {
|
||||
if (!isRecord(raw)) return null;
|
||||
const id = raw.id;
|
||||
const canvasId = raw.canvasId;
|
||||
const type = raw.type;
|
||||
const payload = raw.payload;
|
||||
if (
|
||||
typeof id !== "string" ||
|
||||
!id ||
|
||||
typeof canvasId !== "string" ||
|
||||
!canvasId ||
|
||||
(type !== "moveNode" && type !== "resizeNode" && type !== "updateData")
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const enqueuedAt = typeof raw.enqueuedAt === "number" ? raw.enqueuedAt : Date.now();
|
||||
const attemptCount = typeof raw.attemptCount === "number" ? raw.attemptCount : 0;
|
||||
const nextRetryAt =
|
||||
typeof raw.nextRetryAt === "number" ? raw.nextRetryAt : enqueuedAt;
|
||||
const expiresAt =
|
||||
typeof raw.expiresAt === "number"
|
||||
? raw.expiresAt
|
||||
: enqueuedAt + CANVAS_SYNC_RETENTION_MS;
|
||||
const lastError = typeof raw.lastError === "string" ? raw.lastError : undefined;
|
||||
|
||||
if (!isRecord(payload)) return null;
|
||||
|
||||
if (
|
||||
type === "moveNode" &&
|
||||
typeof payload.nodeId === "string" &&
|
||||
typeof payload.positionX === "number" &&
|
||||
typeof payload.positionY === "number"
|
||||
) {
|
||||
return {
|
||||
id,
|
||||
canvasId,
|
||||
type,
|
||||
payload: {
|
||||
nodeId: payload.nodeId as Id<"nodes">,
|
||||
positionX: payload.positionX,
|
||||
positionY: payload.positionY,
|
||||
},
|
||||
enqueuedAt,
|
||||
attemptCount,
|
||||
nextRetryAt,
|
||||
expiresAt,
|
||||
lastError,
|
||||
};
|
||||
}
|
||||
|
||||
if (
|
||||
type === "resizeNode" &&
|
||||
typeof payload.nodeId === "string" &&
|
||||
typeof payload.width === "number" &&
|
||||
typeof payload.height === "number"
|
||||
) {
|
||||
return {
|
||||
id,
|
||||
canvasId,
|
||||
type,
|
||||
payload: {
|
||||
nodeId: payload.nodeId as Id<"nodes">,
|
||||
width: payload.width,
|
||||
height: payload.height,
|
||||
},
|
||||
enqueuedAt,
|
||||
attemptCount,
|
||||
nextRetryAt,
|
||||
expiresAt,
|
||||
lastError,
|
||||
};
|
||||
}
|
||||
|
||||
if (type === "updateData" && typeof payload.nodeId === "string") {
|
||||
return {
|
||||
id,
|
||||
canvasId,
|
||||
type,
|
||||
payload: {
|
||||
nodeId: payload.nodeId as Id<"nodes">,
|
||||
data: payload.data,
|
||||
},
|
||||
enqueuedAt,
|
||||
attemptCount,
|
||||
nextRetryAt,
|
||||
expiresAt,
|
||||
lastError,
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function sortByEnqueued(a: CanvasSyncOp, b: CanvasSyncOp): number {
|
||||
if (a.enqueuedAt === b.enqueuedAt) return a.id.localeCompare(b.id);
|
||||
return a.enqueuedAt - b.enqueuedAt;
|
||||
}
|
||||
|
||||
function toStoredOp<TType extends CanvasSyncOpType>(
|
||||
input: EnqueueInput<TType>,
|
||||
): CanvasSyncOpFor<TType> {
|
||||
const now = input.now ?? Date.now();
|
||||
return {
|
||||
id: input.id,
|
||||
canvasId: input.canvasId,
|
||||
type: input.type,
|
||||
payload: input.payload,
|
||||
enqueuedAt: now,
|
||||
attemptCount: 0,
|
||||
nextRetryAt: now,
|
||||
expiresAt: now + CANVAS_SYNC_RETENTION_MS,
|
||||
} as CanvasSyncOpFor<TType>;
|
||||
}
|
||||
|
||||
function coalescingNodeId(
|
||||
op: Pick<CanvasSyncOp, "type" | "payload">,
|
||||
): string | null {
|
||||
if (op.type !== "moveNode" && op.type !== "resizeNode" && op.type !== "updateData") {
|
||||
return null;
|
||||
}
|
||||
const payload = op.payload as { nodeId?: string };
|
||||
return typeof payload.nodeId === "string" && payload.nodeId.length > 0
|
||||
? payload.nodeId
|
||||
: null;
|
||||
}
|
||||
|
||||
export async function listCanvasSyncOps(canvasId: string): Promise<CanvasSyncOp[]> {
|
||||
const db = await openDb();
|
||||
if (!db) {
|
||||
return readFallbackOps()
|
||||
.filter((entry) => entry.canvasId === canvasId)
|
||||
.sort(sortByEnqueued);
|
||||
}
|
||||
|
||||
const tx = db.transaction(STORE_NAME, "readonly");
|
||||
const store = tx.objectStore(STORE_NAME);
|
||||
const byCanvas = store.index("by_canvas");
|
||||
const records = await reqToPromise(byCanvas.getAll(canvasId));
|
||||
return (records as unknown[])
|
||||
.map(normalizeOp)
|
||||
.filter((entry): entry is CanvasSyncOp => entry !== null)
|
||||
.sort(sortByEnqueued);
|
||||
}
|
||||
|
||||
export async function countCanvasSyncOps(canvasId: string): Promise<number> {
|
||||
const db = await openDb();
|
||||
if (!db) {
|
||||
return readFallbackOps().filter((entry) => entry.canvasId === canvasId).length;
|
||||
}
|
||||
const tx = db.transaction(STORE_NAME, "readonly");
|
||||
const store = tx.objectStore(STORE_NAME);
|
||||
const byCanvas = store.index("by_canvas");
|
||||
const count = await reqToPromise(byCanvas.count(canvasId));
|
||||
return count;
|
||||
}
|
||||
|
||||
export async function enqueueCanvasSyncOp<TType extends CanvasSyncOpType>(
|
||||
input: EnqueueInput<TType>,
|
||||
): Promise<{ replacedIds: string[] }> {
|
||||
const op = toStoredOp(input);
|
||||
const existing = await listCanvasSyncOps(input.canvasId);
|
||||
const nodeId = coalescingNodeId(op);
|
||||
const replacedIds: string[] = [];
|
||||
|
||||
for (const candidate of existing) {
|
||||
if (candidate.type !== op.type) continue;
|
||||
if (nodeId === null) continue;
|
||||
if (getNodeIdFromOp(candidate) !== nodeId) continue;
|
||||
replacedIds.push(candidate.id);
|
||||
}
|
||||
|
||||
const db = await openDb();
|
||||
if (!db) {
|
||||
const fallback = readFallbackOps().filter(
|
||||
(entry) => !replacedIds.includes(entry.id),
|
||||
);
|
||||
fallback.push(op);
|
||||
writeFallbackOps(fallback);
|
||||
return { replacedIds };
|
||||
}
|
||||
|
||||
const tx = db.transaction(STORE_NAME, "readwrite");
|
||||
const store = tx.objectStore(STORE_NAME);
|
||||
for (const id of replacedIds) {
|
||||
store.delete(id);
|
||||
}
|
||||
store.put(op);
|
||||
await txDone(tx);
|
||||
return { replacedIds };
|
||||
}
|
||||
|
||||
export async function ackCanvasSyncOp(opId: string): Promise<void> {
|
||||
const db = await openDb();
|
||||
if (!db) {
|
||||
const fallback = readFallbackOps().filter((entry) => entry.id !== opId);
|
||||
writeFallbackOps(fallback);
|
||||
return;
|
||||
}
|
||||
|
||||
const tx = db.transaction(STORE_NAME, "readwrite");
|
||||
tx.objectStore(STORE_NAME).delete(opId);
|
||||
await txDone(tx);
|
||||
}
|
||||
|
||||
export async function markCanvasSyncOpFailed(
|
||||
opId: string,
|
||||
opts: { nextRetryAt: number; lastError?: string },
|
||||
): Promise<void> {
|
||||
const db = await openDb();
|
||||
if (!db) {
|
||||
const fallback = readFallbackOps().map((entry) => {
|
||||
if (entry.id !== opId) return entry;
|
||||
return {
|
||||
...entry,
|
||||
attemptCount: entry.attemptCount + 1,
|
||||
nextRetryAt: opts.nextRetryAt,
|
||||
lastError: opts.lastError,
|
||||
};
|
||||
});
|
||||
writeFallbackOps(fallback);
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const tx = db.transaction(STORE_NAME, "readwrite");
|
||||
const store = tx.objectStore(STORE_NAME);
|
||||
const getReq = store.get(opId);
|
||||
|
||||
getReq.onerror = () => reject(getReq.error ?? new Error("IndexedDB get failed"));
|
||||
getReq.onsuccess = () => {
|
||||
const current = normalizeOp(getReq.result);
|
||||
if (!current) return;
|
||||
const next: CanvasSyncOp = {
|
||||
...current,
|
||||
attemptCount: current.attemptCount + 1,
|
||||
nextRetryAt: opts.nextRetryAt,
|
||||
lastError: opts.lastError,
|
||||
};
|
||||
store.put(next);
|
||||
};
|
||||
|
||||
tx.oncomplete = () => resolve();
|
||||
tx.onerror = () => reject(tx.error ?? new Error("IndexedDB transaction failed"));
|
||||
tx.onabort = () => reject(tx.error ?? new Error("IndexedDB transaction aborted"));
|
||||
});
|
||||
}
|
||||
|
||||
export async function dropExpiredCanvasSyncOps(
|
||||
canvasId: string,
|
||||
now: number,
|
||||
): Promise<string[]> {
|
||||
const all = await listCanvasSyncOps(canvasId);
|
||||
const expiredIds = all
|
||||
.filter((entry) => entry.expiresAt <= now)
|
||||
.map((entry) => entry.id);
|
||||
if (expiredIds.length === 0) return [];
|
||||
|
||||
const db = await openDb();
|
||||
if (!db) {
|
||||
const fallback = readFallbackOps().filter((entry) => !expiredIds.includes(entry.id));
|
||||
writeFallbackOps(fallback);
|
||||
return expiredIds;
|
||||
}
|
||||
|
||||
const tx = db.transaction(STORE_NAME, "readwrite");
|
||||
const store = tx.objectStore(STORE_NAME);
|
||||
for (const id of expiredIds) {
|
||||
store.delete(id);
|
||||
}
|
||||
await txDone(tx);
|
||||
return expiredIds;
|
||||
}
|
||||
Reference in New Issue
Block a user