Skip to content

Commit

Permalink
Merge pull request #38 from J-Hoplin/feat/queue-strategy
Browse files Browse the repository at this point in the history
Feat/queue strategy
  • Loading branch information
J-Hoplin authored Feb 17, 2024
2 parents 4b7b0a9 + e4d99d8 commit d702d39
Show file tree
Hide file tree
Showing 40 changed files with 583 additions and 151 deletions.
33 changes: 17 additions & 16 deletions .docker.env
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
# Environment variables declared in this file are automatically made available to Prisma.
# See the documentation for more detail: https://pris.ly/d/prisma-schema#accessing-environment-variables-from-the-schema
# .env file for docker webserver
# Application

# Prisma supports the native connection string format for PostgreSQL, MySQL, SQLite, SQL Server, MongoDB and CockroachDB.
# See the documentation for all the connection string options: https://pris.ly/d/connection-strings
TYPE="webserver" # 'webserver' or 'worker'

# Base
DATABASE_URL="mysql://root:hoplin1234!@db:3306/judge?schema=public"

ADMIN_EMAIL="hoplin.dev@gmail.com"
ADMIN_PW = "admin"

JWT_SECRET="SECRET"
JUDGE_SERVER_ENDPOINT="a"
ENV="a" # dev or production
PORT="3000"

JUDGE_SERVER_ENDPOINT=""

ENV=""
PORT=""
# Queue
QUEUE_TYPE="RMQ" # AWS or RMQ
RMQ_URL="amqp://root:password@rmq:5672"
RMQ_WORKER_QUEUE_NAME="JUDGE_QUEUE"

# AWS
AWS_REGION=""
AWS_ACCESS_ID=""
AWS_ACCESS_SECRET=""
AWS_SQS_QUEUE=""
AWS_S3_BUCKET=""
AWS_REGION="a"
AWS_ACCESS_ID="a"
AWS_ACCESS_SECRET="a"
AWS_SQS_QUEUE="a"
AWS_S3_BUCKET="a"

# Sentry
SENTRY_DSN=""
SENTRY_DSN="a"
28 changes: 28 additions & 0 deletions .docker.worker.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# .env file for docker worker
# Application

TYPE="worker" # 'webserver' or 'worker'

# Base
DATABASE_URL="mysql://root:hoplin1234!@db:3306/judge?schema=public"
ADMIN_EMAIL="hoplin.dev@gmail.com"
ADMIN_PW = "admin"
JWT_SECRET="SECRET"
JUDGE_SERVER_ENDPOINT="a"
ENV="a" # dev or production
PORT="3000"

# Queue
QUEUE_TYPE="RMQ" # AWS or RMQ
RMQ_URL="amqp://root:password@rmq:5672"
RMQ_WORKER_QUEUE_NAME="JUDGE_QUEUE"

# AWS
AWS_REGION="a"
AWS_ACCESS_ID="a"
AWS_ACCESS_SECRET="a"
AWS_SQS_QUEUE="a"
AWS_S3_BUCKET="a"

# Sentry
SENTRY_DSN="a"
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
node_modules/
node_modules/
.env
30 changes: 18 additions & 12 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,30 @@
# Prisma supports the native connection string format for PostgreSQL, MySQL, SQLite, SQL Server, MongoDB and CockroachDB.
# See the documentation for all the connection string options: https://pris.ly/d/connection-strings

DATABASE_URL="mysql://root:hoplin1234!@localhost:3306/judge?schema=public"
# Application

TYPE="webserver" # 'webserver' or 'worker'

# Base
DATABASE_URL="mysql://root:hoplin1234!@localhost:3306/judge?schema=public"
ADMIN_EMAIL="hoplin.dev@gmail.com"
ADMIN_PW = "admin"

JWT_SECRET="SECRET"
JUDGE_SERVER_ENDPOINT="a"
ENV="a" # dev or production
PORT="3000"

JUDGE_SERVER_ENDPOINT=""

ENV=""
PORT=""
# Queue
QUEUE_TYPE="RMQ" # SQS or RMQ. Change if you use SQS for application
RMQ_URL="amqp://root:password@localhost:5672"
RMQ_WORKER_QUEUE_NAME="JUDGE_QUEUE"

# AWS
AWS_REGION=""
AWS_ACCESS_ID=""
AWS_ACCESS_SECRET=""
AWS_SQS_QUEUE=""
AWS_S3_BUCKET=""
AWS_REGION="a"
AWS_ACCESS_ID="a"
AWS_ACCESS_SECRET="a"
AWS_SQS_QUEUE="a"
AWS_S3_BUCKET="a"

# Sentry
SENTRY_DSN=""
SENTRY_DSN="a"
26 changes: 25 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ services:
api:
image: online-judge
ports:
- '3000:3000'
- '3001:3000'
restart: 'unless-stopped'
env_file:
- ./.docker.env
depends_on:
- db
- rmq
- worker
networks:
- system
redis:
Expand All @@ -28,6 +30,28 @@ services:
- '6379:6379'
networks:
- system
rmq:
image: rabbitmq:3-management
ports:
- '1883:1883'
- '15672:15672'
- '5672:5672'
restart: 'unless-stopped'
networks:
- system
environment:
- RABBITMQ_DEFAULT_USER=root
- RABBITMQ_DEFAULT_PASS=password
worker:
image: online-judge
env_file:
- ./.docker.worker.env
restart: 'unless-stopped'
depends_on:
- db
- rmq
networks:
- system
networks:
system:
driver: bridge
8 changes: 0 additions & 8 deletions libs/aws-sqs/src/aws-sqs.module.ts

This file was deleted.

1 change: 0 additions & 1 deletion libs/aws-sqs/src/dto/index.ts

This file was deleted.

