Skip to content

Commit

Permalink
feat: add a WebSocket-based event gateway to the backend + create ent…
Browse files Browse the repository at this point in the history
…ities for Demand and Device
  • Loading branch information
josipbagaric committed Feb 3, 2020
1 parent 19841b7 commit af703ce
Show file tree
Hide file tree
Showing 19 changed files with 686 additions and 25 deletions.
2 changes: 1 addition & 1 deletion packages/origin-backend/.env.test
Original file line number Diff line number Diff line change
@@ -1 +1 @@
PORT=3030
BACKEND_PORT=3030
7 changes: 6 additions & 1 deletion packages/origin-backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
"@nestjs/jwt": "6.1.1",
"@nestjs/passport": "6.1.1",
"@nestjs/platform-express": "^6.10.14",
"@nestjs/platform-ws": "6.10.4",
"@nestjs/typeorm": "^6.2.0",
"@nestjs/websockets": "6.10.4",
"bcryptjs": "2.4.3",
"body-parser": "1.19.0",
"class-transformer": "0.2.3",
Expand All @@ -57,7 +59,8 @@
"reflect-metadata": "0.1.13",
"rxjs": "6.5.4",
"sqlite3": "4.1.1",
"typeorm": "0.2.22"
"typeorm": "0.2.22",
"ws": "7.2.1"
},
"devDependencies": {
"@nestjs/cli": "6.14.1",
Expand All @@ -74,6 +77,8 @@
"@types/passport-jwt": "3.0.3",
"@types/passport-local": "1.0.33",
"@types/supertest": "2.0.8",
"@types/websocket": "1.0.0",
"@types/ws": "^7.2.0",
"axios": "0.19.2",
"jest": "25.1.0",
"supertest": "4.0.2",
Expand Down
14 changes: 13 additions & 1 deletion packages/origin-backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { Currency } from './pods/currency/currency.entity';
import { Compliance } from './pods/compliance/compliance.entity';
import { Organization } from './pods/organization/organization.entity';
import { User } from './pods/user/user.entity';
import { Device } from './pods/device/device.entity';
import { Demand } from './pods/demand/demand.entity';

import { UserModule } from './pods/user/user.module';
import { ComplianceModule } from './pods/compliance/compliance.module';
import createConfig from './config/configuration';
Expand All @@ -21,7 +24,11 @@ import { ImageModule } from './pods/image/image.module';
import { JsonEntityModule } from './pods/json-entity/json-entity.module';
import { ContractsStorageModule } from './pods/contracts-storage/contracts-storage.module';
import { OrganizationModule } from './pods/organization/organization.module';
import { DeviceModule } from './pods/device/device.module';
import { DemandModule } from './pods/demand/demand.module';
import { AuthModule } from './auth/auth.module';
import { EventsModule } from './events/events.module';

import { AppController } from './app.controller';
import { OrganizationInvitation } from './pods/organization/organizationInvitation.entity';

Expand All @@ -43,6 +50,8 @@ const ENV_FILE_PATH = path.resolve(__dirname, '../../../../../.env');
Currency,
Compliance,
Country,
Device,
Demand,
Organization,
User,
OrganizationInvitation
Expand All @@ -58,7 +67,10 @@ const ENV_FILE_PATH = path.resolve(__dirname, '../../../../../.env');
JsonEntityModule,
ContractsStorageModule,
OrganizationModule,
AuthModule
DeviceModule,
DemandModule,
AuthModule,
EventsModule
],
controllers: [AppController],
providers: []
Expand Down
89 changes: 89 additions & 0 deletions packages/origin-backend/src/events/events.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import {
SubscribeMessage,
WebSocketGateway,
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit,
MessageBody
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { getEventsServerPort } from '../port'

import moment from 'moment';
import { SupportedEvents, IEvent, NewEvent } from '@energyweb/origin-backend-core';

const PORT = getEventsServerPort();

@WebSocketGateway(PORT, { transports: ['websocket'] })
export class EventsWebSocketGateway implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit {

private logger: Logger = new Logger('EventsWebSocketGateway');
private allEvents: IEvent[] = [];

wsClients: any[] = [];

afterInit() {
this.logger.log(`Initialized the WebSockets server on port: ${PORT}.`);
}

handleConnection(client: any) {
this.wsClients.push(client);;

this.logger.log(`Client connected. Total clients connected: ${this.wsClients.length}`);
}

handleDisconnect(client: any) {
for (let i = 0; i < this.wsClients.length; i++) {
if (this.wsClients[i] === client) {
this.wsClients.splice(i, 1);
this.logger.log(`Client disconnected`);
break;
}
}
}

private broadcastEvent(event: IEvent) {
this.logger.log(`Broadcasting a new "${event.type}" event.`);

const content = JSON.stringify(event);

for (let client of this.wsClients) {
client.send(content);
}
}

@SubscribeMessage('getAllEvents')
getAllEvents(client: any) {
this.logger.log('Client requested getting all events.');

client.send(JSON.stringify(this.allEvents));
}

@SubscribeMessage('events')
handleEvent(@MessageBody() incomingEvent: NewEvent) {
this.logger.log(`Incoming event: ${JSON.stringify(incomingEvent)}`);

const { type, data } = incomingEvent;

if (!type || !data) {
return 'Incorrect event structure';
}

const supportedEvents = Object.values(SupportedEvents);

if (!supportedEvents.includes(type)) {
return `Unsupported event name. Please use one of the following: ${supportedEvents.join(', ')}`;
}

const event: IEvent = {
...incomingEvent,
timestamp: moment().unix()
};

this.allEvents.push(event);

this.broadcastEvent(event);

return `Saved ${type} event.`;
}
}
8 changes: 8 additions & 0 deletions packages/origin-backend/src/events/events.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Module } from '@nestjs/common';
import { EventsWebSocketGateway } from './events.gateway';

