feat: add plaid sync

This commit is contained in:
Max Koon
2025-12-15 12:53:35 -05:00
parent c6dd174376
commit 105b0c514f
9 changed files with 212 additions and 76 deletions

View File

@@ -0,0 +1,3 @@
async function sync() {}
sync();

23
apps/api/src/plaid/tx.ts Normal file
View File

@@ -0,0 +1,23 @@
import type { transaction } from "@money/shared/db";
import type { Transaction } from "plaid";
import { type InferInsertModel } from "drizzle-orm";
import { randomUUID } from "crypto";
export function transactionFromPlaid(
userId: string,
tx: Transaction,
): InferInsertModel<typeof transaction> {
return {
id: randomUUID(),
user_id: userId,
plaid_id: tx.transaction_id,
account_id: tx.account_id,
name: tx.name,
amount: tx.amount as any,
datetime: tx.datetime ? new Date(tx.datetime) : new Date(tx.date),
authorized_datetime: tx.authorized_datetime
? new Date(tx.authorized_datetime)
: undefined,
json: JSON.stringify(tx),
};
}

View File

@@ -26,6 +26,8 @@ import {
PlaidApi, PlaidApi,
PlaidEnvironments, PlaidEnvironments,
Products, Products,
SandboxItemFireWebhookRequestWebhookCodeEnum,
WebhookType,
} from "plaid"; } from "plaid";
import { randomUUID } from "crypto"; import { randomUUID } from "crypto";
import { db } from "./db"; import { db } from "./db";
@@ -35,8 +37,16 @@ import {
plaidLink, plaidLink,
transaction, transaction,
} from "@money/shared/db"; } from "@money/shared/db";
import { and, eq, inArray, sql, type InferInsertModel } from "drizzle-orm"; import {
and,
eq,
inArray,
sql,
type InferInsertModel,
type InferSelectModel,
} from "drizzle-orm";
import { plaidClient } from "./plaid"; import { plaidClient } from "./plaid";
import { transactionFromPlaid } from "./plaid/tx";
const processor = new PushProcessor( const processor = new PushProcessor(
new ZQLDatabase( new ZQLDatabase(
@@ -128,9 +138,9 @@ const createMutators = (authData: AuthData | null) => {
throw Error("Plaid error"); throw Error("Plaid error");
} }
}, },
async webhook() {
async updateTransactions() {
isLoggedIn(authData); isLoggedIn(authData);
const accounts = await db.query.plaidAccessTokens.findMany({ const accounts = await db.query.plaidAccessTokens.findMany({
where: eq(plaidAccessTokens.userId, authData.user.id), where: eq(plaidAccessTokens.userId, authData.user.id),
}); });
@@ -139,51 +149,20 @@ const createMutators = (authData: AuthData | null) => {
return; return;
} }
for (const account of accounts) { const account = accounts.at(0)!;
const { data } = await plaidClient.transactionsGet({
access_token: account.token,
start_date: "2025-10-01",
end_date: new Date().toISOString().split("T")[0],
});
const transactions = data.transactions.map( const { data } = await plaidClient.sandboxItemFireWebhook({
(tx) => access_token: account.token,
({ webhook_type: WebhookType.Transactions,
id: randomUUID(), webhook_code:
user_id: authData.user.id, SandboxItemFireWebhookRequestWebhookCodeEnum.DefaultUpdate,
plaid_id: tx.transaction_id, });
account_id: tx.account_id,
name: tx.name,
amount: tx.amount as any,
datetime: tx.datetime
? new Date(tx.datetime)
: new Date(tx.date),
authorized_datetime: tx.authorized_datetime
? new Date(tx.authorized_datetime)
: undefined,
json: JSON.stringify(tx),
}) satisfies InferInsertModel<typeof transaction>,
);
await db console.log(data);
.insert(transaction)
.values(transactions)
.onConflictDoNothing({
target: transaction.plaid_id,
});
const txReplacingPendingIds = data.transactions
.filter((t) => t.pending_transaction_id)
.map((t) => t.pending_transaction_id!);
await db
.delete(transaction)
.where(inArray(transaction.plaid_id, txReplacingPendingIds));
}
}, },
async sync() {
async updateBalences() {
isLoggedIn(authData); isLoggedIn(authData);
const accounts = await db.query.plaidAccessTokens.findMany({ const accounts = await db.query.plaidAccessTokens.findMany({
where: eq(plaidAccessTokens.userId, authData.user.id), where: eq(plaidAccessTokens.userId, authData.user.id),
}); });
@@ -192,32 +171,150 @@ const createMutators = (authData: AuthData | null) => {
return; return;
} }
for (const account of accounts) { const account = accounts.at(0)!;
const { data } = await plaidClient.accountsBalanceGet({
access_token: account.token, const { data } = await plaidClient.transactionsSync({
}); access_token: account.token,
await db cursor: account.syncCursor || undefined,
.insert(balance) });
.values(
data.accounts.map((bal) => ({ const added = data.added.map((tx) =>
id: randomUUID(), transactionFromPlaid(authData.user.id, tx),
user_id: authData.user.id, );
plaid_id: bal.account_id,
avaliable: bal.balances.available as any, const updated = data.modified.map((tx) =>
current: bal.balances.current as any, transactionFromPlaid(authData.user.id, tx),
name: bal.name, );
tokenId: account.id,
})), console.log("added", added.length);
) console.log("updated", updated.length);
.onConflictDoUpdate({ console.log("removed", data.removed.length);
target: balance.plaid_id, console.log("next cursor", data.next_cursor);
set: {
current: sql.raw(`excluded.${balance.current.name}`), await db.transaction(async (tx) => {
avaliable: sql.raw(`excluded.${balance.avaliable.name}`), if (added.length) {
}, await tx.insert(transaction).values(added);
}); }
}
if (updated.length) {
await tx
.insert(transaction)
.values(updated)
.onConflictDoUpdate({
target: transaction.plaid_id,
set: {
name: sql.raw(`excluded.${transaction.name.name}`),
amount: sql.raw(`excluded.${transaction.amount.name}`),
json: sql.raw(`excluded.${transaction.json.name}`),
},
});
}
if (data.removed.length) {
await tx.delete(transaction).where(
inArray(
transaction.id,
data.removed.map((tx) => tx.transaction_id),
),
);
}
await tx
.update(plaidAccessTokens)
.set({ syncCursor: data.next_cursor })
.where(eq(plaidAccessTokens.id, account.id));
});
}, },
// async updateTransactions() {
// isLoggedIn(authData);
// const accounts = await db.query.plaidAccessTokens.findMany({
// where: eq(plaidAccessTokens.userId, authData.user.id),
// });
// if (accounts.length == 0) {
// console.error("No accounts");
// return;
// }
//
// for (const account of accounts) {
// const { data } = await plaidClient.transactionsGet({
// access_token: account.token,
// start_date: "2025-10-01",
// end_date: new Date().toISOString().split("T")[0],
// });
//
// const transactions = data.transactions.map(
// (tx) =>
// ({
// id: randomUUID(),
// user_id: authData.user.id,
// plaid_id: tx.transaction_id,
// account_id: tx.account_id,
// name: tx.name,
// amount: tx.amount as any,
// datetime: tx.datetime
// ? new Date(tx.datetime)
// : new Date(tx.date),
// authorized_datetime: tx.authorized_datetime
// ? new Date(tx.authorized_datetime)
// : undefined,
// json: JSON.stringify(tx),
// }) satisfies InferInsertModel<typeof transaction>,
// );
//
// await db
// .insert(transaction)
// .values(transactions)
// .onConflictDoNothing({
// target: transaction.plaid_id,
// });
//
// const txReplacingPendingIds = data.transactions
// .filter((t) => t.pending_transaction_id)
// .map((t) => t.pending_transaction_id!);
//
// await db
// .delete(transaction)
// .where(inArray(transaction.plaid_id, txReplacingPendingIds));
// }
// },
//
// async updateBalences() {
// isLoggedIn(authData);
// const accounts = await db.query.plaidAccessTokens.findMany({
// where: eq(plaidAccessTokens.userId, authData.user.id),
// });
// if (accounts.length == 0) {
// console.error("No accounts");
// return;
// }
//
// for (const account of accounts) {
// const { data } = await plaidClient.accountsBalanceGet({
// access_token: account.token,
// });
// await db
// .insert(balance)
// .values(
// data.accounts.map((bal) => ({
// id: randomUUID(),
// user_id: authData.user.id,
// plaid_id: bal.account_id,
// avaliable: bal.balances.available as any,
// current: bal.balances.current as any,
// name: bal.name,
// tokenId: account.id,
// })),
// )
// .onConflictDoUpdate({
// target: balance.plaid_id,
// set: {
// current: sql.raw(`excluded.${balance.current.name}`),
// avaliable: sql.raw(`excluded.${balance.avaliable.name}`),
// },
// });
// }
// },
}, },
} as const satisfies Mutators; } as const satisfies Mutators;
}; };

