Skip to content

Commit

Permalink
Stableswap lp transfer (#4308)
Browse files Browse the repository at this point in the history
* fix: create external swap template

* fix: lpTokenAmount of removeOneToken

* feat: staging subgraph for lp token

* fix:  LpToken subgraph

* feat: add snapshot to lp transfer events

* feat: v0 stableswap subgraph

* feat: graph-client build

* Revert "feat: graph-client build"

This reverts commit da0c931.

* feat: rebuild graph-client

* feat: migration for lp transfers

* feat: database for lp transfers

* feat: subgraph for lp transfers

* feat: poller for lp transfer

* fix: add more unit tests to database

* fix: add unit tests for poller

* chore: lint

* feat: rebuild graph-client
  • Loading branch information
liu-zhipeng authored May 22, 2023
1 parent 1fe2310 commit 11ff45e
Show file tree
Hide file tree
Showing 100 changed files with 461,792 additions and 224,224 deletions.
2 changes: 1 addition & 1 deletion packages/adapters/database/.nycrc.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"extends": "../../../.nycrc.json",
"exclude": ["src/index.ts", "test/**/*.ts"],
"exclude": ["src/index.ts", "src/zapatos/schema.d.ts", "test/**/*.ts"],
"all": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- migrate:up
DROP VIEW IF EXISTS public.stableswap_lp_balances;

CREATE TABLE IF NOT EXISTS public.stableswap_lp_transfers (
id character varying(255) NOT NULL UNIQUE,
pool_id character(66) NOT NULL,
domain character varying (255) NOT NULL,
lp_token character (42) NOT NULL,
from_address character (42) NOT NULL,
to_address character (42) NOT NULL,
pooled_tokens text [],
amount numeric NOT NULL,
balances numeric [],
block_number integer NOT NULL,
transaction_hash character(66) NOT NULL,
timestamp integer NOT NULL,
PRIMARY KEY(id)
);


CREATE TABLE IF NOT EXISTS public.stableswap_lp_balances (
pool_id character(66) NOT NULL,
domain character varying (255) NOT NULL,
provider character (42) NOT NULL,
lp_token character (42) NOT NULL,
balance numeric NOT NULL,
last_timestamp integer NOT NULL,
PRIMARY KEY(pool_id, domain, provider)
);

GRANT SELECT ON public.stableswap_lp_transfers to query;
GRANT SELECT ON public.stableswap_lp_transfers to reader;

-- migrate:down
DROP TABLE IF EXISTS public.stableswap_lp_transfers;
DROP TABLE IF EXISTS public.stableswap_lp_balances;
118 changes: 47 additions & 71 deletions packages/adapters/database/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,6 @@ SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;

--
-- Name: pg_cron; Type: EXTENSION; Schema: -; Owner: -
--

CREATE EXTENSION IF NOT EXISTS pg_cron WITH SCHEMA pg_catalog;


--
-- Name: EXTENSION pg_cron; Type: COMMENT; Schema: -; Owner: -
--

COMMENT ON EXTENSION pg_cron IS 'Job scheduler for PostgreSQL';


--
-- Name: public; Type: SCHEMA; Schema: -; Owner: -
--

-- *not* creating schema, since initdb creates it


--
-- Name: action_type; Type: TYPE; Schema: public; Owner: -
--
Expand Down Expand Up @@ -763,31 +742,37 @@ CREATE TABLE public.schema_migrations (


--
-- Name: stableswap_lp_balances; Type: VIEW; Schema: public; Owner: -
-- Name: stableswap_lp_balances; Type: TABLE; Schema: public; Owner: -
--

CREATE VIEW public.stableswap_lp_balances AS
SELECT e.pool_id,
e.domain,
e.provider,
sum(
CASE
WHEN (e.action = 'Add'::public.action_type) THEN e.lp_token_amount
WHEN (e.action = 'Remove'::public.action_type) THEN (('-1'::integer)::numeric * e.lp_token_amount)
ELSE NULL::numeric
END) AS balance,
sum(
CASE
WHEN (e.action = 'Add'::public.action_type) THEN 1
ELSE 0
END) AS add_count,
sum(
CASE
WHEN (e.action = 'Remove'::public.action_type) THEN 1
ELSE 0
END) AS remove_count
FROM public.stableswap_pool_events e
GROUP BY e.pool_id, e.domain, e.provider;
CREATE TABLE public.stableswap_lp_balances (
pool_id character(66) NOT NULL,
domain character varying(255) NOT NULL,
provider character(42) NOT NULL,
lp_token character(42) NOT NULL,
balance numeric NOT NULL,
last_timestamp integer NOT NULL
);


--
-- Name: stableswap_lp_transfers; Type: TABLE; Schema: public; Owner: -
--

CREATE TABLE public.stableswap_lp_transfers (
id character varying(255) NOT NULL,
pool_id character(66) NOT NULL,
domain character varying(255) NOT NULL,
lp_token character(42) NOT NULL,
from_address character(42) NOT NULL,
to_address character(42) NOT NULL,
pooled_tokens text[],
amount numeric NOT NULL,
balances numeric[],
block_number integer NOT NULL,
transaction_hash character(66) NOT NULL,
"timestamp" integer NOT NULL
);


--
Expand Down Expand Up @@ -1135,6 +1120,22 @@ ALTER TABLE ONLY public.stableswap_exchanges
ADD CONSTRAINT stableswap_exchanges_pkey PRIMARY KEY (domain, id);


--
-- Name: stableswap_lp_balances stableswap_lp_balances_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.stableswap_lp_balances
ADD CONSTRAINT stableswap_lp_balances_pkey PRIMARY KEY (pool_id, domain, provider);


--
-- Name: stableswap_lp_transfers stableswap_lp_transfers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.stableswap_lp_transfers
ADD CONSTRAINT stableswap_lp_transfers_pkey PRIMARY KEY (id);


--
-- Name: stableswap_pool_events stableswap_pool_events_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -1266,32 +1267,6 @@ ALTER TABLE ONLY public.asset_balances
ADD CONSTRAINT fk_router FOREIGN KEY (router_address) REFERENCES public.routers(address);


--
-- Name: job cron_job_policy; Type: POLICY; Schema: cron; Owner: -
--

CREATE POLICY cron_job_policy ON cron.job USING ((username = CURRENT_USER));


--
-- Name: job_run_details cron_job_run_details_policy; Type: POLICY; Schema: cron; Owner: -
--

CREATE POLICY cron_job_run_details_policy ON cron.job_run_details USING ((username = CURRENT_USER));


--
-- Name: job; Type: ROW SECURITY; Schema: cron; Owner: -
--

ALTER TABLE cron.job ENABLE ROW LEVEL SECURITY;

--
-- Name: job_run_details; Type: ROW SECURITY; Schema: cron; Owner: -
--

ALTER TABLE cron.job_run_details ENABLE ROW LEVEL SECURITY;

--
-- PostgreSQL database dump complete
--
Expand Down Expand Up @@ -1369,4 +1344,5 @@ INSERT INTO public.schema_migrations (version) VALUES
('20230509112648'),
('20230509123037'),
('20230509165732'),
('20230510210620');
('20230510210620'),
('20230519155643');
54 changes: 54 additions & 0 deletions packages/adapters/database/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
XTransferMessageStatus,
Asset,
AssetPrice,
StableSwapTransfer,
StableSwapLpBalance,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
import * as db from "zapatos/db";
Expand Down Expand Up @@ -218,6 +220,34 @@ const convertToDbStableSwapPoolEvent = (event: StableSwapPoolEvent): s.stableswa
};
};

