Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wip): initial bulk subscriber create #3938

Merged
merged 12 commits into from
Aug 15, 2023
Merged
4 changes: 3 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@
"stddev",
"shortid",
"upsert",
"upserted",
"cond",
"redismaster",
"appendonly",
Expand Down Expand Up @@ -497,7 +498,8 @@
"reactjs",
"nextjs",
"vanillajs",
"quckstart"
"quckstart",
"errmsg"
],
"flagWords": [],
"patterns": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
TriggerRecipientTopics,
TriggerRecipientsTypeEnum,
UserId,
TriggerRecipient,
} from '@novu/shared';
import { InstrumentUsecase, FeatureFlagCommand, GetFeatureFlag } from '@novu/application-generic';

Expand All @@ -26,10 +27,10 @@ interface ILogTopicSubscribersPayload {
userId: UserId;
}

const isNotTopic = (recipient: TriggerRecipientSubscriber): recipient is TriggerRecipientSubscriber =>
typeof recipient === 'string' || recipient?.type !== TriggerRecipientsTypeEnum.TOPIC;
const isNotTopic = (recipient: TriggerRecipient) => !isTopic(recipient);

const isTopic = (recipient: ITopic): recipient is ITopic => recipient?.type === TriggerRecipientsTypeEnum.TOPIC;
const isTopic = (recipient: TriggerRecipient): recipient is ITopic =>
(recipient as ITopic).type && (recipient as ITopic).type === TriggerRecipientsTypeEnum.TOPIC;

