Skip to content

Commit

Permalink
chore: remove topology factory
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Jun 27, 2023
1 parent 862880f commit c5b2af1
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
import { type PubSub, type Message, type StrictNoSign, type StrictSign, type PubSubInit, type PubSubEvents, type PeerStreams, type PubSubRPCMessage, type PubSubRPC, type PubSubRPCSubscription, type SubscriptionChangeData, type PublishResult, type TopicValidatorFn, TopicValidatorResult } from '@libp2p/interface/pubsub'
import { logger } from '@libp2p/logger'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { createTopology } from '@libp2p/topology'
import { pipe } from 'it-pipe'
import Queue from 'p-queue'
import { codes } from './errors.js'
Expand Down Expand Up @@ -128,10 +127,10 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = createTopology({
const topology = {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
})
}
this._registrarTopologyIds = await Promise.all(this.multicodecs.map(async multicodec => registrar.register(multicodec, topology)))

log('started')
Expand Down Expand Up @@ -181,12 +180,12 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
const { stream, connection } = data
const peerId = connection.remotePeer

if (stream.stat.protocol == null) {
if (stream.protocol == null) {
stream.abort(new Error('Stream was not multiplexed'))
return
}

const peer = this.addPeer(peerId, stream.stat.protocol)
const peer = this.addPeer(peerId, stream.protocol)
const inboundStream = peer.attachInboundStream(stream)

this.processMessages(peerId, inboundStream, peer)
Expand All @@ -203,12 +202,12 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
try {
const stream = await conn.newStream(this.multicodecs)

if (stream.stat.protocol == null) {
if (stream.protocol == null) {
stream.abort(new Error('Stream was not multiplexed'))
return
}

const peer = this.addPeer(peerId, stream.stat.protocol)
const peer = this.addPeer(peerId, stream.protocol)
await peer.attachOutboundStream(stream)
} catch (err: any) {
log.error(err)
Expand Down

0 comments on commit c5b2af1

Please sign in to comment.