Skip to content

Commit

Permalink
fix: do not unsubscribe after publish (#78)
Browse files Browse the repository at this point in the history
Now that we route all messages via a `'message'` event, we can't check
the number of listeners for a given topic using the listener count.
  • Loading branch information
achingbrain authored Jun 23, 2022
1 parent 9cb24bc commit 760594e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
11 changes: 4 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,10 +621,6 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
super.dispatchEvent(new CustomEvent<Message>('message', {
detail: rpcMessage
}))

if (this.listenerCount(topic) === 0) {
this.unsubscribe(topic)
}
}
}

Expand Down Expand Up @@ -655,6 +651,8 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
throw new Error('Pubsub has not started')
}

log('subscribe to topic: %s', topic)

if (!this.subscriptions.has(topic)) {
this.subscriptions.add(topic)

Expand All @@ -676,11 +674,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
super.removeEventListener(topic)

const wasSubscribed = this.subscriptions.has(topic)
const listeners = this.listenerCount(topic)

log('unsubscribe from %s - am subscribed %s, listeners %d', topic, wasSubscribed, listeners)
log('unsubscribe from %s - am subscribed %s', topic, wasSubscribed)

if (wasSubscribed && listeners === 0) {
if (wasSubscribed) {
this.subscriptions.delete(topic)

for (const peerId of this.peers.keys()) {
Expand Down
25 changes: 24 additions & 1 deletion test/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ describe('pubsub base implementation', () => {
beforeEach(async () => {
const peerId = await createPeerId()
pubsub = new PubsubImplementation({
multicodecs: [protocol]
multicodecs: [protocol],
emitSelf: true
})
pubsub.init(new Components({
peerId: peerId,
Expand Down Expand Up @@ -75,6 +76,28 @@ describe('pubsub base implementation', () => {

await expect(pubsub.validate(signedMessage)).to.eventually.be.undefined()
})

it('calls publishes messages twice', async () => {
let count = 0

await pubsub.start()
pubsub.subscribe(topic)

pubsub.addEventListener('message', evt => {
if (evt.detail.topic === topic) {
count++
}
})
await pubsub.publish(topic, message)
await pubsub.publish(topic, message)

// event dispatch is async
await pWaitFor(() => {
return count === 2
})

expect(count).to.eql(2)
})
})

describe('subscribe', () => {
Expand Down

0 comments on commit 760594e

Please sign in to comment.