Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wip): initial bulk subscriber create #3938

Merged
merged 12 commits into from
Aug 15, 2023
Merged
4 changes: 3 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@
"stddev",
"shortid",
"upsert",
"upserted",
"cond",
"redismaster",
"appendonly",
Expand Down Expand Up @@ -497,7 +498,8 @@
"reactjs",
"nextjs",
"vanillajs",
"quckstart"
"quckstart",
"errmsg"
],
"flagWords": [],
"patterns": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ interface ILogTopicSubscribersPayload {
}

const isNotTopic = (recipient: TriggerRecipientSubscriber): recipient is TriggerRecipientSubscriber =>
typeof recipient === 'string' || recipient?.type !== TriggerRecipientsTypeEnum.TOPIC;
typeof recipient === 'string' || !recipient.hasOwnProperty('type');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind changing this? We might not be using the type for a different situation of a Topic, but we wanted to leave that way open for future solutions. I can totally see us allowing to trigger an event to all the subscribers of a TenantId (becoming TriggerRecipientsTypeEnum.TENANT) as related to the work we are doing on that side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@p-fernandez Because I removed the unused 'record' fields from subscriber type (it was never used or saved anywhere), so in the context of this function - which subscriber is of type TriggerRecipientSubscriber - type script no longer accepts that there could be type property for subscriber. It wasn't exactly right even before, as it accepted it by 'mistake'

We could refactor the function to be of different type/ different check when we need to and get to a specific case. Wdyt? makes sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't totally understand the reason. isNotTopic is being used here:

