fix(polar): make webhook credit flows idempotent
This commit is contained in:
120
convex/polar.ts
120
convex/polar.ts
@@ -4,6 +4,11 @@ import { internalMutation, type MutationCtx } from "./_generated/server";
|
||||
|
||||
type DbCtx = Pick<MutationCtx, "db">;
|
||||
|
||||
type IdempotencyScope =
|
||||
| "topup_paid"
|
||||
| "subscription_activated_cycle"
|
||||
| "subscription_revoked";
|
||||
|
||||
type ActivatedArgs = {
|
||||
userId: string;
|
||||
tier: "starter" | "pro" | "max";
|
||||
@@ -26,12 +31,21 @@ type TopUpArgs = {
|
||||
};
|
||||
|
||||
export async function applySubscriptionActivated(ctx: DbCtx, args: ActivatedArgs) {
|
||||
const existing = await ctx.db
|
||||
const existingByPolar = await ctx.db
|
||||
.query("subscriptions")
|
||||
.withIndex("by_user", (q) => q.eq("userId", args.userId))
|
||||
.order("desc")
|
||||
.withIndex("by_polar", (q) => q.eq("polarSubscriptionId", args.polarSubscriptionId))
|
||||
.first();
|
||||
|
||||
const existingByUser = existingByPolar
|
||||
? null
|
||||
: await ctx.db
|
||||
.query("subscriptions")
|
||||
.withIndex("by_user", (q) => q.eq("userId", args.userId))
|
||||
.order("desc")
|
||||
.first();
|
||||
|
||||
const existing = existingByPolar ?? existingByUser;
|
||||
|
||||
if (existing) {
|
||||
await ctx.db.patch(existing._id, {
|
||||
tier: args.tier,
|
||||
@@ -51,6 +65,19 @@ export async function applySubscriptionActivated(ctx: DbCtx, args: ActivatedArgs
|
||||
});
|
||||
}
|
||||
|
||||
const cycleKey = `polar:subscription_cycle:${args.polarSubscriptionId}:${args.currentPeriodStart}:${args.currentPeriodEnd}`;
|
||||
const isFirstCycleEvent = await registerWebhookEvent(ctx, {
|
||||
provider: "polar",
|
||||
scope: "subscription_activated_cycle",
|
||||
idempotencyKey: cycleKey,
|
||||
userId: args.userId,
|
||||
polarSubscriptionId: args.polarSubscriptionId,
|
||||
});
|
||||
|
||||
if (!isFirstCycleEvent) {
|
||||
return;
|
||||
}
|
||||
|
||||
const balance = await ctx.db
|
||||
.query("creditBalances")
|
||||
.withIndex("by_user", (q) => q.eq("userId", args.userId))
|
||||
@@ -82,11 +109,22 @@ export async function applySubscriptionActivated(ctx: DbCtx, args: ActivatedArgs
|
||||
}
|
||||
|
||||
export async function applySubscriptionRevoked(ctx: DbCtx, args: RevokedArgs) {
|
||||
const sub = await ctx.db
|
||||
.query("subscriptions")
|
||||
.withIndex("by_user", (q) => q.eq("userId", args.userId))
|
||||
.order("desc")
|
||||
.first();
|
||||
const subByPolar = args.polarSubscriptionId
|
||||
? await ctx.db
|
||||
.query("subscriptions")
|
||||
.withIndex("by_polar", (q) => q.eq("polarSubscriptionId", args.polarSubscriptionId))
|
||||
.first()
|
||||
: null;
|
||||
|
||||
const subByUser = subByPolar
|
||||
? null
|
||||
: await ctx.db
|
||||
.query("subscriptions")
|
||||
.withIndex("by_user", (q) => q.eq("userId", args.userId))
|
||||
.order("desc")
|
||||
.first();
|
||||
|
||||
const sub = subByPolar ?? subByUser;
|
||||
|
||||
if (sub) {
|
||||
await ctx.db.patch(sub._id, {
|
||||
@@ -107,6 +145,21 @@ export async function applySubscriptionRevoked(ctx: DbCtx, args: RevokedArgs) {
|
||||
});
|
||||
}
|
||||
|
||||
const revokedKey = args.polarSubscriptionId
|
||||
? `polar:subscription_revoked:${args.polarSubscriptionId}`
|
||||
: `polar:subscription_revoked:user:${args.userId}`;
|
||||
const isFirstRevokedEvent = await registerWebhookEvent(ctx, {
|
||||
provider: "polar",
|
||||
scope: "subscription_revoked",
|
||||
idempotencyKey: revokedKey,
|
||||
userId: args.userId,
|
||||
polarSubscriptionId: args.polarSubscriptionId,
|
||||
});
|
||||
|
||||
if (!isFirstRevokedEvent) {
|
||||
return;
|
||||
}
|
||||
|
||||
await ctx.db.insert("creditTransactions", {
|
||||
userId: args.userId,
|
||||
amount: 0,
|
||||
@@ -119,13 +172,15 @@ export async function applySubscriptionRevoked(ctx: DbCtx, args: RevokedArgs) {
|
||||
}
|
||||
|
||||
export async function applyTopUpPaid(ctx: DbCtx, args: TopUpArgs) {
|
||||
const duplicate = await ctx.db
|
||||
.query("creditTransactions")
|
||||
.withIndex("by_user", (q) => q.eq("userId", args.userId))
|
||||
.filter((q) => q.eq(q.field("description"), `Top-up order ${args.polarOrderId}`))
|
||||
.first();
|
||||
const isFirstTopUpEvent = await registerWebhookEvent(ctx, {
|
||||
provider: "polar",
|
||||
scope: "topup_paid",
|
||||
idempotencyKey: `polar:order_paid:${args.polarOrderId}`,
|
||||
userId: args.userId,
|
||||
polarOrderId: args.polarOrderId,
|
||||
});
|
||||
|
||||
if (duplicate) {
|
||||
if (!isFirstTopUpEvent) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -135,7 +190,7 @@ export async function applyTopUpPaid(ctx: DbCtx, args: TopUpArgs) {
|
||||
.unique();
|
||||
|
||||
if (!balance) {
|
||||
return;
|
||||
throw new Error(`Missing credit balance for user ${args.userId}`);
|
||||
}
|
||||
|
||||
await ctx.db.patch(balance._id, {
|
||||
@@ -152,6 +207,41 @@ export async function applyTopUpPaid(ctx: DbCtx, args: TopUpArgs) {
|
||||
});
|
||||
}
|
||||
|
||||
async function registerWebhookEvent(
|
||||
ctx: DbCtx,
|
||||
args: {
|
||||
provider: "polar";
|
||||
scope: IdempotencyScope;
|
||||
idempotencyKey: string;
|
||||
userId: string;
|
||||
polarOrderId?: string;
|
||||
polarSubscriptionId?: string;
|
||||
}
|
||||
) {
|
||||
const existing = await ctx.db
|
||||
.query("webhookIdempotencyEvents")
|
||||
.withIndex("by_provider_key", (q) =>
|
||||
q.eq("provider", args.provider).eq("idempotencyKey", args.idempotencyKey)
|
||||
)
|
||||
.first();
|
||||
|
||||
if (existing) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await ctx.db.insert("webhookIdempotencyEvents", {
|
||||
provider: args.provider,
|
||||
scope: args.scope,
|
||||
idempotencyKey: args.idempotencyKey,
|
||||
userId: args.userId,
|
||||
polarOrderId: args.polarOrderId,
|
||||
polarSubscriptionId: args.polarSubscriptionId,
|
||||
createdAt: Date.now(),
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
export const handleSubscriptionActivated = internalMutation({
|
||||
args: {
|
||||
userId: v.string(),
|
||||
|
||||
Reference in New Issue
Block a user