View File

@@ -3,7 +3,9 @@
"private": true, "private": true,
"scripts": { "scripts": {
"dev": "process-compose up -p 0", "dev": "process-compose up -p 0",
"tui": "pnpm --filter=@money/tui run build && pnpm --filter=@money/tui run start" "tui": "pnpm --filter=@money/tui run build && pnpm --filter=@money/tui run start",
"db:gen": "pnpm --filter=@money/shared db:gen",
"db:push": "pnpm --filter=@money/shared db:push"
}, },
"pnpm": { "pnpm": {
"onlyBuiltDependencies": ["@rocicorp/zero-sqlite3"], "onlyBuiltDependencies": ["@rocicorp/zero-sqlite3"],

View File

@@ -13,6 +13,6 @@
}, },
"scripts": { "scripts": {
"db:gen": "drizzle-zero generate -s ./src/db/schema/public.ts -o ./src/zero-schema.gen.ts -f && sed -i 's/enableLegacyQueries: true,/enableLegacyQueries: false,/g' src/zero-schema.gen.ts && sed -i 's/enableLegacyMutators: true,/enableLegacyMutators: false,/g' src/zero-schema.gen.ts", "db:gen": "drizzle-zero generate -s ./src/db/schema/public.ts -o ./src/zero-schema.gen.ts -f && sed -i 's/enableLegacyQueries: true,/enableLegacyQueries: false,/g' src/zero-schema.gen.ts && sed -i 's/enableLegacyMutators: true,/enableLegacyMutators: false,/g' src/zero-schema.gen.ts",
"db:migrate": "drizzle-kit push" "db:push": "drizzle-kit push"
} }
} }

