-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Nv 3567 subscribers are duplicated during the burst of events #5308
Merged
djabarovgeorge
merged 10 commits into
next
from
nv-3567-subscribers-are-duplicated-during-the-burst-of-events
Apr 16, 2024
Merged
Changes from 7 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
0c449f4
chore(ws): update query fetch by environment id
djabarovgeorge 8aff2d8
chore(dal): remove redundant indexes
djabarovgeorge ac064ce
feat(dal): add unique index
djabarovgeorge 23ca77b
feat(dal): add deleted index and type safety index
djabarovgeorge d504201
Merge pull request #5309 from novuhq/nv-3472-create-subscribers-delet…
LetItRock 336570f
feat: fetch in case race condition occurred on subscriber create
djabarovgeorge 313edab
refactor(migration): merge subscriber metadata
djabarovgeorge 596b213
feat(migration): add considerate channel merge
djabarovgeorge 1a37963
feat(dal): remove delete param
djabarovgeorge 0f386d8
fix(dal): typo
djabarovgeorge File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
264 changes: 264 additions & 0 deletions
264
...subscribers/remove-duplicated-subscribers/remove-duplicated-subscribers.migration.spec.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
import { SubscriberRepository } from '@novu/dal'; | ||
import { SubscribersService, UserSession } from '@novu/testing'; | ||
|
||
import { removeDuplicatedSubscribers } from './remove-duplicated-subscribers.migration'; | ||
import { expect } from 'chai'; | ||
|
||
describe('Migration: Remove Duplicated Subscribers', () => { | ||
let session: UserSession; | ||
let subscriberService: SubscribersService; | ||
const subscriberRepository = new SubscriberRepository(); | ||
|
||
beforeEach(async () => { | ||
session = new UserSession(); | ||
await session.initialize(); | ||
subscriberService = new SubscribersService(session.organization._id, session.environment._id); | ||
}); | ||
|
||
it('should remove duplicated subscribers', async () => { | ||
const duplicatedSubscriberId = '123'; | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_subscriber', | ||
_environmentId: session.environment._id, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'mid_subscriber', | ||
_environmentId: session.environment._id, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'last_subscriber', | ||
_environmentId: session.environment._id, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
const duplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
|
||
expect(duplicates.length).to.equal(3); | ||
|
||
await removeDuplicatedSubscribers(); | ||
|
||
const remainingDuplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
|
||
expect(remainingDuplicates.length).to.equal(1); | ||
expect(remainingDuplicates[0].firstName).to.equal('last_subscriber'); | ||
}); | ||
|
||
it('should always keep one subscriber per environment', async () => { | ||
const duplicatedSubscriberId = '123'; | ||
const firstEnvironmentId = session.environment._id; | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'env_1', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'env_1', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
const secondEnvironmentId = session.organization._id; | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'env_2', | ||
_environmentId: secondEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'env_2', | ||
_environmentId: secondEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
const duplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(duplicates.length).to.equal(2); | ||
|
||
const duplicates2 = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(duplicates2.length).to.equal(2); | ||
|
||
await removeDuplicatedSubscribers(); | ||
|
||
const remainingDuplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(remainingDuplicates.length).to.equal(1); | ||
expect(remainingDuplicates[0].firstName).to.equal('env_1'); | ||
|
||
const remainingDuplicates2 = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: secondEnvironmentId, | ||
}); | ||
expect(remainingDuplicates2.length).to.equal(1); | ||
expect(remainingDuplicates2[0].firstName).to.equal('env_2'); | ||
}); | ||
|
||
it('should merge the metadata across duplicated subscribers', async () => { | ||
const duplicatedSubscriberId = '123'; | ||
const firstEnvironmentId = session.environment._id; | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_name', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
lastName: 'last_name', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
const duplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(duplicates.length).to.equal(2); | ||
|
||
await removeDuplicatedSubscribers(); | ||
|
||
const remainingDuplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(remainingDuplicates.length).to.equal(1); | ||
expect(remainingDuplicates[0].firstName).to.equal('first_name'); | ||
expect(remainingDuplicates[0].lastName).to.equal('last_name'); | ||
}); | ||
|
||
it('should merge the metadata across duplicated subscribers by latest created subscriber', async () => { | ||
const duplicatedSubscriberId = '123'; | ||
const firstEnvironmentId = session.environment._id; | ||
|
||
const firstCreatedSubscriber = await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_name_1', | ||
lastName: 'last_name_1', | ||
email: 'email_1', | ||
phone: 'phone_1', | ||
avatar: 'avatar_1', | ||
locale: 'locale_1', | ||
data: { key: 'value_1' }, | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_name_2', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
email: 'email_3', | ||
phone: 'phone_3', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
avatar: 'avatar_4', | ||
data: { newStuff: 'value_4' }, | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_name_5', | ||
locale: 'locale_5', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
const duplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(duplicates.length).to.equal(5); | ||
|
||
await removeDuplicatedSubscribers(); | ||
|
||
const remainingDuplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(remainingDuplicates.length).to.equal(1); | ||
expect(remainingDuplicates[0]._id).to.equal(firstCreatedSubscriber._id); | ||
expect(remainingDuplicates[0]._organizationId).to.equal(firstCreatedSubscriber._organizationId); | ||
expect(remainingDuplicates[0]._environmentId).to.equal(firstCreatedSubscriber._environmentId); | ||
expect(remainingDuplicates[0].__v).to.equal(firstCreatedSubscriber.__v); | ||
|
||
expect(remainingDuplicates[0].firstName).to.equal('first_name_5'); | ||
expect(remainingDuplicates[0].lastName).to.equal('last_name_1'); | ||
expect(remainingDuplicates[0].email).to.equal('email_3'); | ||
expect(remainingDuplicates[0].phone).to.equal('phone_3'); | ||
expect(remainingDuplicates[0].avatar).to.equal('avatar_4'); | ||
expect(remainingDuplicates[0].locale).to.equal('locale_5'); | ||
expect(remainingDuplicates[0].data?.key).to.be.undefined; | ||
expect(remainingDuplicates[0].data?.newStuff).to.equal('value_4'); | ||
}); | ||
|
||
it('should keep the first created subscriber after merge', async () => { | ||
const duplicatedSubscriberId = '123'; | ||
const firstEnvironmentId = session.environment._id; | ||
|
||
const firstCreatedSubscriber = await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_name', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
await subscriberRepository.create({ | ||
subscriberId: duplicatedSubscriberId, | ||
firstName: 'first_name_2', | ||
_environmentId: firstEnvironmentId, | ||
_organizationId: session.organization._id, | ||
}); | ||
|
||
const duplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
expect(duplicates.length).to.equal(2); | ||
|
||
await removeDuplicatedSubscribers(); | ||
|
||
const remainingDuplicates = await subscriberRepository.find({ | ||
subscriberId: duplicatedSubscriberId, | ||
_environmentId: session.environment._id, | ||
}); | ||
|
||
expect(remainingDuplicates.length).to.equal(1); | ||
expect(remainingDuplicates[0]._id).to.equal(firstCreatedSubscriber._id); | ||
}); | ||
}); | ||
LetItRock marked this conversation as resolved.
Show resolved
Hide resolved
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests won't work with the new index because it won't allow to create of new documents with duplicated indexes.
i ran them locally without the new index because I was not able to find a better way to manipulate the indexes from the API without creating an additional connection to the DB.