diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index d4cb4a335..6b106209b 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -10,6 +10,29 @@ import delay from '../utils/delay' import { saveConfig, loadConfig } from '../utils/config' import { loadSimulator } from '../utils/simulate' import { serializeProtobufToBuffer } from '../utils/protobuf' +import convertPayload from '../utils/convertPayload' + +const processPublishMessage = ( + message: string | Buffer, + protobufPath: string | undefined, + protobufMessageName: string | undefined, + format: FormatType | undefined, +): Buffer | string => { + /* + * Pipeline for processing outgoing messages in two potential stages: + * 1. Format Conversion --> Applied if a format is specified, transforming the message into that format; if absent, the message retains its initial state. + * 2. Protobuf Serialization --> Engaged if both protobuf path and message name are present, encapsulating the message into a protobuf format; without these settings, the message circulates unchanged. + */ + const pipeline = [ + (msg: string | Buffer) => (format ? convertPayload(Buffer.from(msg.toString()), format, 'encode') : msg), + (msg: string | Buffer) => + protobufPath && protobufMessageName + ? serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName) + : msg, + ] + + return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer +} const send = ( config: boolean | string | undefined, @@ -29,8 +52,8 @@ const send = ( basicLog.connected() const { topic, message, protobufPath, protobufMessageName, format } = pubOpts basicLog.publishing() - let bufferMessage = serializeProtobufToBuffer(message, protobufPath, protobufMessageName, format) - client.publish(topic, bufferMessage, pubOpts.opts, (err) => { + const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format) + client.publish(topic, publishMessage, pubOpts.opts, (err) => { if (err) { signale.warn(err) } else { @@ -74,9 +97,8 @@ const multisend = ( }) sender._write = (line, _enc, cb) => { const { topic, opts, protobufPath, protobufMessageName, format } = pubOpts - - let bufferMessage = serializeProtobufToBuffer(line.trim(), protobufPath, protobufMessageName, format) - client.publish(topic, bufferMessage, opts, cb) + const publishMessage = processPublishMessage(line.trim(), protobufPath, protobufMessageName, format) + client.publish(topic, publishMessage, opts, cb) } client.on('connect', () => { diff --git a/cli/src/lib/sub.ts b/cli/src/lib/sub.ts index ce41572d9..8ee8250d5 100644 --- a/cli/src/lib/sub.ts +++ b/cli/src/lib/sub.ts @@ -6,6 +6,35 @@ import convertPayload from '../utils/convertPayload' import { saveConfig, loadConfig } from '../utils/config' import { deserializeBufferToProtobuf } from '../utils/protobuf' +const processReceivedMessage = ( + payload: Buffer, + protobufPath: string | undefined, + protobufMessageName: string | undefined, + format: FormatType | undefined, +): string => { + let message: string | Buffer = payload + /* + * Pipeline for processing incoming messages, following two potential steps: + * 1. Protobuf Deserialization --> Utilized if both protobuf path and message name are defined, otherwise message passes as is. + * 2. Format Conversion --> Engaged if a format is defined, converting the message accordingly; if not defined, message passes unchanged. + */ + const pipeline = [ + (msg: Buffer) => + protobufPath && protobufMessageName + ? deserializeBufferToProtobuf(msg, protobufPath, protobufMessageName, format) + : msg, + (msg: Buffer) => (format ? convertPayload(msg, format, 'decode') : msg), + ] + + message = pipeline.reduce((msg, transformer) => transformer(msg), message) + + if (Buffer.isBuffer(message)) { + message = message.toString('utf-8') + } + + return message +} + const sub = (options: SubscribeOptions) => { const { save, config } = options @@ -66,12 +95,8 @@ const sub = (options: SubscribeOptions) => { options.verbose && msgData.push({ label: 'topic', value: topic }) - let payloadMessage = deserializeBufferToProtobuf(payload, protobufPath, protobufMessageName, format) - if (payloadMessage) { - msgData.push({ label: 'payload', value: format ? convertPayload(payloadMessage, format) : payloadMessage }) - } else { - msgData.push({ label: 'payload', value: convertPayload(payload, format) }) - } + let receivedMessage = processReceivedMessage(payload, protobufPath, protobufMessageName, format) + msgData.push({ label: 'payload', value: receivedMessage }) packet.retain && msgData.push({ label: 'retain', value: packet.retain }) @@ -91,7 +116,7 @@ const sub = (options: SubscribeOptions) => { !outputModeClean ? msgLog(msgData) - : console.log(JSON.stringify({ topic, payload: convertPayload(payload, format), packet }, null, 2)) + : console.log(JSON.stringify({ topic, payload: convertPayload(payload, format, 'decode'), packet }, null, 2)) }) client.on('error', (err) => { diff --git a/cli/src/utils/convertPayload.ts b/cli/src/utils/convertPayload.ts index 7abe4d0cc..f1846818a 100644 --- a/cli/src/utils/convertPayload.ts +++ b/cli/src/utils/convertPayload.ts @@ -1,24 +1,36 @@ import chalk from 'chalk' -const convertJSON = (value: Buffer) => { +const convertJSON = (value: Buffer | string, action: 'encode' | 'decode') => { try { - return JSON.stringify(JSON.parse(value.toString()), null, 2) + if (action === 'decode') { + return JSON.stringify(JSON.parse(value.toString()), null, 2) + } else { + return Buffer.from(JSON.stringify(JSON.parse(value.toString()))) + } } catch (err) { return chalk.red(err) } } -const convertPayload = (payload: Buffer, to?: FormatType) => { - switch (to) { - case 'base64': - return payload.toString('base64') - case 'json': - return convertJSON(payload) - case 'hex': - return payload.toString('hex').replace(/(.{4})/g, '$1 ') - default: - return payload.toString('utf-8') +const convertPayload = (payload: Buffer | string, format?: FormatType, action: 'encode' | 'decode' = 'decode') => { + const actions = { + encode: { + base64: () => Buffer.from(payload.toString(), 'base64'), + json: () => convertJSON(payload, 'encode'), + hex: () => Buffer.from(payload.toString().replace(/\s+/g, ''), 'hex'), + default: () => Buffer.from(payload.toString(), 'utf-8'), + }, + decode: { + base64: () => payload.toString('base64'), + json: () => convertJSON(payload, 'decode'), + hex: () => payload.toString('hex').replace(/(.{4})/g, '$1 '), + default: () => payload.toString('utf-8'), + }, } + const actionSet = actions[action] + const runAction = actionSet[format || 'default'] + + return runAction ? runAction() : payload } export default convertPayload diff --git a/cli/src/utils/protobuf.ts b/cli/src/utils/protobuf.ts index 059cfa827..7157fd433 100644 --- a/cli/src/utils/protobuf.ts +++ b/cli/src/utils/protobuf.ts @@ -2,78 +2,53 @@ import protobuf from 'protobufjs' import signale from './signale' import { transformPBJSError } from './protobufErrors' -const convertObject = (raw: string | Buffer, format?: FormatType | undefined) => { - switch (format) { - case 'base64': - return Buffer.from(raw.toString('utf-8'), 'base64').toString('utf-8') - case 'hex': - return Buffer.from(raw.toString('utf-8').replaceAll(' ', ''), 'hex').toString('utf-8') - case 'json': - return JSON.stringify(JSON.parse(raw.toString('utf-8')), null, 2) - default: - return raw.toString('utf-8') - } -} - export const serializeProtobufToBuffer = ( raw: string | Buffer, - protobufPath: string | undefined, - protobufMessageName: string | undefined, - format?: FormatType | undefined, + protobufPath: string, + protobufMessageName: string, ): Buffer => { - let rawData - try { - rawData = convertObject(raw, format) - } catch (error: unknown) { - signale.error(`Message format type error : ${(error as Error).message.split('\n')[0]}`) - process.exit(1) - } - + let rawData = raw.toString('utf-8') let bufferMessage = Buffer.from(rawData) - if (protobufPath && protobufMessageName) { - try { - const root = protobuf.loadSync(protobufPath) - const Message = root.lookupType(protobufMessageName) - const err = Message.verify(JSON.parse(rawData)) - if (err) { - signale.error(`Message serialization error: ${err}`) - process.exit(1) - } - const data = Message.create(JSON.parse(rawData)) - const serializedMessage = Message.encode(data).finish() - bufferMessage = Buffer.from(serializedMessage) - } catch (error: unknown) { - signale.error(`Message serialization error: ${(error as Error).message.split('\n')[0]}`) + try { + const root = protobuf.loadSync(protobufPath) + const Message = root.lookupType(protobufMessageName) + const err = Message.verify(JSON.parse(rawData)) + if (err) { + signale.error(`Message serialization error: ${err}`) process.exit(1) } + const data = Message.create(JSON.parse(rawData)) + const serializedMessage = Message.encode(data).finish() + bufferMessage = Buffer.from(serializedMessage) + } catch (error: unknown) { + signale.error(`Message serialization error: ${(error as Error).message.split('\n')[0]}`) + process.exit(1) } return bufferMessage } export const deserializeBufferToProtobuf = ( payload: Buffer, - protobufPath: string | undefined, - protobufMessageName: string | undefined, - to?: FormatType, + protobufPath: string, + protobufMessageName: string, + needFormat: FormatType | undefined, ): any => { - if (protobufPath && protobufMessageName) { - try { - const root = protobuf.loadSync(protobufPath) - const Message = root.lookupType(protobufMessageName) - const MessageData = Message.decode(payload) - const err = Message.verify(MessageData) - if (err) { - signale.error(`Message deserialization error: ${err}`) - process.exit(1) - } - if (to) { - return Buffer.from(JSON.stringify(MessageData.toJSON())) - } - return MessageData - } catch (error: unknown) { - let err = transformPBJSError(error as Error) - signale.error(err.message.split('\n')[0]) + try { + const root = protobuf.loadSync(protobufPath) + const Message = root.lookupType(protobufMessageName) + const MessageData = Message.decode(payload) + const err = Message.verify(MessageData) + if (err) { + signale.error(`Message deserialization error: ${err}`) process.exit(1) } + if (needFormat) { + return Buffer.from(JSON.stringify(MessageData.toJSON())) + } + return MessageData + } catch (error: unknown) { + let err = transformPBJSError(error as Error) + signale.error(err.message.split('\n')[0]) + process.exit(1) } }