diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index ebfb303ecb..b8760e3a4f 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -3,7 +3,6 @@ import { EventEmitter, CustomEvent } from '@libp2p/interface/events' import { type PubSub, type Message, type StrictNoSign, type StrictSign, type PubSubInit, type PubSubEvents, type PeerStreams, type PubSubRPCMessage, type PubSubRPC, type PubSubRPCSubscription, type SubscriptionChangeData, type PublishResult, type TopicValidatorFn, TopicValidatorResult } from '@libp2p/interface/pubsub' import { logger } from '@libp2p/logger' import { PeerMap, PeerSet } from '@libp2p/peer-collections' -import { createTopology } from '@libp2p/topology' import { pipe } from 'it-pipe' import Queue from 'p-queue' import { codes } from './errors.js' @@ -128,10 +127,10 @@ export abstract class PubSubBaseProtocol = Pu // register protocol with topology // Topology callbacks called on connection manager changes - const topology = createTopology({ + const topology = { onConnect: this._onPeerConnected, onDisconnect: this._onPeerDisconnected - }) + } this._registrarTopologyIds = await Promise.all(this.multicodecs.map(async multicodec => registrar.register(multicodec, topology))) log('started') @@ -181,12 +180,12 @@ export abstract class PubSubBaseProtocol = Pu const { stream, connection } = data const peerId = connection.remotePeer - if (stream.stat.protocol == null) { + if (stream.protocol == null) { stream.abort(new Error('Stream was not multiplexed')) return } - const peer = this.addPeer(peerId, stream.stat.protocol) + const peer = this.addPeer(peerId, stream.protocol) const inboundStream = peer.attachInboundStream(stream) this.processMessages(peerId, inboundStream, peer) @@ -203,12 +202,12 @@ export abstract class PubSubBaseProtocol = Pu try { const stream = await conn.newStream(this.multicodecs) - if (stream.stat.protocol == null) { + if (stream.protocol == null) { stream.abort(new Error('Stream was not multiplexed')) return } - const peer = this.addPeer(peerId, stream.stat.protocol) + const peer = this.addPeer(peerId, stream.protocol) await peer.attachOutboundStream(stream) } catch (err: any) { log.error(err)