@Module({
providers: [EventsWebSocketGateway],
exports: [EventsWebSocketGateway]
})
export class EventsModule {}
21 changes: 6 additions & 15 deletions packages/origin-backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import { NestFactory } from '@nestjs/core';
import { LoggerService } from '@nestjs/common';
import { WsAdapter } from '@nestjs/platform-ws';

import { AppModule } from './app.module';

function extractPort(url: string): number {
if (url) {
const backendUrlSplit: string[] = url.split(':');
const extractedPort: number = parseInt(backendUrlSplit[backendUrlSplit.length - 1], 10);

return extractedPort;
}

return null;
}
import { getPort } from './port'

export async function startAPI(logger?: LoggerService) {
const PORT: number =
parseInt(process.env.PORT, 10) || extractPort(process.env.BACKEND_URL) || 3030;

const PORT = getPort();

console.log(`Backend starting on port: ${PORT}`);

const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));
app.enableCors();
app.setGlobalPrefix('api');

Expand All @@ -31,4 +22,4 @@ export async function startAPI(logger?: LoggerService) {
await app.listen(PORT);

return app;
}
}
182 changes: 182 additions & 0 deletions packages/origin-backend/src/pods/demand/demand.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import { Repository } from 'typeorm';
import { validate } from 'class-validator';
import { IDemand, DemandStatus, DemandPostData, DemandUpdateData, SupportedEvents, CreatedNewDemand, DemandPartiallyFilledEvent } from '@energyweb/origin-backend-core';

import {
Controller,
Get,
Param,
NotFoundException,
Post,
Body,
UnprocessableEntityException,
Delete,
Put,
Inject
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Demand } from './demand.entity';
import { StorageErrors } from '../../enums/StorageErrors';
import { EventsWebSocketGateway } from '../../events/events.gateway';

