diff --git a/packages/libp2p-daemon-client/src/index.ts b/packages/libp2p-daemon-client/src/index.ts index ef56b204..eff21d2b 100644 --- a/packages/libp2p-daemon-client/src/index.ts +++ b/packages/libp2p-daemon-client/src/index.ts @@ -54,6 +54,9 @@ class Client implements DaemonClient { async send (request: Request): Promise { const maConn = await this.connectDaemon() + const subtype = request.pubsub?.type ?? request.dht?.type ?? request.peerStore?.type ?? '' + log('send', request.type, subtype) + const streamHandler = new StreamHandler({ stream: maConn }) streamHandler.write(Request.encode(request)) return streamHandler @@ -291,9 +294,14 @@ export interface DHTClient { getClosestPeers: (key: Uint8Array) => AsyncIterable } +export interface Subscription { + messages: () => AsyncIterable + cancel: () => Promise +} + export interface PubSubClient { publish: (topic: string, data: Uint8Array) => Promise - subscribe: (topic: string) => AsyncIterable + subscribe: (topic: string) => Promise getTopics: () => Promise getSubscribers: (topic: string) => Promise } diff --git a/packages/libp2p-daemon-client/src/pubsub.ts b/packages/libp2p-daemon-client/src/pubsub.ts index 2133741e..a8562265 100644 --- a/packages/libp2p-daemon-client/src/pubsub.ts +++ b/packages/libp2p-daemon-client/src/pubsub.ts @@ -5,7 +5,7 @@ import { PSRequest, PSMessage } from '@libp2p/daemon-protocol' -import type { DaemonClient } from './index.js' +import type { DaemonClient, Subscription } from './index.js' import type { PeerId } from '@libp2p/interface-peer-id' import { peerIdFromBytes } from '@libp2p/peer-id' @@ -89,7 +89,7 @@ export class Pubsub { /** * Request to subscribe a certain topic */ - async * subscribe (topic: string): AsyncGenerator { + async subscribe (topic: string): Promise { if (typeof topic !== 'string') { throw new CodeError('invalid topic received', 'ERR_INVALID_TOPIC') } @@ -114,16 +114,27 @@ export class Pubsub { throw new CodeError(response.error?.msg ?? 'Pubsub publish failed', 'ERR_PUBSUB_PUBLISH_FAILED') } - // stream messages - while (true) { - message = await sh.read() + let subscribed = true - if (message == null) { - throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE') - } + const subscription: Subscription = { + async * messages () { + while (subscribed) { // eslint-disable-line no-unmodified-loop-condition + message = await sh.read() + + if (message == null) { + throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE') + } - yield PSMessage.decode(message) + yield PSMessage.decode(message) + } + }, + async cancel () { + subscribed = false + await sh.close() + } } + + return subscription } async getSubscribers (topic: string): Promise {