@Injectable()
export class MapTriggerRecipients {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { IsDefined, IsEmail, IsLocale, IsOptional, IsString } from 'class-validator';
import {
ArrayMaxSize,
ArrayNotEmpty,
IsArray,
IsDefined,
IsEmail,
IsLocale,
IsOptional,
IsString,
ValidateNested,
} from 'class-validator';
import { SubscriberCustomData } from '@novu/shared';
import { Type } from 'class-transformer';

export class CreateSubscriberRequestDto {
@ApiProperty({
Expand Down Expand Up @@ -47,3 +58,13 @@ export class CreateSubscriberRequestDto {
@IsOptional()
data?: SubscriberCustomData;
}

export class BulkSubscriberCreateDto {
@ApiProperty()
@IsArray()
@ArrayNotEmpty()
@ArrayMaxSize(500)
@ValidateNested()
@Type(() => CreateSubscriberRequestDto)
subscribers: CreateSubscriberRequestDto[];
}
143 changes: 143 additions & 0 deletions apps/api/src/app/subscribers/e2e/bulk-create-subscribers.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { expect } from 'chai';
import axios from 'axios';
import { SubscriberRepository, SubscriberEntity } from '@novu/dal';
import { UserSession, SubscribersService } from '@novu/testing';

const axiosInstance = axios.create();

describe('Bulk create subscribers - /v1/subscribers/bulk (POST)', function () {
const BULK_API_ENDPOINT = '/v1/subscribers/bulk';
let session: UserSession;
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;
const subscriberRepository = new SubscriberRepository();

beforeEach(async () => {
session = new UserSession();
await session.initialize();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
subscriber = await subscriberService.createSubscriber();
});

it('should return the response array in correct format', async function () {
const { data: body } = await axiosInstance.post(
`${session.serverUrl}${BULK_API_ENDPOINT}`,
{
subscribers: [
{
subscriberId: 'test1',
firstName: 'sub1',
email: 'sub1@test.co',
},
{
subscriberId: 'test2',
firstName: 'sub2',
email: 'sub2@test.co',
},
{ subscriberId: subscriber.subscriberId, firstName: 'update name' },
{ subscriberId: 'test2', firstName: 'update name' },
],
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

expect(body.data).to.be.ok;
const response = body.data;
const { updated, created, failed } = response;

expect(updated?.length).to.equal(2);
expect(updated[0].subscriberId).to.equal(subscriber.subscriberId);
expect(updated[1].subscriberId).to.equal('test2');

expect(created?.length).to.equal(2);
expect(created[0].subscriberId).to.equal('test1');
expect(created[1].subscriberId).to.equal('test2');

expect(failed?.length).to.equal(0);
});

it('should create and update subscribers', async function () {
const { data: body } = await axiosInstance.post(
`${session.serverUrl}${BULK_API_ENDPOINT}`,
{
subscribers: [
{
subscriberId: 'sub1',
firstName: 'John',
lastName: 'Doe',
email: 'john@doe.com',
phone: '+972523333333',
locale: 'en',
data: { test1: 'test value1', test2: 'test value2' },
},
{
subscriberId: 'test2',
firstName: 'sub2',
email: 'sub2@test.co',
},
{
subscriberId: 'test3',
firstName: 'sub3',
email: 'sub3@test.co',
},
{ subscriberId: subscriber.subscriberId, firstName: 'update' },
{
subscriberId: 'test4',
firstName: 'sub4',
email: 'sub4@test.co',
},
],
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);
expect(body.data).to.be.ok;

const createdSubscriber = await subscriberRepository.findBySubscriberId(session.environment._id, 'sub1');
const updatedSubscriber = await subscriberRepository.findBySubscriberId(
session.environment._id,
subscriber.subscriberId
);

expect(updatedSubscriber?.firstName).to.equal('update');
expect(createdSubscriber?.firstName).to.equal('John');
expect(createdSubscriber?.email).to.equal('john@doe.com');
expect(createdSubscriber?.phone).to.equal('+972523333333');
expect(createdSubscriber?.locale).to.equal('en');
expect(createdSubscriber?.data?.test1).to.equal('test value1');
});

it('should throw an error when sending more than 500 subscribers', async function () {
const payload = {
Comment on lines +117 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am missing a test where we show how we handle if any of the subscribers fail in the creation/update and its response. Or if the user is not providing enough information to do the creation/update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a way to fake a failed update during the write executions... If the user is not providing enough information, it will be caught by the nested validator

Copy link
Contributor

@p-fernandez p-fernandez Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so assuming the nested validator is tested already to solve the other failure case we could provide proper uploads and fake the failure in the DAL service, for example, with Sinon. So in:

      bulkResponse = await this.bulkWrite(bulkWriteOps);

we would need to fake the response so for example from 3 subscribers, 2 go well and 1 fails. Therefore we can test the rest of the logic for the wrong case.
The approach would be similar to what is done in the Node.js SDK tests.

Anyway, if you need to move on other tasks let's not spend too much time on it. 👍🏻

subscriberId: 'test2',
firstName: 'sub2',
email: 'sub2@test.co',
};

try {
await axiosInstance.post(
`${session.serverUrl}${BULK_API_ENDPOINT}`,
{
subscribers: Array.from({ length: 501 }, () => payload),
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);
expect.fail();
} catch (error) {
expect(error).to.be.ok;
expect(error.response.status).to.equal(400);
expect(error.response.data.message[0]).to.equal('subscribers must contain no more than 500 elements');
}
});
});
24 changes: 24 additions & 0 deletions apps/api/src/app/subscribers/subscribers.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { JwtAuthGuard } from '../auth/framework/auth.guard';
import { ExternalApiAccessible } from '../auth/framework/external-api.decorator';
import { UserSession } from '../shared/framework/user.decorator';
import {
BulkSubscriberCreateDto,
CreateSubscriberRequestDto,
DeleteSubscriberResponseDto,
SubscriberResponseDto,
Expand Down Expand Up @@ -80,12 +81,15 @@ import {
import { MarkAllMessagesAsCommand } from '../widgets/usecases/mark-all-messages-as/mark-all-messages-as.command';
import { MarkAllMessagesAs } from '../widgets/usecases/mark-all-messages-as/mark-all-messages-as.usecase';
import { MarkAllMessageAsRequestDto } from './dtos/mark-all-messages-as-request.dto';
import { BulkCreateSubscribers } from './usecases/bulk-create-subscribers/bulk-create-subscribers.usecase';
import { BulkCreateSubscribersCommand } from './usecases/bulk-create-subscribers';

@Controller('/subscribers')
@ApiTags('Subscribers')
export class SubscribersController {
constructor(
private createSubscriberUsecase: CreateSubscriber,
private bulkCreateSubscribersUsecase: BulkCreateSubscribers,
private updateSubscriberUsecase: UpdateSubscriber,
private updateSubscriberChannelUsecase: UpdateSubscriberChannel,
private removeSubscriberUsecase: RemoveSubscriber,
Expand Down Expand Up @@ -178,6 +182,26 @@ export class SubscribersController {
);
}

@Post('/bulk')
@ExternalApiAccessible()
@UseGuards(JwtAuthGuard)
@ApiOperation({
summary: 'Bulk create subscribers',
description: `
Using this endpoint you can create multiple subscribers at once, to avoid multiple calls to the API.
The bulk API is limited to 500 subscribers per request.
`,
})
async bulkCreateSubscribers(@UserSession() user: IJwtPayload, @Body() body: BulkSubscriberCreateDto) {
return await this.bulkCreateSubscribersUsecase.execute(
BulkCreateSubscribersCommand.create({
environmentId: user.environmentId,
organizationId: user.organizationId,
subscribers: body.subscribers,
})
);
}

@Put('/:subscriberId')
@ExternalApiAccessible()
@UseGuards(JwtAuthGuard)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { IsArray, ValidateNested } from 'class-validator';
import { EnvironmentCommand } from '@novu/application-generic';
import { CreateSubscriberRequestDto } from '../../dtos';

export class BulkCreateSubscribersCommand extends EnvironmentCommand {
@IsArray()
@ValidateNested()
subscribers: CreateSubscriberRequestDto[];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Injectable } from '@nestjs/common';
import { BulkCreateSubscribersCommand } from './bulk-create-subscribers.command';
import { SubscriberRepository } from '@novu/dal';
import { ApiException } from '../../../shared/exceptions/api.exception';

@Injectable()
export class BulkCreateSubscribers {
constructor(private subscriberRepository: SubscriberRepository) {}

async execute(command: BulkCreateSubscribersCommand) {
try {
return await this.subscriberRepository.bulkCreateSubscribers(
command.subscribers,
command.environmentId,
command.organizationId
);
} catch (e) {
throw new ApiException(e.message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BulkCreateSubscribers } from './bulk-create-subscribers.usecase';
export { BulkCreateSubscribersCommand } from './bulk-create-subscribers.command';
2 changes: 2 additions & 0 deletions apps/api/src/app/subscribers/usecases/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { UpdateSubscriberOnlineFlag } from './update-subscriber-online-flag';
import { ChatOauth } from './chat-oauth/chat-oauth.usecase';
import { ChatOauthCallback } from './chat-oauth-callback/chat-oauth-callback.usecase';
import { DeleteSubscriberCredentials } from './delete-subscriber-credentials/delete-subscriber-credentials.usecase';
import { BulkCreateSubscribers } from './bulk-create-subscribers/bulk-create-subscribers.usecase';

export {
SearchByExternalSubscriberIds,
Expand All @@ -40,4 +41,5 @@ export const USE_CASES = [
ChatOauthCallback,
ChatOauth,
DeleteSubscriberCredentials,
BulkCreateSubscribers,
];
4 changes: 2 additions & 2 deletions libs/dal/src/repositories/base-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ export class BaseRepository<T_DBModel, T_MappedEntity, T_Enforcement = object> {
return await Promise.all(promises);
}

async bulkWrite(bulkOperations: any) {
await this.MongooseModel.bulkWrite(bulkOperations);
async bulkWrite(bulkOperations: any, ordered = false): Promise<any> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mongoose doesn't export the Bulk types. Not sure how we can still type this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's check if it is a version thing and we need to upgrade the Mongoose version. Found some comments mentioning that with the same problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that they don't fix that also in the latest :( super weird...

return await this.MongooseModel.bulkWrite(bulkOperations, { ordered });
}

protected mapEntity<TData>(data: TData): TData extends null ? null : T_MappedEntity {
Expand Down
Loading