diff --git a/convex/polar.ts b/convex/polar.ts index 946f363..7415848 100644 --- a/convex/polar.ts +++ b/convex/polar.ts @@ -4,6 +4,11 @@ import { internalMutation, type MutationCtx } from "./_generated/server"; type DbCtx = Pick; +type IdempotencyScope = + | "topup_paid" + | "subscription_activated_cycle" + | "subscription_revoked"; + type ActivatedArgs = { userId: string; tier: "starter" | "pro" | "max"; @@ -26,12 +31,21 @@ type TopUpArgs = { }; export async function applySubscriptionActivated(ctx: DbCtx, args: ActivatedArgs) { - const existing = await ctx.db + const existingByPolar = await ctx.db .query("subscriptions") - .withIndex("by_user", (q) => q.eq("userId", args.userId)) - .order("desc") + .withIndex("by_polar", (q) => q.eq("polarSubscriptionId", args.polarSubscriptionId)) .first(); + const existingByUser = existingByPolar + ? null + : await ctx.db + .query("subscriptions") + .withIndex("by_user", (q) => q.eq("userId", args.userId)) + .order("desc") + .first(); + + const existing = existingByPolar ?? existingByUser; + if (existing) { await ctx.db.patch(existing._id, { tier: args.tier, @@ -51,6 +65,19 @@ export async function applySubscriptionActivated(ctx: DbCtx, args: ActivatedArgs }); } + const cycleKey = `polar:subscription_cycle:${args.polarSubscriptionId}:${args.currentPeriodStart}:${args.currentPeriodEnd}`; + const isFirstCycleEvent = await registerWebhookEvent(ctx, { + provider: "polar", + scope: "subscription_activated_cycle", + idempotencyKey: cycleKey, + userId: args.userId, + polarSubscriptionId: args.polarSubscriptionId, + }); + + if (!isFirstCycleEvent) { + return; + } + const balance = await ctx.db .query("creditBalances") .withIndex("by_user", (q) => q.eq("userId", args.userId)) @@ -82,11 +109,22 @@ export async function applySubscriptionActivated(ctx: DbCtx, args: ActivatedArgs } export async function applySubscriptionRevoked(ctx: DbCtx, args: RevokedArgs) { - const sub = await ctx.db - .query("subscriptions") - .withIndex("by_user", (q) => q.eq("userId", args.userId)) - .order("desc") - .first(); + const subByPolar = args.polarSubscriptionId + ? await ctx.db + .query("subscriptions") + .withIndex("by_polar", (q) => q.eq("polarSubscriptionId", args.polarSubscriptionId)) + .first() + : null; + + const subByUser = subByPolar + ? null + : await ctx.db + .query("subscriptions") + .withIndex("by_user", (q) => q.eq("userId", args.userId)) + .order("desc") + .first(); + + const sub = subByPolar ?? subByUser; if (sub) { await ctx.db.patch(sub._id, { @@ -107,6 +145,21 @@ export async function applySubscriptionRevoked(ctx: DbCtx, args: RevokedArgs) { }); } + const revokedKey = args.polarSubscriptionId + ? `polar:subscription_revoked:${args.polarSubscriptionId}` + : `polar:subscription_revoked:user:${args.userId}`; + const isFirstRevokedEvent = await registerWebhookEvent(ctx, { + provider: "polar", + scope: "subscription_revoked", + idempotencyKey: revokedKey, + userId: args.userId, + polarSubscriptionId: args.polarSubscriptionId, + }); + + if (!isFirstRevokedEvent) { + return; + } + await ctx.db.insert("creditTransactions", { userId: args.userId, amount: 0, @@ -119,13 +172,15 @@ export async function applySubscriptionRevoked(ctx: DbCtx, args: RevokedArgs) { } export async function applyTopUpPaid(ctx: DbCtx, args: TopUpArgs) { - const duplicate = await ctx.db - .query("creditTransactions") - .withIndex("by_user", (q) => q.eq("userId", args.userId)) - .filter((q) => q.eq(q.field("description"), `Top-up order ${args.polarOrderId}`)) - .first(); + const isFirstTopUpEvent = await registerWebhookEvent(ctx, { + provider: "polar", + scope: "topup_paid", + idempotencyKey: `polar:order_paid:${args.polarOrderId}`, + userId: args.userId, + polarOrderId: args.polarOrderId, + }); - if (duplicate) { + if (!isFirstTopUpEvent) { return; } @@ -135,7 +190,7 @@ export async function applyTopUpPaid(ctx: DbCtx, args: TopUpArgs) { .unique(); if (!balance) { - return; + throw new Error(`Missing credit balance for user ${args.userId}`); } await ctx.db.patch(balance._id, { @@ -152,6 +207,41 @@ export async function applyTopUpPaid(ctx: DbCtx, args: TopUpArgs) { }); } +async function registerWebhookEvent( + ctx: DbCtx, + args: { + provider: "polar"; + scope: IdempotencyScope; + idempotencyKey: string; + userId: string; + polarOrderId?: string; + polarSubscriptionId?: string; + } +) { + const existing = await ctx.db + .query("webhookIdempotencyEvents") + .withIndex("by_provider_key", (q) => + q.eq("provider", args.provider).eq("idempotencyKey", args.idempotencyKey) + ) + .first(); + + if (existing) { + return false; + } + + await ctx.db.insert("webhookIdempotencyEvents", { + provider: args.provider, + scope: args.scope, + idempotencyKey: args.idempotencyKey, + userId: args.userId, + polarOrderId: args.polarOrderId, + polarSubscriptionId: args.polarSubscriptionId, + createdAt: Date.now(), + }); + + return true; +} + export const handleSubscriptionActivated = internalMutation({ args: { userId: v.string(), diff --git a/convex/schema.ts b/convex/schema.ts index 5c2ceb7..8360350 100644 --- a/convex/schema.ts +++ b/convex/schema.ts @@ -258,6 +258,22 @@ export default defineSchema({ .index("by_polar", ["polarSubscriptionId"]) .index("by_lemon_squeezy", ["lemonSqueezySubscriptionId"]), + webhookIdempotencyEvents: defineTable({ + provider: v.union(v.literal("polar")), + scope: v.union( + v.literal("topup_paid"), + v.literal("subscription_activated_cycle"), + v.literal("subscription_revoked") + ), + idempotencyKey: v.string(), + userId: v.string(), + polarOrderId: v.optional(v.string()), + polarSubscriptionId: v.optional(v.string()), + createdAt: v.number(), + }) + .index("by_provider_key", ["provider", "idempotencyKey"]) + .index("by_user", ["userId"]), + // ========================================================================== // Abuse Prevention // ==========================================================================