Add bank synchronization features with FinTS support and update dependencies
This commit is contained in:
884
convex/bank/orchestrator.ts
Normal file
884
convex/bank/orchestrator.ts
Normal file
@@ -0,0 +1,884 @@
|
||||
"use node";
|
||||
|
||||
import { internalAction } from "../_generated/server";
|
||||
import { v } from "convex/values";
|
||||
import { internal } from "../_generated/api";
|
||||
import type { Id } from "../_generated/dataModel";
|
||||
import type { ActionCtx } from "../_generated/server";
|
||||
import {
|
||||
FinTSClient,
|
||||
Mt940Parser,
|
||||
type BankAccount,
|
||||
type ClientResponse,
|
||||
type StatementResponse,
|
||||
} from "lib-fints";
|
||||
import {
|
||||
createFinTsClient,
|
||||
continueWithTan,
|
||||
encodePhotoTan,
|
||||
pickDecoupledTanMethod,
|
||||
resolveFintsEnv,
|
||||
buildSessionSnapshot,
|
||||
sleep,
|
||||
type FintsEnvConfig,
|
||||
type SerializedFintsSession,
|
||||
} from "./fintsSession";
|
||||
import { mapFinTsTransaction } from "./fintsMap";
|
||||
import {
|
||||
fetchComdirectData,
|
||||
hasComdirectCredentials,
|
||||
isRestFallbackError,
|
||||
} from "./comdirectProvider";
|
||||
import type { ImportRow, NormalizedAccount, NormalizedTransaction } from "./types";
|
||||
|
||||
export const TAN_POLL_INTERVAL_MS = 4000;
|
||||
export const TAN_TIMEOUT_MS = 5 * 60 * 1000;
|
||||
export const TAN_MAX_POLL_ATTEMPTS = Math.ceil(TAN_TIMEOUT_MS / TAN_POLL_INTERVAL_MS);
|
||||
|
||||
// comdirect (and other banks) split large MT940 statement results across
|
||||
// multiple responses using continuation marks (return code 3040). lib-fints
|
||||
// sends every continuation request correctly but mis-merges the binary MT940
|
||||
// pages (it string-concats the `@len@` framed payloads, so only the first page
|
||||
// survives the decode). We capture the raw MT940 booked payload of every HIKAZ
|
||||
// response here and reassemble all pages ourselves in fetchStatementsAllPages.
|
||||
let mt940Pages: string[] = [];
|
||||
|
||||
// Extracts the booked MT940 content from every HIKAZ segment in a raw FinTS
|
||||
// response. HIKAZ encodes its booked transactions as a Binary element
|
||||
// `@<len>@<mt940text>` directly after the segment header `HIKAZ:n:7+`.
|
||||
function extractHikazMt940Pages(text: string): string[] {
|
||||
const pages: string[] = [];
|
||||
let idx = 0;
|
||||
for (;;) {
|
||||
const h = text.indexOf("HIKAZ:", idx);
|
||||
if (h === -1) break;
|
||||
const at1 = text.indexOf("@", h);
|
||||
if (at1 === -1) break;
|
||||
const at2 = text.indexOf("@", at1 + 1);
|
||||
if (at2 === -1) break;
|
||||
const len = Number.parseInt(text.slice(at1 + 1, at2), 10);
|
||||
if (!Number.isFinite(len) || len <= 0) {
|
||||
idx = at2 + 1;
|
||||
continue;
|
||||
}
|
||||
const content = text.slice(at2 + 1, at2 + 1 + len);
|
||||
pages.push(content);
|
||||
idx = at2 + 1 + len;
|
||||
}
|
||||
return pages;
|
||||
}
|
||||
|
||||
const origFetch: typeof fetch = globalThis.fetch.bind(globalThis);
|
||||
globalThis.fetch = (async (...args: Parameters<typeof fetch>) => {
|
||||
const res = await origFetch(...args);
|
||||
try {
|
||||
const text = Buffer.from(await res.clone().text(), "base64").toString(
|
||||
"latin1",
|
||||
);
|
||||
if (text.includes("HIKAZ:")) {
|
||||
for (const page of extractHikazMt940Pages(text)) {
|
||||
mt940Pages.push(page);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
/* ignore non-FinTS responses */
|
||||
}
|
||||
return res;
|
||||
}) as typeof fetch;
|
||||
|
||||
type PendingSyncJob = {
|
||||
from: string;
|
||||
to: string;
|
||||
accountId?: Id<"accounts">;
|
||||
provider: "fints" | "comdirect";
|
||||
phase: "fetch" | "persist";
|
||||
partialAccounts?: NormalizedAccount[];
|
||||
partialTransactions?: Record<string, NormalizedTransaction[]>;
|
||||
};
|
||||
|
||||
async function waitForDecoupledTan(
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
client: FinTSClient,
|
||||
session: SerializedFintsSession,
|
||||
initialResponse: ClientResponse,
|
||||
syncJob: PendingSyncJob,
|
||||
): Promise<ClientResponse> {
|
||||
let response = initialResponse;
|
||||
let sessionState = session;
|
||||
const isDecoupled = client.config.selectedTanMethod?.isDecoupled ?? false;
|
||||
|
||||
if (!response.requiresTan || !response.tanReference) {
|
||||
return response;
|
||||
}
|
||||
|
||||
const photo = encodePhotoTan(response);
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId,
|
||||
status: "awaiting",
|
||||
challengeRef: response.tanReference,
|
||||
challengeMessage: response.tanChallenge ?? "Bitte TAN in der Banking-App freigeben",
|
||||
photoTanMimeType: photo.mimeType ?? null,
|
||||
photoTanBase64: photo.base64 ?? null,
|
||||
pollAttempt: 0,
|
||||
syncJobJson: JSON.stringify({ syncJob }),
|
||||
isDecoupled,
|
||||
submittedTan: null,
|
||||
});
|
||||
|
||||
await ctx.scheduler.runAfter(TAN_POLL_INTERVAL_MS, internal.bank.orchestrator.pollTan, {
|
||||
userId,
|
||||
attempt: 1,
|
||||
});
|
||||
|
||||
for (let attempt = 1; attempt <= TAN_MAX_POLL_ATTEMPTS; attempt += 1) {
|
||||
await sleep(TAN_POLL_INTERVAL_MS);
|
||||
|
||||
const pending = await ctx.runQuery(internal.bank.internal.getPendingTan, { userId });
|
||||
const submittedTan = pending?.submittedTan?.trim() || undefined;
|
||||
|
||||
if (!isDecoupled && !submittedTan) {
|
||||
continue;
|
||||
}
|
||||
|
||||
response = await continueWithTan(
|
||||
client,
|
||||
{
|
||||
...sessionState,
|
||||
tanReference: response.tanReference ?? sessionState.tanReference,
|
||||
},
|
||||
submittedTan,
|
||||
);
|
||||
|
||||
if (submittedTan) {
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId,
|
||||
status: "awaiting",
|
||||
submittedTan: null,
|
||||
pollAttempt: attempt,
|
||||
});
|
||||
}
|
||||
|
||||
if (!response.requiresTan) {
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId,
|
||||
status: "done",
|
||||
challengeRef: null,
|
||||
challengeMessage: null,
|
||||
photoTanMimeType: null,
|
||||
photoTanBase64: null,
|
||||
syncJobJson: null,
|
||||
submittedTan: null,
|
||||
pollAttempt: attempt,
|
||||
});
|
||||
return response;
|
||||
}
|
||||
|
||||
sessionState = buildSessionSnapshot(
|
||||
client,
|
||||
response.tanReference ?? sessionState.tanReference,
|
||||
sessionState.tanContinuation,
|
||||
);
|
||||
|
||||
const nextPhoto = encodePhotoTan(response);
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId,
|
||||
status: "awaiting",
|
||||
challengeRef: response.tanReference ?? sessionState.tanReference,
|
||||
challengeMessage: response.tanChallenge ?? "Bitte TAN in der Banking-App freigeben",
|
||||
photoTanMimeType: nextPhoto.mimeType ?? null,
|
||||
photoTanBase64: nextPhoto.base64 ?? null,
|
||||
pollAttempt: attempt,
|
||||
syncJobJson: JSON.stringify({ syncJob }),
|
||||
isDecoupled,
|
||||
});
|
||||
}
|
||||
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId,
|
||||
status: "error",
|
||||
errorMessage: "TAN-Freigabe Timeout (5 Minuten)",
|
||||
});
|
||||
throw new Error("TAN-Freigabe Timeout (5 Minuten)");
|
||||
}
|
||||
|
||||
async function resolveTanResponse(
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
client: FinTSClient,
|
||||
response: ClientResponse,
|
||||
continuation: SerializedFintsSession["tanContinuation"],
|
||||
syncJob: PendingSyncJob,
|
||||
): Promise<ClientResponse> {
|
||||
if (!response.requiresTan || !response.tanReference) {
|
||||
return response;
|
||||
}
|
||||
const session = buildSessionSnapshot(client, response.tanReference, continuation);
|
||||
return await waitForDecoupledTan(ctx, userId, client, session, response, syncJob);
|
||||
}
|
||||
|
||||
async function logProvider(
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
provider: "comdirect" | "fints",
|
||||
reason: string,
|
||||
) {
|
||||
console.info("[bank-sync]", { userId, provider, reason });
|
||||
await ctx.runMutation(internal.bank.internal.updateSyncState, {
|
||||
userId,
|
||||
lastProviderUsed: provider,
|
||||
});
|
||||
}
|
||||
|
||||
async function ensureFinTsReady(
|
||||
client: FinTSClient,
|
||||
env: FintsEnvConfig,
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
syncJob: PendingSyncJob,
|
||||
): Promise<FinTSClient> {
|
||||
let syncResponse = await client.synchronize();
|
||||
syncResponse = await resolveTanResponse(ctx, userId, client, syncResponse, "sync", syncJob);
|
||||
if (!syncResponse.success) {
|
||||
throw new Error(
|
||||
syncResponse.bankAnswers.map((a) => a.text).join("; ") || "FinTS-Synchronisation fehlgeschlagen",
|
||||
);
|
||||
}
|
||||
|
||||
const tanMethodId = pickDecoupledTanMethod(client);
|
||||
if (tanMethodId && !client.config.selectedTanMethod) {
|
||||
const method = client.selectTanMethod(tanMethodId);
|
||||
if (method.tanMediaRequirement === 2 && method.activeTanMedia[0]) {
|
||||
client.selectTanMedia(method.activeTanMedia[0]);
|
||||
}
|
||||
}
|
||||
|
||||
if (!client.config.bankingInformation.upd?.bankAccounts?.length) {
|
||||
syncResponse = await client.synchronize();
|
||||
syncResponse = await resolveTanResponse(ctx, userId, client, syncResponse, "sync", syncJob);
|
||||
if (!syncResponse.success) {
|
||||
throw new Error(
|
||||
syncResponse.bankAnswers.map((a) => a.text).join("; ") ||
|
||||
"FinTS-Synchronisation fehlgeschlagen",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await ctx.runMutation(internal.bank.internal.upsertBankConfig, {
|
||||
userId,
|
||||
fints: {
|
||||
blz: env.blz,
|
||||
url: env.url,
|
||||
login: env.login,
|
||||
productId: env.productId,
|
||||
productVersion: env.productVersion,
|
||||
tanMethodId: client.config.selectedTanMethod?.id,
|
||||
tanMediaName: methodMediaName(client),
|
||||
bankingInformationJson: JSON.stringify(client.config.bankingInformation),
|
||||
},
|
||||
});
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
function methodMediaName(client: FinTSClient): string | undefined {
|
||||
const method = client.config.selectedTanMethod;
|
||||
if (!method?.activeTanMedia?.length) return undefined;
|
||||
return method.activeTanMedia[0];
|
||||
}
|
||||
|
||||
function mapBankAccount(account: BankAccount): NormalizedAccount {
|
||||
return {
|
||||
externalId: account.accountNumber,
|
||||
name: account.product ?? account.holder1 ?? "Bankkonto",
|
||||
iban: account.iban,
|
||||
balance: 0,
|
||||
currency: account.currency || "EUR",
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches account statements and reassembles multi-part (3040) MT940 results.
|
||||
*
|
||||
* comdirect ignores the `to` date and returns the full range `[from, now]`,
|
||||
* paginated across multiple responses via continuation marks (return code 3040
|
||||
* "Weitere Informationen liegen vor"). lib-fints sends every continuation
|
||||
* request correctly, but mis-merges the binary MT940 pages (it string-concats
|
||||
* the `@len@` framed payloads, so only the first page survives the decode).
|
||||
*
|
||||
* We capture the raw MT940 booked content of every HIKAZ response via the fetch
|
||||
* interceptor (`mt940Pages`) and parse the concatenated pages ourselves with
|
||||
* lib-fints' own Mt940Parser, which yields the complete, non-overlapping set.
|
||||
*/
|
||||
async function fetchStatementsAllPages(
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
client: FinTSClient,
|
||||
account: NormalizedAccount,
|
||||
fromDate: Date,
|
||||
toDate: Date,
|
||||
preferCamt: boolean,
|
||||
syncJob: PendingSyncJob,
|
||||
): Promise<StatementResponse | null> {
|
||||
// Reset the per-fetch MT940 page collector; the fetch interceptor fills it
|
||||
// while getAccountStatements walks through all 3040 continuation responses.
|
||||
mt940Pages = [];
|
||||
let resp = await client.getAccountStatements(
|
||||
account.externalId,
|
||||
fromDate,
|
||||
toDate,
|
||||
preferCamt,
|
||||
);
|
||||
resp = (await resolveTanResponse(
|
||||
ctx,
|
||||
userId,
|
||||
client,
|
||||
resp,
|
||||
"statements",
|
||||
syncJob,
|
||||
)) as StatementResponse;
|
||||
|
||||
if (!resp.success) {
|
||||
return resp;
|
||||
}
|
||||
|
||||
// Reassemble all captured MT940 pages ourselves to bypass lib-fints' broken
|
||||
// multi-part merge. Only override when we actually captured MT940 content
|
||||
// (i.e. MT940/HKKAZ path; CAMT accounts produce no HIKAZ pages) and our
|
||||
// reassembly is at least as complete as lib-fints' own result.
|
||||
if (mt940Pages.length > 0) {
|
||||
const libFintsCount = countStatementTransactions(resp);
|
||||
try {
|
||||
const statements = new Mt940Parser(mt940Pages.join("\r\n")).parse();
|
||||
const reassembled = statements.reduce(
|
||||
(sum, s) => sum + (s.transactions?.length ?? 0),
|
||||
0,
|
||||
);
|
||||
if (reassembled >= libFintsCount) {
|
||||
resp = { ...resp, statements };
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn("[fints] Eigenes MT940-Reassembly fehlgeschlagen", {
|
||||
account: account.externalId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
async function fetchFinTsAccountData(
|
||||
client: FinTSClient,
|
||||
account: NormalizedAccount,
|
||||
from: string,
|
||||
to: string,
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
syncJob: PendingSyncJob,
|
||||
ownNames: string[],
|
||||
salaryShift: {
|
||||
enabled: boolean;
|
||||
categoryNames: string[];
|
||||
dayThreshold: number;
|
||||
},
|
||||
): Promise<{ balance: number; transactions: NormalizedTransaction[] }> {
|
||||
const fromDate = new Date(from);
|
||||
const toDate = new Date(to);
|
||||
const canFetchBalance = client.canGetAccountBalance(account.externalId);
|
||||
const canFetchStatements = client.canGetAccountStatements(account.externalId);
|
||||
const canFetchCamtStatements = client.config.isAccountTransactionSupported(
|
||||
account.externalId,
|
||||
"HKCAZ",
|
||||
);
|
||||
const canFetchMt940Statements = client.config.isAccountTransactionSupported(
|
||||
account.externalId,
|
||||
"HKKAZ",
|
||||
);
|
||||
|
||||
console.info("[fints] Konto-Capabilities", {
|
||||
account: account.externalId,
|
||||
iban: account.iban,
|
||||
name: account.name,
|
||||
canFetchBalance,
|
||||
canFetchStatements,
|
||||
canFetchCamtStatements,
|
||||
canFetchMt940Statements,
|
||||
});
|
||||
|
||||
let balance = account.balance;
|
||||
if (canFetchBalance) {
|
||||
let balanceResponse = await client.getAccountBalance(account.externalId);
|
||||
balanceResponse = await resolveTanResponse(
|
||||
ctx,
|
||||
userId,
|
||||
client,
|
||||
balanceResponse,
|
||||
"balance",
|
||||
syncJob,
|
||||
);
|
||||
balance = balanceResponse.balance?.balance ?? account.balance;
|
||||
} else {
|
||||
console.warn("[fints] Konto unterstützt keinen HKSAL-Kontostandabruf", {
|
||||
account: account.externalId,
|
||||
});
|
||||
}
|
||||
|
||||
if (!canFetchStatements) {
|
||||
console.warn("[fints] Konto unterstützt keinen Umsatzabruf", {
|
||||
account: account.externalId,
|
||||
});
|
||||
return { balance, transactions: [] };
|
||||
}
|
||||
|
||||
// Prefer MT940 (HKKAZ) when available: comdirect's CAMT (HKCAZ) responds with
|
||||
// "Kontonummer ist ungültig" and MT940 is the reliable path. Large windows are
|
||||
// split adaptively to avoid lib-fints' broken multi-part (3040) reassembly.
|
||||
const preferCamtForFetch = !canFetchMt940Statements;
|
||||
let statementResponse = await fetchStatementsAllPages(
|
||||
ctx,
|
||||
userId,
|
||||
client,
|
||||
account,
|
||||
fromDate,
|
||||
toDate,
|
||||
preferCamtForFetch,
|
||||
syncJob,
|
||||
);
|
||||
|
||||
if (
|
||||
canFetchCamtStatements &&
|
||||
canFetchMt940Statements &&
|
||||
(!statementResponse ||
|
||||
(statementResponse.success &&
|
||||
countStatementTransactions(statementResponse) === 0))
|
||||
) {
|
||||
console.info("[fints] Bevorzugtes Format ohne Umsätze, versuche Alternative", {
|
||||
account: account.externalId,
|
||||
preferCamtForFetch,
|
||||
});
|
||||
const altResponse = await fetchStatementsAllPages(
|
||||
ctx,
|
||||
userId,
|
||||
client,
|
||||
account,
|
||||
fromDate,
|
||||
toDate,
|
||||
!preferCamtForFetch,
|
||||
syncJob,
|
||||
);
|
||||
if (
|
||||
altResponse &&
|
||||
altResponse.success &&
|
||||
countStatementTransactions(altResponse) > 0
|
||||
) {
|
||||
statementResponse = altResponse;
|
||||
}
|
||||
}
|
||||
|
||||
if (!statementResponse || !statementResponse.success) {
|
||||
console.warn("[fints] Umsatzabruf nicht erfolgreich", {
|
||||
account: account.externalId,
|
||||
bankAnswers: statementResponse?.bankAnswers.map((answer) => answer.text) ?? [],
|
||||
});
|
||||
return { balance, transactions: [] };
|
||||
}
|
||||
|
||||
let transactions = mapStatements(statementResponse, ownNames, salaryShift);
|
||||
transactions = transactions.filter(
|
||||
(tx) =>
|
||||
(!tx.bookingDate || tx.bookingDate >= from) &&
|
||||
(!tx.bookingDate || tx.bookingDate <= to),
|
||||
);
|
||||
const statementCount = statementResponse.statements?.length ?? 0;
|
||||
const rawTransactionCount = countStatementTransactions(statementResponse);
|
||||
console.info("[fints] Umsatzabruf Ergebnis", {
|
||||
account: account.externalId,
|
||||
statementCount,
|
||||
rawTransactionCount,
|
||||
mappedTransactionCount: transactions.length,
|
||||
});
|
||||
|
||||
return { balance, transactions };
|
||||
}
|
||||
|
||||
function countStatementTransactions(response: StatementResponse): number {
|
||||
return (
|
||||
response.statements?.reduce(
|
||||
(sum, statement) => sum + (statement.transactions?.length ?? 0),
|
||||
0,
|
||||
) ?? 0
|
||||
);
|
||||
}
|
||||
|
||||
function mapStatements(
|
||||
response: StatementResponse,
|
||||
ownNames: string[],
|
||||
salaryShift: {
|
||||
enabled: boolean;
|
||||
categoryNames: string[];
|
||||
dayThreshold: number;
|
||||
},
|
||||
): NormalizedTransaction[] {
|
||||
const rows: NormalizedTransaction[] = [];
|
||||
for (const statement of response.statements ?? []) {
|
||||
for (const tx of statement.transactions ?? []) {
|
||||
const mapped = mapFinTsTransaction(
|
||||
{
|
||||
valueDate: tx.valueDate,
|
||||
entryDate: tx.entryDate,
|
||||
amount: tx.amount,
|
||||
transactionType: tx.transactionType,
|
||||
bankReference: tx.bankReference,
|
||||
bookingText: tx.bookingText,
|
||||
purpose: tx.purpose,
|
||||
remoteName: tx.remoteName,
|
||||
customerReference: tx.customerReference,
|
||||
},
|
||||
ownNames,
|
||||
salaryShift,
|
||||
);
|
||||
rows.push({
|
||||
bookingDate: mapped.bookingDate,
|
||||
valueDate: mapped.valueDate,
|
||||
description: mapped.description,
|
||||
counterparty: mapped.counterparty,
|
||||
amount: mapped.amount,
|
||||
vorgang: mapped.vorgang,
|
||||
isPending: mapped.isPending,
|
||||
rawText: mapped.rawText,
|
||||
externalRef: mapped.externalRef,
|
||||
categoryName: mapped.categoryName,
|
||||
assignedMonth: mapped.assignedMonth,
|
||||
effectiveMonth: mapped.effectiveMonth,
|
||||
});
|
||||
}
|
||||
}
|
||||
return rows;
|
||||
}
|
||||
|
||||
async function fetchFinTsData(
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
from: string,
|
||||
to: string,
|
||||
filterAccountId: Id<"accounts"> | undefined,
|
||||
pin: string | undefined,
|
||||
ownNames: string[],
|
||||
salaryShift: {
|
||||
enabled: boolean;
|
||||
categoryNames: string[];
|
||||
dayThreshold: number;
|
||||
},
|
||||
): Promise<{
|
||||
accounts: NormalizedAccount[];
|
||||
transactionsByAccount: Map<string, NormalizedTransaction[]>;
|
||||
}> {
|
||||
const bankConfig = await ctx.runQuery(internal.bank.internal.getBankConfig, { userId });
|
||||
const env = resolveFintsEnv({
|
||||
blz: bankConfig?.fints.blz,
|
||||
url: bankConfig?.fints.url,
|
||||
login: bankConfig?.fints.login,
|
||||
productId: bankConfig?.fints.productId,
|
||||
productVersion: bankConfig?.fints.productVersion,
|
||||
tanMethodId: bankConfig?.fints.tanMethodId,
|
||||
tanMediaName: bankConfig?.fints.tanMediaName,
|
||||
bankingInformationJson: bankConfig?.fints.bankingInformationJson,
|
||||
pin,
|
||||
});
|
||||
|
||||
const syncJob: PendingSyncJob = { from, to, accountId: filterAccountId, provider: "fints", phase: "fetch" };
|
||||
let client = createFinTsClient(env);
|
||||
client = await ensureFinTsReady(client, env, ctx, userId, syncJob);
|
||||
|
||||
const bankAccounts = client.config.bankingInformation.upd?.bankAccounts ?? [];
|
||||
const accounts = bankAccounts.map(mapBankAccount);
|
||||
const transactionsByAccount = new Map<string, NormalizedTransaction[]>();
|
||||
|
||||
for (const account of accounts) {
|
||||
if (filterAccountId) {
|
||||
const convexId = await ctx.runMutation(internal.bank.internal.upsertAccountFromProvider, {
|
||||
userId,
|
||||
externalId: account.externalId,
|
||||
name: account.name,
|
||||
iban: account.iban,
|
||||
balance: account.balance,
|
||||
currency: account.currency,
|
||||
});
|
||||
if (convexId !== filterAccountId) continue;
|
||||
}
|
||||
|
||||
const { balance, transactions } = await fetchFinTsAccountData(
|
||||
client,
|
||||
account,
|
||||
from,
|
||||
to,
|
||||
ctx,
|
||||
userId,
|
||||
syncJob,
|
||||
ownNames,
|
||||
salaryShift,
|
||||
);
|
||||
account.balance = balance;
|
||||
transactionsByAccount.set(account.externalId, transactions);
|
||||
}
|
||||
|
||||
await ctx.runMutation(internal.bank.internal.upsertBankConfig, {
|
||||
userId,
|
||||
fints: {
|
||||
blz: env.blz,
|
||||
url: env.url,
|
||||
login: env.login,
|
||||
productId: env.productId,
|
||||
bankingInformationJson: JSON.stringify(client.config.bankingInformation),
|
||||
tanMethodId: client.config.selectedTanMethod?.id,
|
||||
tanMediaName: methodMediaName(client),
|
||||
},
|
||||
});
|
||||
|
||||
return { accounts, transactionsByAccount };
|
||||
}
|
||||
|
||||
async function persistSyncResults(
|
||||
ctx: ActionCtx,
|
||||
userId: Id<"users">,
|
||||
provider: "comdirect" | "fints",
|
||||
from: string,
|
||||
to: string,
|
||||
filterAccountId: Id<"accounts"> | undefined,
|
||||
accounts: NormalizedAccount[],
|
||||
transactionsByAccount: Map<string, NormalizedTransaction[]>,
|
||||
): Promise<{ importedCount: number; skippedCount: number }> {
|
||||
const accountIdMap = new Map<string, Id<"accounts">>();
|
||||
for (const account of accounts) {
|
||||
const convexAccountId = await ctx.runMutation(internal.bank.internal.upsertAccountFromProvider, {
|
||||
userId,
|
||||
externalId: account.externalId,
|
||||
name: account.name,
|
||||
iban: account.iban,
|
||||
balance: account.balance,
|
||||
currency: account.currency,
|
||||
});
|
||||
accountIdMap.set(account.externalId, convexAccountId);
|
||||
}
|
||||
|
||||
const rows: ImportRow[] = [];
|
||||
const rowCountsByAccount: Record<string, number> = {};
|
||||
const targetAccounts = filterAccountId
|
||||
? [...accountIdMap.entries()].filter(([, id]) => id === filterAccountId)
|
||||
: [...accountIdMap.entries()];
|
||||
|
||||
for (const [externalAccountId, convexAccountId] of targetAccounts) {
|
||||
const txs = transactionsByAccount.get(externalAccountId) ?? [];
|
||||
rowCountsByAccount[externalAccountId] = txs.length;
|
||||
for (const tx of txs) {
|
||||
rows.push({
|
||||
accountId: convexAccountId,
|
||||
categoryName: tx.categoryName ?? "Sonstiges",
|
||||
bookingDate: tx.bookingDate,
|
||||
valueDate: tx.valueDate,
|
||||
description: tx.description,
|
||||
counterparty: tx.counterparty,
|
||||
amount: tx.amount,
|
||||
vorgang: tx.vorgang,
|
||||
isPending: tx.isPending,
|
||||
rawText: tx.rawText,
|
||||
assignedMonth: tx.assignedMonth,
|
||||
effectiveMonth: tx.effectiveMonth,
|
||||
externalRef:
|
||||
provider === "fints" && tx.externalRef
|
||||
? `${externalAccountId}:${tx.externalRef}`
|
||||
: tx.externalRef,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
console.info("[bank-sync] Persistiere Umsätze", {
|
||||
provider,
|
||||
accountCount: accounts.length,
|
||||
targetAccountCount: targetAccounts.length,
|
||||
rowCount: rows.length,
|
||||
rowCountsByAccount,
|
||||
});
|
||||
|
||||
const commitResult = await ctx.runMutation(internal.imports.commitRowsInternal, {
|
||||
userId,
|
||||
filename: `${provider}-sync-${from}-${to}`,
|
||||
source: provider === "comdirect" ? "comdirect-api" : "fints",
|
||||
accountId: filterAccountId,
|
||||
rows,
|
||||
});
|
||||
console.info("[bank-sync] Import-Ergebnis", {
|
||||
provider,
|
||||
importedCount: commitResult.importedCount,
|
||||
skippedCount: commitResult.skippedCount,
|
||||
});
|
||||
|
||||
if (provider === "comdirect") {
|
||||
await ctx.runMutation(internal.comdirect.internal.clearSession, { userId });
|
||||
}
|
||||
|
||||
await ctx.runMutation(internal.bank.internal.updateSyncState, {
|
||||
userId,
|
||||
lastSync: Date.now(),
|
||||
lastProviderUsed: provider,
|
||||
lastError: null,
|
||||
});
|
||||
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId,
|
||||
status: "done",
|
||||
challengeRef: null,
|
||||
challengeMessage: null,
|
||||
photoTanMimeType: null,
|
||||
photoTanBase64: null,
|
||||
syncJobJson: null,
|
||||
});
|
||||
|
||||
return {
|
||||
importedCount: commitResult.importedCount,
|
||||
skippedCount: commitResult.skippedCount,
|
||||
};
|
||||
}
|
||||
|
||||
export const runSyncInternal = internalAction({
|
||||
args: {
|
||||
userId: v.id("users"),
|
||||
from: v.string(),
|
||||
to: v.string(),
|
||||
accountId: v.optional(v.id("accounts")),
|
||||
pin: v.optional(v.string()),
|
||||
},
|
||||
returns: v.object({
|
||||
importedCount: v.number(),
|
||||
skippedCount: v.number(),
|
||||
provider: v.union(v.literal("comdirect"), v.literal("fints")),
|
||||
awaitingTan: v.boolean(),
|
||||
}),
|
||||
handler: async (ctx, args) => {
|
||||
const comdirectReady = hasComdirectCredentials();
|
||||
await ctx.runMutation(internal.bank.internal.upsertBankConfig, {
|
||||
userId: args.userId,
|
||||
comdirectHasCredentials: comdirectReady,
|
||||
});
|
||||
|
||||
const bankConfig = await ctx.runQuery(internal.bank.internal.getBankConfig, { userId: args.userId });
|
||||
const preference = bankConfig?.providerPreference ?? "auto";
|
||||
const settings = await ctx.runQuery(internal.settings.getInternal, { userId: args.userId });
|
||||
const ownNames = settings?.ownNames ?? [];
|
||||
const salaryShift = settings?.salaryShift ?? {
|
||||
enabled: true,
|
||||
categoryNames: ["Gehalt & Besoldung"],
|
||||
dayThreshold: 25,
|
||||
};
|
||||
|
||||
const tryFinTs = async (reason: string) => {
|
||||
await logProvider(ctx, args.userId, "fints", reason);
|
||||
try {
|
||||
const { accounts, transactionsByAccount } = await fetchFinTsData(
|
||||
ctx,
|
||||
args.userId,
|
||||
args.from,
|
||||
args.to,
|
||||
args.accountId,
|
||||
args.pin,
|
||||
ownNames,
|
||||
salaryShift,
|
||||
);
|
||||
const result = await persistSyncResults(
|
||||
ctx,
|
||||
args.userId,
|
||||
"fints",
|
||||
args.from,
|
||||
args.to,
|
||||
args.accountId,
|
||||
accounts,
|
||||
transactionsByAccount,
|
||||
);
|
||||
return { ...result, provider: "fints" as const, awaitingTan: false };
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.message.includes("TAN-Freigabe Timeout")) {
|
||||
await ctx.runMutation(internal.bank.internal.updateSyncState, {
|
||||
userId: args.userId,
|
||||
lastError: error.message,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
const useFinTsDirect =
|
||||
preference === "fints" || (preference === "auto" && !comdirectReady);
|
||||
|
||||
if (useFinTsDirect) {
|
||||
return await tryFinTs(
|
||||
!comdirectReady
|
||||
? "comdirect-Credentials fehlen"
|
||||
: "Provider-Präferenz FinTS",
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
await logProvider(ctx, args.userId, "comdirect", "REST-Versuch");
|
||||
const { accounts, transactionsByAccount } = await fetchComdirectData(
|
||||
ctx,
|
||||
args.userId,
|
||||
args.from,
|
||||
args.to,
|
||||
args.accountId,
|
||||
ownNames,
|
||||
salaryShift,
|
||||
);
|
||||
const result = await persistSyncResults(
|
||||
ctx,
|
||||
args.userId,
|
||||
"comdirect",
|
||||
args.from,
|
||||
args.to,
|
||||
args.accountId,
|
||||
accounts,
|
||||
transactionsByAccount,
|
||||
);
|
||||
return { ...result, provider: "comdirect" as const, awaitingTan: false };
|
||||
} catch (error) {
|
||||
if (!isRestFallbackError(error)) throw error;
|
||||
const reason = error instanceof Error ? error.message : "REST-Fehler";
|
||||
console.warn("[bank-sync] REST fehlgeschlagen, Fallback FinTS:", reason);
|
||||
return await tryFinTs(`REST-Fallback: ${reason}`);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
export const pollTan = internalAction({
|
||||
args: {
|
||||
userId: v.id("users"),
|
||||
attempt: v.optional(v.number()),
|
||||
},
|
||||
returns: v.null(),
|
||||
handler: async (ctx, args) => {
|
||||
const pending = await ctx.runQuery(internal.bank.internal.getPendingTan, {
|
||||
userId: args.userId,
|
||||
});
|
||||
if (!pending || pending.status !== "awaiting") {
|
||||
return null;
|
||||
}
|
||||
|
||||
const attempt = args.attempt ?? (pending.pollAttempt ?? 0) + 1;
|
||||
if (attempt > TAN_MAX_POLL_ATTEMPTS) {
|
||||
await ctx.runMutation(internal.bank.internal.upsertPendingTan, {
|
||||
userId: args.userId,
|
||||
status: "error",
|
||||
errorMessage: "TAN-Freigabe Timeout (5 Minuten)",
|
||||
pollAttempt: attempt,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
await ctx.scheduler.runAfter(TAN_POLL_INTERVAL_MS, internal.bank.orchestrator.pollTan, {
|
||||
userId: args.userId,
|
||||
attempt: attempt + 1,
|
||||
});
|
||||
return null;
|
||||
},
|
||||
});
|
||||
Reference in New Issue
Block a user