View File

@@ -65,6 +65,7 @@ export const plaidAccessTokens = pgTable("plaidAccessToken", {
logoUrl: text("logoUrl").notNull(), logoUrl: text("logoUrl").notNull(),
userId: text("user_id").notNull(), userId: text("user_id").notNull(),
token: text("token").notNull(), token: text("token").notNull(),
syncCursor: text("sync_cursor"),
createdAt: timestamp("created_at").notNull().defaultNow(), createdAt: timestamp("created_at").notNull().defaultNow(),
}); });

View File

@@ -10,8 +10,10 @@ export function createMutators(authData: AuthData | null) {
link: { link: {
async create() {}, async create() {},
async get(tx: Tx, { link_token }: { link_token: string }) {}, async get(tx: Tx, { link_token }: { link_token: string }) {},
async updateTransactions() {}, async webhook() {},
async updateBalences() {}, async sync() {},
// async updateTransactions() {},
// async updateBalences() {},
async deleteAccounts(tx: Tx, { accountIds }: { accountIds: string[] }) { async deleteAccounts(tx: Tx, { accountIds }: { accountIds: string[] }) {
isLoggedIn(authData); isLoggedIn(authData);
for (const id of accountIds) { for (const id of accountIds) {

View File

@@ -345,6 +345,16 @@ export const schema = {
"token" "token"
>, >,
}, },
syncCursor: {
type: "string",
optional: true,
customType: null as unknown as ZeroCustomType<
ZeroSchema,
"plaidAccessTokens",
"syncCursor"
>,
serverName: "sync_cursor",
},
createdAt: { createdAt: {
type: "number", type: "number",
optional: true, optional: true,

View File

@@ -40,9 +40,7 @@ export function Transactions() {
<Table.Provider <Table.Provider
data={items} data={items}
columns={COLUMNS} columns={COLUMNS}
shortcuts={[ shortcuts={[{ key: "r", handler: () => z.mutate.link.sync() }]}
{ key: "r", handler: () => z.mutate.link.updateTransactions() },
]}
> >
<View style={{ padding: 10, flex: 1 }}> <View style={{ padding: 10, flex: 1 }}>
<View style={{ flexShrink: 0 }}> <View style={{ flexShrink: 0 }}>