diff --git a/src/lib/nats.ts b/src/lib/nats.ts new file mode 100644 index 0000000..2633dd2 --- /dev/null +++ b/src/lib/nats.ts @@ -0,0 +1,110 @@ +import { PostgresDb } from '@fastify/postgres'; +import { GraphQLClient } from 'graphql-request'; +import { Provider } from 'ethers'; +import { Redis as RedisClient } from 'ioredis'; +import { Codec, JsMsg, JSONCodec } from 'nats'; +import { RegistrationEvent, TransferEvent } from '@lib/custodail'; +import { AccountService, getPhoneNumberFromAddress } from '@services/account'; +import { SystemError } from '@lib/errors'; +import { retrieveWalletBalance } from '@lib/ussd'; +import { config } from '@/config'; +import { generateUserTag, UserService } from '@services/user'; +import { processTransaction } from '@services/transfer'; +import { logger } from '@/app'; + +type EventHandler = ( + db: PostgresDb, + graphql: GraphQLClient, + data: T, + provider: Provider, + redis: RedisClient +) => Promise; + + +function createHandler(codec: Codec, eventHandler: EventHandler) { + return async function ( + db: PostgresDb, + graphql: GraphQLClient, + message: JsMsg, + provider: Provider, + redis: RedisClient + ) { + const data = codec.decode(message.data); + await eventHandler(db, graphql, data, provider, redis); + }; +} + +async function processTransferEvent( + db: PostgresDb, + graphql: GraphQLClient, + data: TransferEvent, + provider: Provider, + redis: RedisClient +): Promise { + const { success} = data; + if (success) { + await Promise.all([ + processTransaction(data.from, db, graphql, provider, redis, data), + processTransaction(data.to, db, graphql, provider, redis, data), + ]) + } +} + +async function processRegistrationEvent( + db: PostgresDb, + graphql: GraphQLClient, + data: RegistrationEvent, + provider: Provider, + redis: RedisClient +): Promise { + const phoneNumber = await getPhoneNumberFromAddress(data.to, db, redis) + if (!phoneNumber) { + throw new SystemError(`Could not find phone number for address: ${data.to}`) + } + const tag = await generateUserTag(data.to, graphql, phoneNumber) + await new AccountService(db, redis).activateOnChain(config.DEFAULT_VOUCHER.ADDRESS, phoneNumber) + const balance = await retrieveWalletBalance(data.to, config.DEFAULT_VOUCHER.ADDRESS, provider) + await new UserService(phoneNumber, redis).update({ + account: { + active_voucher_address: config.DEFAULT_VOUCHER.ADDRESS, + }, + tag, + vouchers: { + active: { + address: config.DEFAULT_VOUCHER.ADDRESS, + balance, + symbol: config.DEFAULT_VOUCHER.SYMBOL, + } + } + }) +} + +const handleTransfer = createHandler( + JSONCodec(), + processTransferEvent +); +const handleRegistration = createHandler( + JSONCodec(), + processRegistrationEvent +); + +export async function processMessage(db: PostgresDb, graphql: GraphQLClient, message: JsMsg, provider: Provider, redis: RedisClient) { + if (message.subject === `${config.NATS.STREAM_NAME}.register`) { + try { + await handleRegistration(db, graphql, message, provider, redis); + message.ack() + } catch (error: any) { + throw new SystemError(`Error handling registration: ${error.message}`); + } + } else if (message.subject === `${config.NATS.STREAM_NAME}.transfer`) { + try { + await handleTransfer(db, graphql, message, provider, redis); + message.ack() + } catch (error: any) { + throw new SystemError(`Error handling transfer: ${error.message}`); + } + } else { + logger.debug(`Unsupported subject: ${message.subject}`); + message.ack() + } +} diff --git a/src/lib/natsHandler.ts b/src/lib/natsHandler.ts deleted file mode 100644 index e0c483d..0000000 --- a/src/lib/natsHandler.ts +++ /dev/null @@ -1,186 +0,0 @@ -import { config } from '@/config'; -import { JSONCodec, Msg } from 'nats'; -import { GraphQLClient } from 'graphql-request'; -import { Redis as RedisClient } from 'ioredis'; -import { getAddress, Provider } from 'ethers'; -import { PostgresDb } from '@fastify/postgres'; -import { Cache } from '@utils/redis'; -import { ActiveVoucher, getVoucherSymbol } from '@lib/ussd/voucher'; -import { retrieveWalletBalance } from '@lib/ussd/account'; -import { Transaction, TransactionType } from '@machines/statement'; -import { activateOnChain } from '@db/models/account'; -import { Address, generateTag } from '@lib/ussd/utils'; -import { User } from '@machines/utils'; -import { logger } from '@/app'; -import { SystemError } from '@lib/errors'; - -interface TransferEvent { - block: number; - contractAddress: string; - from: string; - success: boolean; - to: string; - transactionHash: string; - transactionIndex: number; - value: number; -} - -export async function processMessage (db: PostgresDb, graphql: GraphQLClient, msg: Msg, provider: Provider, redis: RedisClient) { - let codec: any - let message: any - - try{ - if (msg.subject === `${config.NATS.CHAIN.STREAM_NAME}.transfer`) { - { - codec = JSONCodec() - message = codec.decode(msg.data) - await handleTransfer(db, graphql, message, provider, redis) - //message.respond('OK') - } - } else { - logger.debug(`Unknown subject: ${msg.subject}`) - } - } catch(error: any) { - throw new SystemError(`Error processing NATS message: ${error.message}`) - } -} - -async function updateHeldVouchers(heldVouchers: ActiveVoucher[], voucher: ActiveVoucher): Promise { - const { address, balance, symbol } = voucher - const updatedVoucherIndex = heldVouchers.findIndex(v => v.symbol === symbol) - - if (updatedVoucherIndex >= 0) { - const updatedVoucher = { ...heldVouchers[updatedVoucherIndex], balance} - return [ ...heldVouchers.slice(0, updatedVoucherIndex), updatedVoucher, ...heldVouchers.slice(updatedVoucherIndex + 1) ] - } else { - const newVoucher = { address: address, symbol, balance } - return [ ...heldVouchers, newVoucher ] - } -} - - -async function updateTransactions(statement: Transaction[], transaction: Transaction): Promise { - const updatedTransaction = [...statement] - - if (updatedTransaction.length >= 9) { - updatedTransaction.shift() - } - - updatedTransaction.push(transaction) - - return updatedTransaction -} - -async function handleTransfer(db: PostgresDb, graphql: GraphQLClient, message: TransferEvent, provider: Provider, redis: RedisClient) { - const { block, success, transactionHash, value } = message; - const contractAddress = getAddress(message.contractAddress) as Address - const from = getAddress(message.from) as Address - const to = getAddress(message.to) as Address - - if (!success) { - logger.error("Transaction failed: ", message) - return - } - - const txCount = await redis.get(`address-tx-count-${to}`) - if (!txCount) { - await handleRegistration(to, contractAddress, db, graphql, message, provider, redis) - await updateTxCount(to, redis, '1') - return - } - - const symbol = await getVoucherSymbol(contractAddress, graphql, redis) - await Promise.all([ - processTransaction(to, contractAddress, graphql, provider, redis, { - block, from, symbol, time: Date.now(), to, transactionHash, type: TransactionType.CREDIT, value - }, txCount), - processTransaction(from, contractAddress, graphql, provider, redis, { - block, from, symbol, time: Date.now(), to, transactionHash, type: TransactionType.DEBIT, value - }, txCount), - ]) -} -async function handleRegistration( - address: Address, - contractAddress: Address, - db: PostgresDb, - graphql: GraphQLClient, - message: TransferEvent, - provider: Provider, - redis: RedisClient) { - - const phoneNumber = await getPhoneNumber(address, redis) - - if (!phoneNumber) { - throw new SystemError(`Could not find phone number for address: ${address}.`) - } - - - try{ - const balance = await retrieveWalletBalance(address, contractAddress, provider) - const symbol = await getVoucherSymbol(contractAddress, graphql, redis) - const voucher = { address: contractAddress, balance, symbol } - const tag = await generateTag(address, graphql, phoneNumber) - await activateOnChain(address, db, redis) - - const cache = new Cache(redis, phoneNumber) - await cache.updateJSON({ - vouchers: { - active: voucher, - held: [voucher], - }, - tag, - }) - logger.debug(`Account: ${address} successfully set up.`) - } catch (error) { - throw new SystemError(`Error setting up account: ${address}.`) - } -} - - -async function processTransaction(accountAddress: Address, - contractAddress: Address, - graphql: GraphQLClient, - provider: Provider, - redis: RedisClient, - transaction: Transaction, - txCount: string) { - - const phoneNumber = await getPhoneNumber(accountAddress, redis) - - if (!phoneNumber) { - logger.error(`No phone number mapped to address: ${accountAddress}`) - return - } - - const cache = new Cache(redis, phoneNumber) - let user: User = await cache.getJSON() as User - - if(!user) { - logger.error(`No user found for phone number: ${phoneNumber}`) - return - } - - const balance = await retrieveWalletBalance(accountAddress, contractAddress, provider) - - const [held, transactions] = await Promise.all([ - updateHeldVouchers(user.vouchers?.held || [], { address: contractAddress, balance, symbol: transaction.symbol }), - updateTransactions(user.transactions || [], transaction) - ]) - - await cache.updateJSON({ - transactions: transactions, - vouchers: { - held: held, - } - }) - - await updateTxCount(accountAddress, redis, `${parseInt(txCount) + 1}`) -} - -async function updateTxCount(address: Address, redis: RedisClient, txCount: string) { - await redis.set(`address-tx-count-${address}`, txCount) -} - -async function getPhoneNumber(address: Address, redis: RedisClient) { - return redis.get(`address-phone-${address}`); -} \ No newline at end of file diff --git a/src/plugins/nats.ts b/src/plugins/nats.ts index fa05681..e4feb59 100644 --- a/src/plugins/nats.ts +++ b/src/plugins/nats.ts @@ -1,49 +1,66 @@ import { FastifyPluginAsync } from 'fastify'; import fp from 'fastify-plugin'; -import { connect, ConnectionOptions, Msg, NatsError } from 'nats'; -import { processMessage } from '@lib/natsHandler'; - +import { AckPolicy, connect, consumerOpts, DeliverPolicy, JsMsg } from 'nats'; +import { processMessage } from '@lib/nats'; +import { config } from '@/config'; interface NatsPluginOptions { - connOpts: ConnectionOptions; - subjects: string[]; + durableName: string; + server: string; + streamName: string; + subject: string; +} + +async function handleMessage(fastify: any, message: JsMsg | null) { + if(message){ + try { + await processMessage(fastify.pg, fastify.graphql, message, fastify.provider, fastify.p_redis); + } catch (error: any) { + fastify.log.error(`Error processing NATS message: ${error.message}`); + // requeue message after 50 seconds + message.nak(50000); + } + } } const natsPlugin: FastifyPluginAsync = async (fastify, options) => { + const natsConnection = await connect({ debug: config.DEV, servers: [options.server] }); + fastify.log.debug(`Connected to NATS server at: ${options.server}`); + const jetStreamManager = await natsConnection.jetstreamManager(); + const jetStreamClient = natsConnection.jetstream(); - let { connOpts, subjects } = options; + const consumerConfig = { + ack_policy: AckPolicy.Explicit, + deliver_subject: `deliver-${options.durableName}`, + durable_name: options.durableName, + deliver_policy: DeliverPolicy.All, + }; - if (connOpts.servers?.length === 0) { - throw new Error("NATS server URL not specified."); - } + await jetStreamManager.consumers.add(options.streamName, consumerConfig); - const nc = await connect( connOpts); - fastify.log.debug(`Connected to NATS server at ${connOpts?.servers?[0]: []}.`); + const opts = consumerOpts(consumerConfig); - const handler = async (err: NatsError | null, msg: Msg) => { - if (err) { - fastify.log.error(err); - return; + opts.callback((error, msg) => { + if (error) { + fastify.log.error(`Error processing NATS message: ${error.message}`); + msg?.nak(); + } else { + handleMessage(fastify, msg); } - await processMessage(fastify.pg, fastify.graphql, msg, fastify.provider, fastify.p_redis) - } + }); - for (const subject of subjects) { - fastify.log.debug(`Subscribing to subject ${subject}.`); - nc.subscribe(subject, { - callback: handler, - }); - } + opts.bind(options.streamName, options.durableName) - fastify.addHook("onClose", async (_) => { - await nc.drain(); - await nc.close(); - }) + await jetStreamClient.subscribe(`${options.streamName}.${options.subject}`, opts) -} + fastify.addHook("onClose", async (instance) => { + await natsConnection.drain(); + await natsConnection.close(); + }) +}; export default fp(natsPlugin, { fastify: '4.x', - name: 'nats-plugin' -}) + name: 'nats-plugin', +});