Skip to content

Commit

Permalink
refactor(nats): Refactor
Browse files Browse the repository at this point in the history
- Switches from ephemeral to durable consumer
- Improves natsHandler
logic.

Closes #28
  • Loading branch information
Mango Habanero committed Apr 27, 2023
1 parent 6dcae31 commit 355cffa
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 216 deletions.
110 changes: 110 additions & 0 deletions src/lib/nats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { PostgresDb } from '@fastify/postgres';
import { GraphQLClient } from 'graphql-request';
import { Provider } from 'ethers';
import { Redis as RedisClient } from 'ioredis';
import { Codec, JsMsg, JSONCodec } from 'nats';
import { RegistrationEvent, TransferEvent } from '@lib/custodail';
import { AccountService, getPhoneNumberFromAddress } from '@services/account';
import { SystemError } from '@lib/errors';
import { retrieveWalletBalance } from '@lib/ussd';
import { config } from '@/config';
import { generateUserTag, UserService } from '@services/user';
import { processTransaction } from '@services/transfer';
import { logger } from '@/app';

type EventHandler<T> = (
db: PostgresDb,
graphql: GraphQLClient,
data: T,
provider: Provider,
redis: RedisClient
) => Promise<void>;


function createHandler<T>(codec: Codec<T>, eventHandler: EventHandler<T>) {
return async function (
db: PostgresDb,
graphql: GraphQLClient,
message: JsMsg,
provider: Provider,
redis: RedisClient
) {
const data = codec.decode(message.data);
await eventHandler(db, graphql, data, provider, redis);
};
}

async function processTransferEvent(
db: PostgresDb,
graphql: GraphQLClient,
data: TransferEvent,
provider: Provider,
redis: RedisClient
): Promise<void> {
const { success} = data;
if (success) {
await Promise.all([
processTransaction(data.from, db, graphql, provider, redis, data),
processTransaction(data.to, db, graphql, provider, redis, data),
])
}
}

async function processRegistrationEvent(
db: PostgresDb,
graphql: GraphQLClient,
data: RegistrationEvent,
provider: Provider,
redis: RedisClient
): Promise<void> {
const phoneNumber = await getPhoneNumberFromAddress(data.to, db, redis)
if (!phoneNumber) {
throw new SystemError(`Could not find phone number for address: ${data.to}`)
}
const tag = await generateUserTag(data.to, graphql, phoneNumber)
await new AccountService(db, redis).activateOnChain(config.DEFAULT_VOUCHER.ADDRESS, phoneNumber)
const balance = await retrieveWalletBalance(data.to, config.DEFAULT_VOUCHER.ADDRESS, provider)
await new UserService(phoneNumber, redis).update({
account: {
active_voucher_address: config.DEFAULT_VOUCHER.ADDRESS,
},
tag,
vouchers: {
active: {
address: config.DEFAULT_VOUCHER.ADDRESS,
balance,
symbol: config.DEFAULT_VOUCHER.SYMBOL,
}
}
})
}

const handleTransfer = createHandler(
JSONCodec<TransferEvent>(),
processTransferEvent
);
const handleRegistration = createHandler(
JSONCodec<RegistrationEvent>(),
processRegistrationEvent
);

export async function processMessage(db: PostgresDb, graphql: GraphQLClient, message: JsMsg, provider: Provider, redis: RedisClient) {
if (message.subject === `${config.NATS.STREAM_NAME}.register`) {
try {
await handleRegistration(db, graphql, message, provider, redis);
message.ack()
} catch (error: any) {
throw new SystemError(`Error handling registration: ${error.message}`);
}
} else if (message.subject === `${config.NATS.STREAM_NAME}.transfer`) {
try {
await handleTransfer(db, graphql, message, provider, redis);
message.ack()
} catch (error: any) {
throw new SystemError(`Error handling transfer: ${error.message}`);
}
} else {
logger.debug(`Unsupported subject: ${message.subject}`);
message.ack()
}
}
186 changes: 0 additions & 186 deletions src/lib/natsHandler.ts

This file was deleted.

77 changes: 47 additions & 30 deletions src/plugins/nats.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,66 @@
import { FastifyPluginAsync } from 'fastify';
import fp from 'fastify-plugin';
import { connect, ConnectionOptions, Msg, NatsError } from 'nats';
import { processMessage } from '@lib/natsHandler';

import { AckPolicy, connect, consumerOpts, DeliverPolicy, JsMsg } from 'nats';
import { processMessage } from '@lib/nats';
import { config } from '@/config';

interface NatsPluginOptions {
connOpts: ConnectionOptions;
subjects: string[];
durableName: string;
server: string;
streamName: string;
subject: string;
}

async function handleMessage(fastify: any, message: JsMsg | null) {
if(message){
try {
await processMessage(fastify.pg, fastify.graphql, message, fastify.provider, fastify.p_redis);
} catch (error: any) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
// requeue message after 50 seconds
message.nak(50000);
}
}
}

const natsPlugin: FastifyPluginAsync<NatsPluginOptions> = async (fastify, options) => {
const natsConnection = await connect({ debug: config.DEV, servers: [options.server] });
fastify.log.debug(`Connected to NATS server at: ${options.server}`);
const jetStreamManager = await natsConnection.jetstreamManager();
const jetStreamClient = natsConnection.jetstream();

let { connOpts, subjects } = options;
const consumerConfig = {
ack_policy: AckPolicy.Explicit,
deliver_subject: `deliver-${options.durableName}`,
durable_name: options.durableName,
deliver_policy: DeliverPolicy.All,
};

if (connOpts.servers?.length === 0) {
throw new Error("NATS server URL not specified.");
}
await jetStreamManager.consumers.add(options.streamName, consumerConfig);

const nc = await connect( connOpts);
fastify.log.debug(`Connected to NATS server at ${connOpts?.servers?[0]: []}.`);
const opts = consumerOpts(consumerConfig);

const handler = async (err: NatsError | null, msg: Msg) => {
if (err) {
fastify.log.error(err);
return;
opts.callback((error, msg) => {
if (error) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
msg?.nak();
} else {
handleMessage(fastify, msg);
}
await processMessage(fastify.pg, fastify.graphql, msg, fastify.provider, fastify.p_redis)
}
});

for (const subject of subjects) {
fastify.log.debug(`Subscribing to subject ${subject}.`);
nc.subscribe(subject, {
callback: handler,
});
}
opts.bind(options.streamName, options.durableName)

fastify.addHook("onClose", async (_) => {
await nc.drain();
await nc.close();
})
await jetStreamClient.subscribe(`${options.streamName}.${options.subject}`, opts)

}
fastify.addHook("onClose", async (instance) => {
await natsConnection.drain();
await natsConnection.close();
})

};

export default fp(natsPlugin, {
fastify: '4.x',
name: 'nats-plugin'
})
name: 'nats-plugin',
});

0 comments on commit 355cffa

Please sign in to comment.