private findSubscribers(recipients: TriggerRecipients): ISubscribersDefine[] {

If we do this change if we introduce new types of recipients as I mentioned or if someone is using this construction for subscribers (as it is available here:
SUBSCRIBER = 'Subscriber',
) it will fail the mapping of the users.
If I am still missing the point you are making let's review this in private to not clutter this PR. 👍🏻


const isTopic = (recipient: ITopic): recipient is ITopic => recipient?.type === TriggerRecipientsTypeEnum.TOPIC;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { IsDefined, IsEmail, IsLocale, IsOptional, IsString } from 'class-validator';
import {
ArrayMaxSize,
ArrayNotEmpty,
IsArray,
IsDefined,
IsEmail,
IsLocale,
IsOptional,
IsString,
ValidateNested,
} from 'class-validator';
import { SubscriberCustomData } from '@novu/shared';
import { Type } from 'class-transformer';

export class CreateSubscriberRequestDto {
@ApiProperty({
Expand Down Expand Up @@ -47,3 +58,13 @@ export class CreateSubscriberRequestDto {
@IsOptional()
data?: SubscriberCustomData;
}

export class BulkSubscriberCreateDto {
@ApiProperty()
@IsArray()
@ArrayNotEmpty()
@ArrayMaxSize(500)
@ValidateNested()
@Type(() => CreateSubscriberRequestDto)
subscribers: CreateSubscriberRequestDto[];
}
143 changes: 143 additions & 0 deletions apps/api/src/app/subscribers/e2e/bulk-create-subscribers.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { expect } from 'chai';
import axios from 'axios';
import { SubscriberRepository, SubscriberEntity } from '@novu/dal';
import { UserSession, SubscribersService } from '@novu/testing';

const axiosInstance = axios.create();

describe('Bulk create subscribers - /v1/subscribers/bulk (POST)', function () {
const BULK_API_ENDPOINT = '/v1/subscribers/bulk';
let session: UserSession;
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;
const subscriberRepository = new SubscriberRepository();

beforeEach(async () => {
session = new UserSession();
await session.initialize();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
subscriber = await subscriberService.createSubscriber();
});

it('should return the response array in correct format', async function () {
const { data: body } = await axiosInstance.post(
`${session.serverUrl}${BULK_API_ENDPOINT}`,
{
subscribers: [
{
subscriberId: 'test1',
firstName: 'sub1',
email: 'sub1@test.co',
},
{
subscriberId: 'test2',
firstName: 'sub2',
email: 'sub2@test.co',
},
{ subscriberId: subscriber.subscriberId, firstName: 'update name' },
{ subscriberId: 'test2', firstName: 'update name' },
],
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

expect(body.data).to.be.ok;
const response = body.data;
const { updated, created, failed } = response;

expect(updated?.length).to.equal(2);
expect(updated[0].subscriberId).to.equal(subscriber.subscriberId);
expect(updated[1].subscriberId).to.equal('test2');

expect(created?.length).to.equal(2);
expect(created[0].subscriberId).to.equal('test1');
expect(created[1].subscriberId).to.equal('test2');

expect(failed?.length).to.equal(0);
});

it('should create and update subscribers', async function () {
const { data: body } = await axiosInstance.post(
`${session.serverUrl}${BULK_API_ENDPOINT}`,
{
subscribers: [
{
subscriberId: 'sub1',
firstName: 'John',
lastName: 'Doe',
email: 'john@doe.com',
phone: '+972523333333',
locale: 'en',
data: { test1: 'test value1', test2: 'test value2' },
},
{
subscriberId: 'test2',
firstName: 'sub2',
email: 'sub2@test.co',
},
{
subscriberId: 'test3',
firstName: 'sub3',
email: 'sub3@test.co',
},
{ subscriberId: subscriber.subscriberId, firstName: 'update' },
{
subscriberId: 'test4',
firstName: 'sub4',
email: 'sub4@test.co',
},
],
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);
expect(body.data).to.be.ok;

const createdSubscriber = await subscriberRepository.findBySubscriberId(session.environment._id, 'sub1');
const updatedSubscriber = await subscriberRepository.findBySubscriberId(
session.environment._id,
subscriber.subscriberId
);

expect(updatedSubscriber?.firstName).to.equal('update');
expect(createdSubscriber?.firstName).to.equal('John');
expect(createdSubscriber?.email).to.equal('john@doe.com');
expect(createdSubscriber?.phone).to.equal('+972523333333');
expect(createdSubscriber?.locale).to.equal('en');
expect(createdSubscriber?.data?.test1).to.equal('test value1');
});

it('should throw an error when sending more than 500 subscribers', async function () {
const payload = {
Comment on lines +117 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am missing a test where we show how we handle if any of the subscribers fail in the creation/update and its response. Or if the user is not providing enough information to do the creation/update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a way to fake a failed update during the write executions... If the user is not providing enough information, it will be caught by the nested validator

Copy link
Contributor

@p-fernandez p-fernandez Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so assuming the nested validator is tested already to solve the other failure case we could provide proper uploads and fake the failure in the DAL service, for example, with Sinon. So in:

      bulkResponse = await this.bulkWrite(bulkWriteOps);

we would need to fake the response so for example from 3 subscribers, 2 go well and 1 fails. Therefore we can test the rest of the logic for the wrong case.
The approach would be similar to what is done in the Node.js SDK tests.

Anyway, if you need to move on other tasks let's not spend too much time on it. 👍🏻

subscriberId: 'test2',
firstName: 'sub2',
email: 'sub2@test.co',
};

try {
await axiosInstance.post(
`${session.serverUrl}${BULK_API_ENDPOINT}`,
{
subscribers: Array.from({ length: 501 }, () => payload),
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);
expect.fail();
} catch (error) {
expect(error).to.be.ok;
expect(error.response.status).to.equal(400);
expect(error.response.data.message[0]).to.equal('subscribers must contain no more than 500 elements');
}
});
});
24 changes: 24 additions & 0 deletions apps/api/src/app/subscribers/subscribers.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { JwtAuthGuard } from '../auth/framework/auth.guard';
import { ExternalApiAccessible } from '../auth/framework/external-api.decorator';
import { UserSession } from '../shared/framework/user.decorator';
import {
BulkSubscriberCreateDto,
CreateSubscriberRequestDto,
DeleteSubscriberResponseDto,
SubscriberResponseDto,
Expand Down Expand Up @@ -80,12 +81,15 @@ import {
import { MarkAllMessagesAsCommand } from '../widgets/usecases/mark-all-messages-as/mark-all-messages-as.command';
import { MarkAllMessagesAs } from '../widgets/usecases/mark-all-messages-as/mark-all-messages-as.usecase';
import { MarkAllMessageAsRequestDto } from './dtos/mark-all-messages-as-request.dto';
import { BulkCreateSubscribers } from './usecases/bulk-create-subscribers/bulk-create-subscribers.usecase';
import { BulkCreateSubscribersCommand } from './usecases/bulk-create-subscribers';

@Controller('/subscribers')
@ApiTags('Subscribers')
export class SubscribersController {
constructor(
private createSubscriberUsecase: CreateSubscriber,
private bulkCreateSubscribersUsecase: BulkCreateSubscribers,
private updateSubscriberUsecase: UpdateSubscriber,
private updateSubscriberChannelUsecase: UpdateSubscriberChannel,
private removeSubscriberUsecase: RemoveSubscriber,
Expand Down Expand Up @@ -178,6 +182,26 @@ export class SubscribersController {
);
}

@Post('/bulk')
@ExternalApiAccessible()
@UseGuards(JwtAuthGuard)
@ApiOperation({
summary: 'Bulk create subscribers',
description: `
Using this endpoint you can create multiple subscribers at once, to avoid multiple calls to the API.
The bulk API is limited to 500 subscribers per request.
`,
})
async bulkCreateSubscribers(@UserSession() user: IJwtPayload, @Body() body: BulkSubscriberCreateDto) {
return await this.bulkCreateSubscribersUsecase.execute(
BulkCreateSubscribersCommand.create({
environmentId: user.environmentId,
organizationId: user.organizationId,
subscribers: body.subscribers,
})
);
}

@Put('/:subscriberId')
@ExternalApiAccessible()
@UseGuards(JwtAuthGuard)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { IsArray, ValidateNested } from 'class-validator';
import { EnvironmentCommand } from '@novu/application-generic';
import { CreateSubscriberRequestDto } from '../../dtos';

export class BulkCreateSubscribersCommand extends EnvironmentCommand {
@IsArray()
@ValidateNested()
subscribers: CreateSubscriberRequestDto[];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Injectable } from '@nestjs/common';
import { BulkCreateSubscribersCommand } from './bulk-create-subscribers.command';
import { SubscriberRepository } from '@novu/dal';
import { ApiException } from '../../../shared/exceptions/api.exception';

@Injectable()
export class BulkCreateSubscribers {
constructor(private subscriberRepository: SubscriberRepository) {}

async execute(command: BulkCreateSubscribersCommand) {
try {
return await this.subscriberRepository.bulkCreateSubscribers(
command.subscribers,
command.environmentId,
command.organizationId
);
} catch (e) {
throw new ApiException(e.message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BulkCreateSubscribers } from './bulk-create-subscribers.usecase';
export { BulkCreateSubscribersCommand } from './bulk-create-subscribers.command';
2 changes: 2 additions & 0 deletions apps/api/src/app/subscribers/usecases/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { UpdateSubscriberOnlineFlag } from './update-subscriber-online-flag';
import { ChatOauth } from './chat-oauth/chat-oauth.usecase';
import { ChatOauthCallback } from './chat-oauth-callback/chat-oauth-callback.usecase';
import { DeleteSubscriberCredentials } from './delete-subscriber-credentials/delete-subscriber-credentials.usecase';
import { BulkCreateSubscribers } from './bulk-create-subscribers/bulk-create-subscribers.usecase';

export {
SearchByExternalSubscriberIds,
Expand All @@ -40,4 +41,5 @@ export const USE_CASES = [
ChatOauthCallback,
ChatOauth,
DeleteSubscriberCredentials,
BulkCreateSubscribers,
];
4 changes: 2 additions & 2 deletions libs/dal/src/repositories/base-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ export class BaseRepository<T_DBModel, T_MappedEntity, T_Enforcement = object> {
return await Promise.all(promises);
}

async bulkWrite(bulkOperations: any) {
await this.MongooseModel.bulkWrite(bulkOperations);
async bulkWrite(bulkOperations: any, ordered = false): Promise<any> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mongoose doesn't export the Bulk types. Not sure how we can still type this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's check if it is a version thing and we need to upgrade the Mongoose version. Found some comments mentioning that with the same problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that they don't fix that also in the latest :( super weird...

return await this.MongooseModel.bulkWrite(bulkOperations, { ordered });
}

protected mapEntity<TData>(data: TData): TData extends null ? null : T_MappedEntity {
Expand Down
Loading