const convertToDbStableSwapTransfer = (event: StableSwapTransfer): s.stableswap_lp_transfers.Insertable => {
return {
id: event.id,
pool_id: event.poolId,
domain: event.domain,
lp_token: event.lpToken,
from_address: event.fromAddress,
to_address: event.toAddress,
pooled_tokens: event.pooledTokens,
amount: event.amount,
balances: event.balances,
block_number: event.blockNumber,
transaction_hash: event.transactionHash,
timestamp: event.timestamp,
};
};

const convertToDbStableSwapLpBalance = (event: StableSwapLpBalance): s.stableswap_lp_balances.Insertable => {
return {
pool_id: event.poolId,
domain: event.domain,
lp_token: event.lpToken,
provider: event.provider,
balance: event.balance,
last_timestamp: event.lastTimestamp,
};
};

const convertToDbRouterDailyTVL = (tvl: RouterDailyTVL): s.daily_router_tvl.Insertable => {
return {
id: tvl.id,
Expand Down Expand Up @@ -1039,6 +1069,30 @@ export const saveStableSwapPoolEvent = async (
await db.upsert("stableswap_pool_events", poolEvents, ["id"]).run(poolToUse);
};

export const saveStableSwapTransfers = async (
_transfers: StableSwapTransfer[],
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<void> => {
const poolToUse = _pool ?? pool;
const transfers: s.stableswap_lp_transfers.Insertable[] = _transfers
.map((m) => convertToDbStableSwapTransfer(m))
.map(sanitizeNull);

await db.upsert("stableswap_lp_transfers", transfers, ["id"]).run(poolToUse);
};

export const saveStableSwapLpBalances = async (
_balances: StableSwapLpBalance[],
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<void> => {
const poolToUse = _pool ?? pool;
const balances: s.stableswap_lp_balances.Insertable[] = _balances
.map((m) => convertToDbStableSwapLpBalance(m))
.map(sanitizeNull);

await db.upsert("stableswap_lp_balances", balances, ["pool_id", "domain", "provider"]).run(poolToUse);
};

export const saveRouterDailyTVL = async (
_tvls: RouterDailyTVL[],
_pool?: Pool | db.TxnClientForRepeatableRead,
Expand Down
16 changes: 15 additions & 1 deletion packages/adapters/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
SlippageUpdate,
Asset,
AssetPrice,
StableSwapTransfer,
StableSwapLpBalance,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
import { TxnClientForRepeatableRead } from "zapatos/db";
Expand Down Expand Up @@ -60,9 +62,11 @@ import {
increaseBackoff,
saveStableSwapExchange,
saveStableSwapPool,
saveStableSwapPoolEvent,
saveStableSwapLpBalances,
saveStableSwapTransfers,
resetBackoffs,
updateErrorStatus,
saveStableSwapPoolEvent,
saveRouterDailyTVL,
updateSlippage,
markRootMessagesProcessed,
Expand Down Expand Up @@ -235,6 +239,14 @@ export type Database = {
_poolEvents: StableSwapPoolEvent[],
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<void>;
saveStableSwapTransfers: (
_transfers: StableSwapTransfer[],
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<void>;
saveStableSwapLpBalances: (
_transfers: StableSwapLpBalance[],
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<void>;
markRootMessagesProcessed: (rootMessages: RootMessage[], _pool?: Pool | TxnClientForRepeatableRead) => Promise<void>;
saveRouterDailyTVL: (_tvls: RouterDailyTVL[], _pool?: Pool | TxnClientForRepeatableRead) => Promise<void>;
updateSlippage: (_slippageUpdates: SlippageUpdate[], _pool?: Pool | TxnClientForRepeatableRead) => Promise<void>;
Expand Down Expand Up @@ -316,6 +328,8 @@ export const getDatabase = async (databaseUrl: string, logger: Logger): Promise<
resetBackoffs,
saveStableSwapPool,
saveStableSwapExchange,
saveStableSwapTransfers,
saveStableSwapLpBalances,
updateErrorStatus,
saveStableSwapPoolEvent,
markRootMessagesProcessed,
Expand Down
Loading

0 comments on commit 11ff45e

Please sign in to comment.