-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Nv 3567 subscribers are duplicated during the burst of events #5308
Nv 3567 subscribers are duplicated during the burst of events #5308
Conversation
✅ Deploy Preview for dev-web-novu ready!
To edit notification comments on pull requests, go to your Netlify site configuration. |
@@ -30,7 +30,7 @@ export class SubscriberOnlineService { | |||
|
|||
private async updateOnlineStatus(subscriber: ISubscriberJwt, updatePayload: IUpdateSubscriberPayload) { | |||
await this.subscriberRepository.update( | |||
{ _id: subscriber._id, _organizationId: subscriber.organizationId }, | |||
{ _id: subscriber._id, _environmentId: subscriber.environmentId }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does anyone know if there is a reason to query by organization id here?
Updating it to environment id means that we can remove the _organizaitionId index from the DB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove it.
}, | ||
_environmentId: { | ||
type: Schema.Types.ObjectId, | ||
ref: 'Environment', | ||
index: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redundant index 🧹
…ed-index feat(dal): add deleted index and type safety index
@djabarovgeorge sorry that I didn't noticed that the PR #5309 was pointing to this one |
No worries both of them are my PR's are they are small, so there won't be any issue with it ;) |
❌ Deploy Preview for novu-design failed. Why did it fail? →
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests won't work with the new index because it won't allow to create of new documents with duplicated indexes.
i ran them locally without the new index because I was not able to find a better way to manipulate the indexes from the API without creating an additional connection to the DB.
}); | ||
|
||
for (const group of cursor) { | ||
const docsToRemove = group.docs.slice(0, -1); // Keep the last created document, remove others |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment I decided to keep the last created subscriber, does someone know other practices for removing invalid entries?
type SubscriberDeleteQuery = Pick<SubscriberQuery, 'subscriberId' | '_environmentId'> & EnforceEnvOrOrgIds; | ||
type SubscriberDeleteManyQuery = Pick<SubscriberQuery, 'subscriberId' | '_id' | '_environmentId'> & EnforceEnvOrOrgIds; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i decided to create those types because, without those types, the client of the methods could think that other query params are in use in the query while they are not at the moment.
return await this.subscriber.delete(requestQuery); | ||
} | ||
|
||
async deleteMany(query: SubscriberDeleteManyQuery) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need deleteMany
override method in order to perform soft delete
export enum ErrorCodesEnum { | ||
DUPLICATE_KEY = '11000', | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made this enum as a temporary solution. It helps the application layer recognize errors from the data access layer without being dependent on MongoDB. But I'm considering whether it's better to generate our own error codes and link them to the application layer for better control and clarity.
if (currentSubscriber[key] !== null && currentSubscriber[key] !== undefined) { | ||
mergedSubscriber[key] = currentSubscriber[key]; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠ issue: I think that this part might be problematic for the channels
field, which is an array, so the latter will override the first. Should we have here more custom logic and compare channels by integrationId
and merge all the unique deviceTokens
together? wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, I wasn't sure how specific we wanted to be because we might say the same with 'data.' but, since we're uncertain about the type of data being stored, we could check by object size. Yet again, I wasn't certain if this level of detail was what we were aiming for.
What change does this PR introduce?
During the burst of trigger events the processed subscribers might get duplicated in the database. This could happen due to the race conditions in the worker's concurrent processing.
Why was this change needed?
Duplicated subscribers mean that the wrong document might be picked from the database and for example some fields might be missing (email), resulting in messages not being delivered.
Other information (Screenshots)
We have to introduce the unique index on the subscriberId and environmentId combo to prevent and fix this problem.