feat(wip): initial bulk subscriber create #3938
NV-2191 📡 Bulk subscriber creation over the API
Why? For larger organizations, allow creating subscribers in bulk.
Definition of Done
@@ -163,8 +163,8 @@ export class BaseRepository<T_DBModel, T_MappedEntity, T_Enforcement = object> {
     return await Promise.all(promises);
   }

-  async bulkWrite(bulkOperations: any) {
-    await this.MongooseModel.bulkWrite(bulkOperations);
+  async bulkWrite(bulkOperations: any, ordered = false): Promise<any> {
Mongoose doesn't export the Bulk types. Not sure how we can still type this.
Let's check whether this is a version issue and we need to upgrade Mongoose. I found some comments mentioning the same problem.
I see that they haven't fixed that in the latest version either :( super weird...
      if (!e.writeErrors) {
        throw new DalException(e.message);
      }
If the error does not come from write errors raised during the bulk execution, throw it.
Otherwise, we want to parse the write errors into the response instead of throwing.
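The rule in this comment can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `runBulkWrite`, `BulkErrorLike`, and the `execute` callback are hypothetical names, and the error shape is an assumption based only on the `writeErrors`, `e.result`, and `DalException` identifiers visible in the diff.

```typescript
// Hypothetical stand-in for the DAL exception class named in the diff.
class DalException extends Error {}

// Assumed shape of a bulk write error: per-operation failures are reported
// in `writeErrors`, together with a partial `result`.
interface BulkErrorLike {
  message: string;
  writeErrors?: unknown[];
  result?: unknown;
}

// `execute` is a hypothetical callback wrapping the real bulk write call.
async function runBulkWrite(execute: () => Promise<unknown>): Promise<unknown> {
  try {
    return await execute();
  } catch (e) {
    const error = e as BulkErrorLike;
    // Not a per-operation write failure (for example a connection problem): rethrow.
    if (!error.writeErrors) {
      throw new DalException(error.message);
    }
    // Partial failure: return the partial result so it can be parsed into the response.
    return error.result;
  }
}
```

With this split, only unexpected failures surface as exceptions, while partial failures flow into the normal response path.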
…er-creation-over-the-api
  it('should throw an error when sending more than 500 subscribers', async function () {
    const payload = {
I am missing a test that shows how we handle the case where any of the subscribers fails during creation or update, and what the response looks like. The same goes for the case where the user does not provide enough information for the creation or update.
I can't think of a way to fake a failed update during the write executions... If the user does not provide enough information, it will be caught by the nested validator.
OK, so assuming the nested validator is already tested, which covers the other failure case, we could provide proper uploads and fake the failure in the DAL service, for example with Sinon. So in:
bulkResponse = await this.bulkWrite(bulkWriteOps);
we would need to fake the response so that, for example, out of 3 subscribers 2 succeed and 1 fails. That way we can test the rest of the logic for the failure case.
The approach would be similar to what is done in the Node.js SDK tests.
Anyway, if you need to move on to other tasks, let's not spend too much time on it. 👍🏻
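One way to picture this suggestion: replace the repository's `bulkWrite` with a fake so that, of three subscribers, two succeed and one fails, then assert on how the response is assembled. The sketch below uses a hand-rolled fake instead of Sinon to stay self-contained. `FakeRepository`, `bulkCreate`, `BulkOutcome`, and the response field names are assumptions made for illustration, not the PR's actual API.

```typescript
interface SubscriberInput {
  subscriberId: string;
}

// Assumed shape of what the bulk write reports back.
interface BulkOutcome {
  upserted: { index: number }[];
  writeErrors: { index: number; errmsg: string }[];
}

// Hypothetical seam: the test swaps `bulkWrite` for a fake implementation.
interface FakeRepository {
  bulkWrite(ops: SubscriberInput[]): Promise<BulkOutcome>;
}

// Hypothetical logic under test: maps the bulk outcome to a response.
async function bulkCreate(repo: FakeRepository, subscribers: SubscriberInput[]) {
  const { upserted, writeErrors } = await repo.bulkWrite(subscribers);

  return {
    created: upserted.map(({ index }) => ({ subscriberId: subscribers[index].subscriberId })),
    failed: writeErrors.map(({ index, errmsg }) => ({
      subscriberId: subscribers[index].subscriberId,
      message: errmsg,
    })),
  };
}

// The fake: operations 0 and 1 succeed, operation 2 fails.
const partiallyFailingRepo: FakeRepository = {
  bulkWrite: async () => ({
    upserted: [{ index: 0 }, { index: 1 }],
    writeErrors: [{ index: 2, errmsg: 'simulated write error' }],
  }),
};
```

With Sinon, the same fake would typically be expressed as a stub on the repository method that resolves to the canned outcome.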
apps/api/src/app/subscribers/e2e/bulk-create-subscribers.e2e.ts
  @Post('/bulk')
  @ExternalApiAccessible()
  @UseGuards(JwtAuthGuard)
  @ApiNoContentResponse()
I don't think this endpoint would benefit from a 204 response.
From the cases we discussed, we will probably end up with the following:
- We return a 200 if all the subscribers are created or updated.
- We could return a 207 if some subscribers were created and others failed (though it is not a very common status code, as it was designed for the WebDAV protocol: https://www.rfc-editor.org/rfc/rfc4918#section-13).
- Of course, 400 for validation errors.
One last twist on this is to return a 200 and include in the response a per-subscriber status code for the outcome of the operation. See the second code snippet at https://evertpot.com/http/207-multi-status
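The two options in this comment can be sketched side by side. Everything here is illustrative: `BulkSummary`, `overallStatus`, `perItemStatuses`, and the per-item codes (201, 200, 400) are assumptions, and the discussion does not say what to return when every subscriber fails, so this sketch treats any failure as a 207.

```typescript
interface BulkSummary {
  created: string[];
  updated: string[];
  failed: { subscriberId: string; message: string }[];
}

interface ItemResult {
  subscriberId: string;
  status: number;
  message?: string;
}

// Option A: a single status code for the whole response.
// Assumption: any failed subscriber turns the response into a 207 Multi-Status.
function overallStatus(summary: BulkSummary): number {
  return summary.failed.length > 0 ? 207 : 200;
}

// Option B: always answer 200 and report a status per subscriber in the body.
// The per-item codes are assumptions chosen for illustration.
function perItemStatuses(summary: BulkSummary): ItemResult[] {
  return [
    ...summary.created.map((subscriberId) => ({ subscriberId, status: 201 })),
    ...summary.updated.map((subscriberId) => ({ subscriberId, status: 200 })),
    ...summary.failed.map(({ subscriberId, message }) => ({ subscriberId, status: 400, message })),
  ];
}
```

Option B keeps the HTTP status simple for clients that only check for success, at the cost of requiring them to inspect the body for individual failures.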
    const insertedSubscribers = created.map((inserted) => {
      indexes.push(inserted.index);

      return { subscriberId: subscribers[inserted.index]?.subscriberId };
    });

    let failed = [];
    if (writeErrors.length > 0) {
      failed = writeErrors.map((error) => {
        indexes.push(error.err.index);

        return {
          message: error.err.errmsg,
          subscriberId: error.err.op?.subscriberId,
        };
      });
    }

    const updatedSubscribers = subscribers
      .filter((subId, index) => !indexes.includes(index))
      .map((subscriber) => {
Once the direction here is a bit clearer, let's split the different pieces into separate functions, so that the handling of each type of affected subscriber is isolated and the code is easier to read.
We can have one dedicated function for the created subscribers, another for the updated ones, and another for the failed ones.
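A possible shape for that split, as a rough sketch rather than the final refactor. The function names (`mapCreated`, `mapFailed`, `mapUpdated`) and the local interfaces are hypothetical, the entry shapes are inferred from the snippet under review, and the return value of the updated mapping is an assumption because the diff is cut off at that point.

```typescript
interface Subscriber {
  subscriberId: string;
}

interface UpsertedEntry {
  index: number;
}

interface WriteErrorEntry {
  err: { index: number; errmsg: string; op?: { subscriberId?: string } };
}

// Created: entries the bulk write reported as upserted.
function mapCreated(subscribers: Subscriber[], created: UpsertedEntry[]) {
  return created.map(({ index }) => ({ subscriberId: subscribers[index]?.subscriberId }));
}

// Failed: one entry per write error, carrying the driver's message.
function mapFailed(writeErrors: WriteErrorEntry[]) {
  return writeErrors.map(({ err }) => ({
    message: err.errmsg,
    subscriberId: err.op?.subscriberId,
  }));
}

// Updated: every input that was neither created nor failed.
function mapUpdated(subscribers: Subscriber[], created: UpsertedEntry[], writeErrors: WriteErrorEntry[]) {
  const handled = new Set<number>([
    ...created.map(({ index }) => index),
    ...writeErrors.map(({ err }) => err.index),
  ]);

  return subscribers
    .filter((_, index) => !handled.has(index))
    .map(({ subscriberId }) => ({ subscriberId }));
}
```

Each function derives the indexes it needs from its arguments, so none of them depends on a shared mutable `indexes` array being filled in a particular order.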
      bulkResponse = e.result;
    }

    const { upserted: created, writeErrors } = bulkResponse.result;
The `result` property is deprecated; it's recommended to use:
/** Returns an array of all inserted ids */
getInsertedIds(): Document[];
/** Returns an array of all upserted ids */
getUpsertedIds(): Document[];
/** Retrieve all write errors */
getWriteErrors(): WriteError[];
and it will simplify the logic a little ;)
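As a rough idea of how the accessors could replace the destructuring of `bulkResponse.result`: the sketch below uses a local `BulkResultLike` interface that only mirrors the method names quoted in this comment. It is not the driver's real type, and the `index` and `errmsg` fields on the returned entries, as well as the `summarize` helper, are assumptions.

```typescript
// Local stand-in mirroring two of the accessors quoted above; not the driver's real type.
interface BulkResultLike {
  getUpsertedIds(): { index: number }[];
  getWriteErrors(): { index: number; errmsg: string }[];
}

// Hypothetical helper: builds the created and failed lists from the accessors.
function summarize(result: BulkResultLike, subscriberIds: string[]) {
  const created = result.getUpsertedIds().map(({ index }) => subscriberIds[index]);
  const failed = result.getWriteErrors().map(({ index, errmsg }) => ({
    subscriberId: subscriberIds[index],
    message: errmsg,
  }));

  return { created, failed };
}
```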
@@ -27,7 +27,7 @@ interface ILogTopicSubscribersPayload {
 }

 const isNotTopic = (recipient: TriggerRecipientSubscriber): recipient is TriggerRecipientSubscriber =>
-  typeof recipient === 'string' || recipient?.type !== TriggerRecipientsTypeEnum.TOPIC;
+  typeof recipient === 'string' || !recipient.hasOwnProperty('type');
What's the reason behind changing this? We might not be using the type for anything other than a Topic right now, but we wanted to leave that door open for future solutions. I can totally see us allowing an event to be triggered for all the subscribers of a TenantId (becoming `TriggerRecipientsTypeEnum.TENANT`), in line with the work we are doing on that side.
@p-fernandez Because I removed the unused 'record' fields from the subscriber type (they were never used or saved anywhere). So in the context of this function, where the subscriber is of type `TriggerRecipientSubscriber`, TypeScript no longer accepts that the subscriber could have a type property. It wasn't exactly right even before, as it was accepted by 'mistake'.
We could refactor the function to use a different type or a different check when we need to handle a specific case. Wdyt? Does that make sense?
I can't totally understand the reason. `isNotTopic` is being used here:
novu/apps/api/src/app/events/usecases/map-trigger-recipients/map-trigger-recipients.use-case.ts
Line 132 in b8cb9da
private findSubscribers(recipients: TriggerRecipients): ISubscribersDefine[] {
If we do this change if we introduce new types of recipients as I mentioned or if someone is using this construction for subscribers (as it is available here:
SUBSCRIBER = 'Subscriber',
If I am still missing the point you are making let's review this in private to not clutter this PR. 👍🏻
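To make the difference between the two predicates in this thread concrete, here is a small sketch using local stand-in types. `RecipientType`, `Recipient`, and both function names are hypothetical. Only `SUBSCRIBER = 'Subscriber'` is quoted in the thread, so the `TOPIC` value and the object fields are assumptions.

```typescript
// Local stand-in enum; the TOPIC value is assumed.
enum RecipientType {
  SUBSCRIBER = 'Subscriber',
  TOPIC = 'Topic',
}

// Assumed recipient shape: an id string or an object with an optional type.
type Recipient = string | { subscriberId?: string; topicKey?: string; type?: RecipientType };

// Previous check: anything whose type is not TOPIC counts as "not a topic".
const isNotTopicByType = (recipient: Recipient): boolean =>
  typeof recipient === 'string' || recipient?.type !== RecipientType.TOPIC;

// New check: any object that carries a `type` property is excluded.
const isNotTopicByProperty = (recipient: Recipient): boolean =>
  typeof recipient === 'string' || !Object.prototype.hasOwnProperty.call(recipient, 'type');

// A subscriber that explicitly declares its type is where the two checks diverge.
const typedSubscriber: Recipient = { subscriberId: 'sub-1', type: RecipientType.SUBSCRIBER };
```

Under these assumptions, both predicates agree for plain id strings, untyped subscriber objects, and topics. They disagree only for recipients that carry an explicit non-topic `type`.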
🌟
Brilliant work. The endpoint is nicely implemented and the API response is well thought out.
Just one question about a change that I am not sure about.
What change does this PR introduce?
For larger organizations allow creating subscribers in bulk.
Introduce a subscriber bulk API to create subscribers in batches of up to 500 entries.
Why was this change needed?
Other information (Screenshots)