Skip to content

Commit

Permalink
fix(nats): Explicit transfer data handling
Browse files Browse the repository at this point in the history
- separate transaction handling for "GraphTransaction" and
"CustodialTransferEvents": N/B this will be consolidated later.

Closes #28
  • Loading branch information
Mango Habanero committed Apr 28, 2023
1 parent 355cffa commit b9dad51
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 32 deletions.
80 changes: 71 additions & 9 deletions src/lib/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { Codec, JsMsg, JSONCodec } from 'nats';
import { RegistrationEvent, TransferEvent } from '@lib/custodail';
import { AccountService, getPhoneNumberFromAddress } from '@services/account';
import { SystemError } from '@lib/errors';
import { retrieveWalletBalance } from '@lib/ussd';
import { getVoucherSymbol, retrieveWalletBalance } from '@lib/ussd';
import { config } from '@/config';
import { generateUserTag, UserService } from '@services/user';
import { processTransaction } from '@services/transfer';
import { generateUserTag, User, UserService } from '@services/user';
import { formatTransferData, updateHeldVouchers, updateStatement } from '@services/transfer';
import { logger } from '@/app';

type EventHandler<T> = (
Expand Down Expand Up @@ -41,15 +41,76 @@ async function processTransferEvent(
provider: Provider,
redis: RedisClient
): Promise<void> {
const { success} = data;
if (success) {
await Promise.all([
processTransaction(data.from, db, graphql, provider, redis, data),
processTransaction(data.to, db, graphql, provider, redis, data),
])
if (!data.success) {
logger.error(`Transfer failed: ${data.transactionHash}`);
return;
}

const symbol = await getVoucherSymbol(data.contractAddress, graphql, redis);
if (!symbol) {
throw new SystemError(`Could not find symbol for contract address: ${data.contractAddress}`);
}

const [sender, senderService] = await getUser(db, redis, data.from, true);
const [recipient, recipientService] = await getUser(db, redis, data.to, false);

if (!sender && recipient) {
await updateUser(recipient, recipientService, sender, symbol, data, provider);
return;
}

if (sender && recipient) {
await updateUser(sender, senderService, recipient, symbol, data, provider);
await updateUser(recipient, recipientService, sender, symbol, data, provider);
}
}

async function getUser(
db: PostgresDb,
redis: RedisClient,
address: string,
isSender: boolean,
counterparty?: any
): Promise<[null, null] | [User, UserService]> {
const phoneNumber = await getPhoneNumberFromAddress(address, db, redis);

if (isSender && !phoneNumber) {
return [null, null];
} else if (!phoneNumber) {
throw new SystemError(`Could not find phone number for address: ${address}`);
}

const userService = new UserService(phoneNumber, redis);
const user = await userService.get();

if (!user) {
throw new SystemError(`Could not find recipient: ${phoneNumber}`);
}
return [user as User, userService];
}

async function updateUser(
user: User,
userService: UserService,
counterparty: Partial<User> | null | undefined,
symbol: string,
data: TransferEvent,
provider: Provider
) {
const transaction = await formatTransferData(data, user, counterparty, symbol);
const statement = await updateStatement(user.statement || [], transaction);
const balance = await retrieveWalletBalance(user.account.address, data.contractAddress, provider);
const heldVouchers = await updateHeldVouchers(user.vouchers?.held || [], { address: data.contractAddress, balance, symbol });

await userService.update({
statement,
vouchers: {
held: heldVouchers,
},
});
}


async function processRegistrationEvent(
db: PostgresDb,
graphql: GraphQLClient,
Expand Down Expand Up @@ -98,6 +159,7 @@ export async function processMessage(db: PostgresDb, graphql: GraphQLClient, mes
}
} else if (message.subject === `${config.NATS.STREAM_NAME}.transfer`) {
try {
console.log('Handling transfer')
await handleTransfer(db, graphql, message, provider, redis);
message.ack()
} catch (error: any) {
Expand Down
44 changes: 21 additions & 23 deletions src/plugins/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,6 @@ interface NatsPluginOptions {
subject: string;
}

async function handleMessage(fastify: any, message: JsMsg | null) {
if(message){
try {
await processMessage(fastify.pg, fastify.graphql, message, fastify.provider, fastify.p_redis);
} catch (error: any) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
// requeue message after 50 seconds
message.nak(50000);
}
}
}

const natsPlugin: FastifyPluginAsync<NatsPluginOptions> = async (fastify, options) => {
const natsConnection = await connect({ debug: config.DEV, servers: [options.server] });
fastify.log.debug(`Connected to NATS server at: ${options.server}`);
Expand All @@ -40,20 +28,30 @@ const natsPlugin: FastifyPluginAsync<NatsPluginOptions> = async (fastify, option

const opts = consumerOpts(consumerConfig);

opts.callback((error, msg) => {
if (error) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
msg?.nak();
} else {
handleMessage(fastify, msg);
}
});

opts.bind(options.streamName, options.durableName)

await jetStreamClient.subscribe(`${options.streamName}.${options.subject}`, opts)
const subscription = await jetStreamClient.subscribe(`${options.streamName}.${options.subject}`, opts)
const done = async () => {
for await (const message of subscription) {
if(message){
try {
await processMessage(fastify.pg, fastify.graphql, message, fastify.provider, fastify.p_redis);
// Add delay before processing the next message
await new Promise((resolve) => setTimeout(resolve, 5000));
} catch (error: any) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
// requeue message after 50 seconds
message.nak(50000);
}
}
}
}

done().catch((err) => {
fastify.log.error(`Error processing NATS message: ${err.message}`);
});

fastify.addHook("onClose", async (instance) => {
fastify.addHook("onClose", async (_) => {
await natsConnection.drain();
await natsConnection.close();
})
Expand Down

0 comments on commit b9dad51

Please sign in to comment.