From 105b0c514ff2b701fcc5e5ac230ccde0da62d88b Mon Sep 17 00:00:00 2001 From: Max Koon <22125083+k2on@users.noreply.github.com> Date: Mon, 15 Dec 2025 12:53:35 -0500 Subject: [PATCH] feat: add plaid sync --- apps/api/src/plaid/sync.ts | 3 + apps/api/src/plaid/tx.ts | 23 +++ apps/api/src/zero.ts | 235 +++++++++++++++++------- package.json | 4 +- packages/shared/package.json | 2 +- packages/shared/src/db/schema/public.ts | 1 + packages/shared/src/mutators.ts | 6 +- packages/shared/src/zero-schema.gen.ts | 10 + packages/ui/src/transactions.tsx | 4 +- 9 files changed, 212 insertions(+), 76 deletions(-) create mode 100644 apps/api/src/plaid/sync.ts create mode 100644 apps/api/src/plaid/tx.ts diff --git a/apps/api/src/plaid/sync.ts b/apps/api/src/plaid/sync.ts new file mode 100644 index 0000000..e9441a8 --- /dev/null +++ b/apps/api/src/plaid/sync.ts @@ -0,0 +1,3 @@ +async function sync() {} + +sync(); diff --git a/apps/api/src/plaid/tx.ts b/apps/api/src/plaid/tx.ts new file mode 100644 index 0000000..8b7be22 --- /dev/null +++ b/apps/api/src/plaid/tx.ts @@ -0,0 +1,23 @@ +import type { transaction } from "@money/shared/db"; +import type { Transaction } from "plaid"; +import { type InferInsertModel } from "drizzle-orm"; +import { randomUUID } from "crypto"; + +export function transactionFromPlaid( + userId: string, + tx: Transaction, +): InferInsertModel { + return { + id: randomUUID(), + user_id: userId, + plaid_id: tx.transaction_id, + account_id: tx.account_id, + name: tx.name, + amount: tx.amount as any, + datetime: tx.datetime ? new Date(tx.datetime) : new Date(tx.date), + authorized_datetime: tx.authorized_datetime + ? new Date(tx.authorized_datetime) + : undefined, + json: JSON.stringify(tx), + }; +} diff --git a/apps/api/src/zero.ts b/apps/api/src/zero.ts index a59f50e..ce96716 100644 --- a/apps/api/src/zero.ts +++ b/apps/api/src/zero.ts @@ -26,6 +26,8 @@ import { PlaidApi, PlaidEnvironments, Products, + SandboxItemFireWebhookRequestWebhookCodeEnum, + WebhookType, } from "plaid"; import { randomUUID } from "crypto"; import { db } from "./db"; @@ -35,8 +37,16 @@ import { plaidLink, transaction, } from "@money/shared/db"; -import { and, eq, inArray, sql, type InferInsertModel } from "drizzle-orm"; +import { + and, + eq, + inArray, + sql, + type InferInsertModel, + type InferSelectModel, +} from "drizzle-orm"; import { plaidClient } from "./plaid"; +import { transactionFromPlaid } from "./plaid/tx"; const processor = new PushProcessor( new ZQLDatabase( @@ -128,9 +138,9 @@ const createMutators = (authData: AuthData | null) => { throw Error("Plaid error"); } }, - - async updateTransactions() { + async webhook() { isLoggedIn(authData); + const accounts = await db.query.plaidAccessTokens.findMany({ where: eq(plaidAccessTokens.userId, authData.user.id), }); @@ -139,51 +149,20 @@ const createMutators = (authData: AuthData | null) => { return; } - for (const account of accounts) { - const { data } = await plaidClient.transactionsGet({ - access_token: account.token, - start_date: "2025-10-01", - end_date: new Date().toISOString().split("T")[0], - }); + const account = accounts.at(0)!; - const transactions = data.transactions.map( - (tx) => - ({ - id: randomUUID(), - user_id: authData.user.id, - plaid_id: tx.transaction_id, - account_id: tx.account_id, - name: tx.name, - amount: tx.amount as any, - datetime: tx.datetime - ? new Date(tx.datetime) - : new Date(tx.date), - authorized_datetime: tx.authorized_datetime - ? new Date(tx.authorized_datetime) - : undefined, - json: JSON.stringify(tx), - }) satisfies InferInsertModel, - ); + const { data } = await plaidClient.sandboxItemFireWebhook({ + access_token: account.token, + webhook_type: WebhookType.Transactions, + webhook_code: + SandboxItemFireWebhookRequestWebhookCodeEnum.DefaultUpdate, + }); - await db - .insert(transaction) - .values(transactions) - .onConflictDoNothing({ - target: transaction.plaid_id, - }); - - const txReplacingPendingIds = data.transactions - .filter((t) => t.pending_transaction_id) - .map((t) => t.pending_transaction_id!); - - await db - .delete(transaction) - .where(inArray(transaction.plaid_id, txReplacingPendingIds)); - } + console.log(data); }, - - async updateBalences() { + async sync() { isLoggedIn(authData); + const accounts = await db.query.plaidAccessTokens.findMany({ where: eq(plaidAccessTokens.userId, authData.user.id), }); @@ -192,32 +171,150 @@ const createMutators = (authData: AuthData | null) => { return; } - for (const account of accounts) { - const { data } = await plaidClient.accountsBalanceGet({ - access_token: account.token, - }); - await db - .insert(balance) - .values( - data.accounts.map((bal) => ({ - id: randomUUID(), - user_id: authData.user.id, - plaid_id: bal.account_id, - avaliable: bal.balances.available as any, - current: bal.balances.current as any, - name: bal.name, - tokenId: account.id, - })), - ) - .onConflictDoUpdate({ - target: balance.plaid_id, - set: { - current: sql.raw(`excluded.${balance.current.name}`), - avaliable: sql.raw(`excluded.${balance.avaliable.name}`), - }, - }); - } + const account = accounts.at(0)!; + + const { data } = await plaidClient.transactionsSync({ + access_token: account.token, + cursor: account.syncCursor || undefined, + }); + + const added = data.added.map((tx) => + transactionFromPlaid(authData.user.id, tx), + ); + + const updated = data.modified.map((tx) => + transactionFromPlaid(authData.user.id, tx), + ); + + console.log("added", added.length); + console.log("updated", updated.length); + console.log("removed", data.removed.length); + console.log("next cursor", data.next_cursor); + + await db.transaction(async (tx) => { + if (added.length) { + await tx.insert(transaction).values(added); + } + + if (updated.length) { + await tx + .insert(transaction) + .values(updated) + .onConflictDoUpdate({ + target: transaction.plaid_id, + set: { + name: sql.raw(`excluded.${transaction.name.name}`), + amount: sql.raw(`excluded.${transaction.amount.name}`), + json: sql.raw(`excluded.${transaction.json.name}`), + }, + }); + } + + if (data.removed.length) { + await tx.delete(transaction).where( + inArray( + transaction.id, + data.removed.map((tx) => tx.transaction_id), + ), + ); + } + + await tx + .update(plaidAccessTokens) + .set({ syncCursor: data.next_cursor }) + .where(eq(plaidAccessTokens.id, account.id)); + }); }, + + // async updateTransactions() { + // isLoggedIn(authData); + // const accounts = await db.query.plaidAccessTokens.findMany({ + // where: eq(plaidAccessTokens.userId, authData.user.id), + // }); + // if (accounts.length == 0) { + // console.error("No accounts"); + // return; + // } + // + // for (const account of accounts) { + // const { data } = await plaidClient.transactionsGet({ + // access_token: account.token, + // start_date: "2025-10-01", + // end_date: new Date().toISOString().split("T")[0], + // }); + // + // const transactions = data.transactions.map( + // (tx) => + // ({ + // id: randomUUID(), + // user_id: authData.user.id, + // plaid_id: tx.transaction_id, + // account_id: tx.account_id, + // name: tx.name, + // amount: tx.amount as any, + // datetime: tx.datetime + // ? new Date(tx.datetime) + // : new Date(tx.date), + // authorized_datetime: tx.authorized_datetime + // ? new Date(tx.authorized_datetime) + // : undefined, + // json: JSON.stringify(tx), + // }) satisfies InferInsertModel, + // ); + // + // await db + // .insert(transaction) + // .values(transactions) + // .onConflictDoNothing({ + // target: transaction.plaid_id, + // }); + // + // const txReplacingPendingIds = data.transactions + // .filter((t) => t.pending_transaction_id) + // .map((t) => t.pending_transaction_id!); + // + // await db + // .delete(transaction) + // .where(inArray(transaction.plaid_id, txReplacingPendingIds)); + // } + // }, + // + // async updateBalences() { + // isLoggedIn(authData); + // const accounts = await db.query.plaidAccessTokens.findMany({ + // where: eq(plaidAccessTokens.userId, authData.user.id), + // }); + // if (accounts.length == 0) { + // console.error("No accounts"); + // return; + // } + // + // for (const account of accounts) { + // const { data } = await plaidClient.accountsBalanceGet({ + // access_token: account.token, + // }); + // await db + // .insert(balance) + // .values( + // data.accounts.map((bal) => ({ + // id: randomUUID(), + // user_id: authData.user.id, + // plaid_id: bal.account_id, + // avaliable: bal.balances.available as any, + // current: bal.balances.current as any, + // name: bal.name, + // tokenId: account.id, + // })), + // ) + // .onConflictDoUpdate({ + // target: balance.plaid_id, + // set: { + // current: sql.raw(`excluded.${balance.current.name}`), + // avaliable: sql.raw(`excluded.${balance.avaliable.name}`), + // }, + // }); + // } + // }, }, } as const satisfies Mutators; }; diff --git a/package.json b/package.json index 6414165..cda94f7 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,9 @@ "private": true, "scripts": { "dev": "process-compose up -p 0", - "tui": "pnpm --filter=@money/tui run build && pnpm --filter=@money/tui run start" + "tui": "pnpm --filter=@money/tui run build && pnpm --filter=@money/tui run start", + "db:gen": "pnpm --filter=@money/shared db:gen", + "db:push": "pnpm --filter=@money/shared db:push" }, "pnpm": { "onlyBuiltDependencies": ["@rocicorp/zero-sqlite3"], diff --git a/packages/shared/package.json b/packages/shared/package.json index 6e7130d..e87c244 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -13,6 +13,6 @@ }, "scripts": { "db:gen": "drizzle-zero generate -s ./src/db/schema/public.ts -o ./src/zero-schema.gen.ts -f && sed -i 's/enableLegacyQueries: true,/enableLegacyQueries: false,/g' src/zero-schema.gen.ts && sed -i 's/enableLegacyMutators: true,/enableLegacyMutators: false,/g' src/zero-schema.gen.ts", - "db:migrate": "drizzle-kit push" + "db:push": "drizzle-kit push" } } diff --git a/packages/shared/src/db/schema/public.ts b/packages/shared/src/db/schema/public.ts index 49e7c21..7798a39 100644 --- a/packages/shared/src/db/schema/public.ts +++ b/packages/shared/src/db/schema/public.ts @@ -65,6 +65,7 @@ export const plaidAccessTokens = pgTable("plaidAccessToken", { logoUrl: text("logoUrl").notNull(), userId: text("user_id").notNull(), token: text("token").notNull(), + syncCursor: text("sync_cursor"), createdAt: timestamp("created_at").notNull().defaultNow(), }); diff --git a/packages/shared/src/mutators.ts b/packages/shared/src/mutators.ts index 2ae3374..0757b40 100644 --- a/packages/shared/src/mutators.ts +++ b/packages/shared/src/mutators.ts @@ -10,8 +10,10 @@ export function createMutators(authData: AuthData | null) { link: { async create() {}, async get(tx: Tx, { link_token }: { link_token: string }) {}, - async updateTransactions() {}, - async updateBalences() {}, + async webhook() {}, + async sync() {}, + // async updateTransactions() {}, + // async updateBalences() {}, async deleteAccounts(tx: Tx, { accountIds }: { accountIds: string[] }) { isLoggedIn(authData); for (const id of accountIds) { diff --git a/packages/shared/src/zero-schema.gen.ts b/packages/shared/src/zero-schema.gen.ts index 3382efb..6f8ee34 100644 --- a/packages/shared/src/zero-schema.gen.ts +++ b/packages/shared/src/zero-schema.gen.ts @@ -345,6 +345,16 @@ export const schema = { "token" >, }, + syncCursor: { + type: "string", + optional: true, + customType: null as unknown as ZeroCustomType< + ZeroSchema, + "plaidAccessTokens", + "syncCursor" + >, + serverName: "sync_cursor", + }, createdAt: { type: "number", optional: true, diff --git a/packages/ui/src/transactions.tsx b/packages/ui/src/transactions.tsx index 31f8d2d..032af21 100644 --- a/packages/ui/src/transactions.tsx +++ b/packages/ui/src/transactions.tsx @@ -40,9 +40,7 @@ export function Transactions() { z.mutate.link.updateTransactions() }, - ]} + shortcuts={[{ key: "r", handler: () => z.mutate.link.sync() }]} >