@Controller('/Demand')
export class DemandController {
constructor(
@InjectRepository(Demand) private readonly demandRepository: Repository<Demand>,
@Inject(EventsWebSocketGateway) private readonly eventGateway: EventsWebSocketGateway
) {}

@Get()
async getAll() {
console.log(`<GET> Demand all`);

const allDemands = await this.demandRepository.find();

for (let demand of allDemands) {
demand.demandPartiallyFilledEvents = demand.demandPartiallyFilledEvents.map(
event => JSON.parse(event)
);
}

return allDemands;
}

@Get('/:id')
async get(@Param('id') id: string) {
const existing = await this.demandRepository.findOne(id, {
loadRelationIds: true
});

if (!existing) {
throw new NotFoundException(StorageErrors.NON_EXISTENT);
}

existing.demandPartiallyFilledEvents = existing.demandPartiallyFilledEvents.map(
event => JSON.parse(event)
);

return existing;
}

@Post()
async post(@Body() body: DemandPostData) {
let newEntity = new Demand();

const data: Omit<IDemand, 'id'> = {
...body,
status: DemandStatus.ACTIVE,
demandPartiallyFilledEvents: [],
location: body.location ?? [],
deviceType: body.deviceType ?? [],
otherGreenAttributes: body.otherGreenAttributes ?? '',
typeOfPublicSupport: body.typeOfPublicSupport ?? '',
registryCompliance: body.registryCompliance ?? '',
procureFromSingleFacility: body.procureFromSingleFacility ?? false,
vintage: body.vintage ?? [1900, 2100]
};

Object.assign(newEntity, data);

const validationErrors = await validate(newEntity);

if (validationErrors.length > 0) {
throw new UnprocessableEntityException({
success: false,
errors: validationErrors
});
}

newEntity = await this.demandRepository.save(newEntity);

const eventData: CreatedNewDemand = {
demandId: newEntity.id
};

this.eventGateway.handleEvent({
type: SupportedEvents.CREATE_NEW_DEMAND,
data: eventData
});

return newEntity;
}

@Delete('/:id')
async delete(@Param('id') id: string) {
const existing = await this.demandRepository.findOne(id);

if (!existing) {
throw new NotFoundException(StorageErrors.NON_EXISTENT);
}

existing.status = DemandStatus.ARCHIVED;

try {
await existing.save();

return {
message: `Demand ${id} successfully archived`
};
} catch (error) {
throw new UnprocessableEntityException({
message: `Demand ${id} could not be archived due to an unknown error`
});
}
}

@Put('/:id')
async put(@Param('id') id: string, @Body() body: DemandUpdateData) {
const existing = await this.demandRepository.findOne(id);

if (!existing) {
throw new NotFoundException(StorageErrors.NON_EXISTENT);
}

existing.status = body.status ?? existing.status;

if (body.demandPartiallyFilledEvent) {
existing.demandPartiallyFilledEvents.push(
JSON.stringify(body.demandPartiallyFilledEvent)
);
}

const hasNewFillEvent = body.demandPartiallyFilledEvent !== null;

if (hasNewFillEvent) {
existing.demandPartiallyFilledEvents.push(
JSON.stringify(body.demandPartiallyFilledEvent)
);
}

try {
await existing.save();
} catch (error) {
throw new UnprocessableEntityException({
message: `Demand ${id} could not be updated due to an unkown error`
});
}

this.eventGateway.handleEvent({
type: SupportedEvents.DEMAND_UPDATED,
data: { demandId: existing.id }
});

if (hasNewFillEvent) {
const eventData: DemandPartiallyFilledEvent = {
demandId: existing.id,
certificateId: body.demandPartiallyFilledEvent.certificateId,
energy: body.demandPartiallyFilledEvent.energy,
blockNumber: body.demandPartiallyFilledEvent.blockNumber
};

this.eventGateway.handleEvent({
type: SupportedEvents.DEMAND_PARTIALLY_FILLED,
data: eventData
});
}

return {
message: `Demand ${id} successfully updated`
};
}
}
Loading

0 comments on commit af703ce

Please sign in to comment.