2 changes: 0 additions & 2 deletions libs/aws-sqs/src/index.ts

This file was deleted.

32 changes: 32 additions & 0 deletions libs/queue/src/decorator/rmq.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as amqp from 'amqplib';

/**
*
* Method decorator of Rabbit MQ Strategy
*
*/
export function RabbitMQConenction(): MethodDecorator {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor,
) {
// Preserve original function
const fn: (...args: any[]) => any = descriptor.value;
descriptor.value = async function (...args) {
// Connect to RMQ
const connection = await amqp.connect(process.env.RMQ_URL);
// Assert Channel
const channel = await connection.createChannel();
// Assert Queue
await channel.assertQueue(process.env.RMQ_WORKER_QUEUE_NAME, {
durable: true,
});
const task: string = await fn.apply(this, args);
channel.sendToQueue(process.env.RMQ_WORKER_QUEUE_NAME, Buffer.from(task));
setTimeout(() => {
connection.close();
}, 500);
};
};
}
1 change: 1 addition & 0 deletions libs/queue/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './queue.module';
38 changes: 38 additions & 0 deletions libs/queue/src/queue.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Module, Provider } from '@nestjs/common';
import { QueueService } from './strategy/queue-strategy.abstract.service';
import { RabbitMQService } from './strategy/rmq.service';
import { AwsSqsQueueService } from './strategy';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RMQ_TOKEN } from './type';

@Module({
imports:
process.env.QUEUE_TYPE === 'RMQ'
? [
ClientsModule.register([
{
name: RMQ_TOKEN,
transport: Transport.RMQ,
options: {
urls: [process.env.RMQ_URL],
queue: process.env.RMQ_WORKER_QUEUE_NAME,
queueOptions: {
durable: true,
},
},
},
]),
]
: [],
providers: [
AwsSqsQueueService,
RabbitMQService,
{
useClass:
process.env.QUEUE_TYPE === 'RMQ' ? RabbitMQService : AwsSqsQueueService,
provide: QueueService,
},
],
exports: [QueueService, AwsSqsQueueService, RabbitMQService],
})
export class QueueModule {}
18 changes: 18 additions & 0 deletions libs/queue/src/queue.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { QueueService } from './queue.service';

describe('QueueService', () => {
let service: QueueService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [QueueService],
}).compile();

service = module.get<QueueService>(QueueService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { Injectable } from '@nestjs/common';
import { SQSTask } from './type';
import { QueueTask } from './type';
import { QueueService } from './queue-strategy.abstract.service';
/**
* Queue Strategy
*
* AWS Environment
*/

@Injectable()
export class AwsSqsService {
export class AwsSqsQueueService extends QueueService {
private sqsClient: SQSClient;
private sqsQueue: string;

constructor() {
// Super class constructor
super();

// SQS Client
this.sqsClient = new SQSClient({
region: process.env.AWS_REGION,
Expand All @@ -21,7 +30,7 @@ export class AwsSqsService {
this.sqsQueue = process.env.AWS_SQS_QUEUE;
}

async sendTask(task: SQSTask) {
async sendTask(task: QueueTask) {
// Do send task if it's dev or production
if (process.env.ENV === 'dev' || process.env.ENV === 'production') {
const command = new SendMessageCommand({
Expand Down
2 changes: 2 additions & 0 deletions libs/queue/src/strategy/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './aws-sqs.service';
export * from './queue-strategy.abstract.service';
5 changes: 5 additions & 0 deletions libs/queue/src/strategy/queue-strategy.abstract.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { QueueTask } from './type';

export abstract class QueueService {
abstract sendTask(task: QueueTask): Promise<void>;
}
24 changes: 24 additions & 0 deletions libs/queue/src/strategy/rmq.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { RMQ_TOKEN } from '../type';
import { QueueService } from './queue-strategy.abstract.service';
import { QueueTask } from './type';

@Injectable()
export class RabbitMQService extends QueueService {
// Basic name of workerQueueName

constructor(@Inject(RMQ_TOKEN) private client: ClientProxy) {
super();
// If Rabbit MQ URL not found
if (!process.env.RMQ_URL) {
throw new Error('Rabbit MQ URL not found');
}
}

async sendTask(task: QueueTask) {
// Send returns cold Observable. Requires to explicit subscribe before sent
// https://docs.nestjs.com/microservices/basics#sending-messages
this.client.send(task.message, task).subscribe();
}
}
8 changes: 4 additions & 4 deletions libs/aws-sqs/src/type.ts → libs/queue/src/strategy/type.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
export enum SQSMessageTypeEnum {
RE_CORRECTION,
CODE_SUBMIT,
RE_CORRECTION = 'RE_CORRECTION',
CODE_SUBMIT = 'CODE_SUBMIT',
}

export type SQSMessageType = keyof typeof SQSMessageTypeEnum;

export type SQSTask = {
export type QueueTask = {
message: SQSMessageType;
id: string | number;
id: number;
};
1 change: 1 addition & 0 deletions libs/queue/src/type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const RMQ_TOKEN = 'RMQ_CLIENT';
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"declaration": true,
"outDir": "../../dist/libs/aws-sqs"
"outDir": "../../dist/libs/queue"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "test", "**/*spec.ts"]
Expand Down
9 changes: 9 additions & 0 deletions nest-cli.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@
"compilerOptions": {
"tsConfigPath": "libs/aws-sqs/tsconfig.lib.json"
}
},
"queue": {
"type": "library",
"root": "libs/queue",
"entryFile": "index",
"sourceRoot": "libs/queue/src",
"compilerOptions": {
"tsConfigPath": "libs/queue/tsconfig.lib.json"
}
}
}
}
Loading

0 comments on commit d702d39

Please sign in to comment.