From 00151a6dd3e75f5530d4a1f3870cae90175d407f Mon Sep 17 00:00:00 2001 From: Last Date: Mon, 19 Aug 2024 15:42:07 +0800 Subject: [PATCH 1/3] feat(cli): add avro support --- cli/package.json | 1 + cli/src/index.ts | 10 ++++++ cli/src/lib/pub.ts | 41 ++++++++++++------------ cli/src/lib/sub.ts | 65 ++++++++++++++++++++++++++++++--------- cli/src/types/global.d.ts | 21 ++++++++++++- cli/src/utils/avro.ts | 59 +++++++++++++++++++++++++++++++++++ cli/src/utils/parse.ts | 21 ++++++++++++- cli/src/utils/protobuf.ts | 4 +-- cli/yarn.lock | 5 +++ 9 files changed, 190 insertions(+), 37 deletions(-) create mode 100644 cli/src/utils/avro.ts diff --git a/cli/package.json b/cli/package.json index 389296141..41961d778 100644 --- a/cli/package.json +++ b/cli/package.json @@ -22,6 +22,7 @@ }, "dependencies": { "@inquirer/prompts": "^5.0.3", + "avsc": "^5.7.7", "cbor": "^9.0.1", "chalk": "~4.1.2", "cli-table3": "^0.6.3", diff --git a/cli/src/index.ts b/cli/src/index.ts index b4141256d..0bbd074bb 100755 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -245,6 +245,11 @@ export class Commander { '-Pmn, --protobuf-message-name ', 'the name of the protobuf message type (must exist in the .proto file)', ) + .option( + '-Ap, --avsc-path ', + 'the path to the .avsc file that defines the avro schema for AVRO decoding', + parseFileRead, + ) .option('--debug', 'enable debug mode for MQTT.js', false) .allowUnknownOption(false) .action(pub) @@ -367,6 +372,11 @@ export class Commander { '-Pmn, --protobuf-message-name ', 'the name of the protobuf message type (must exist in the .proto file)', ) + .option( + '-Ap, --avsc-path ', + 'the path to the .avsc file that defines the avro schema for AVRO decoding', + parseFileRead, + ) .option('--debug', 'enable debug mode for MQTT.js', false) .allowUnknownOption(false) .action(sub) diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index b63def111..4de8f2aa6 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -13,6 +13,7 @@ import logWrapper, { basicLog, benchLog, Signale, signale, simulateLog, singaleC import { handleLoadOptions, handleSaveOptions } from '../utils/options' import { checkScenarioExists, checkTopicExists, parseConnectOptions, parsePublishOptions } from '../utils/parse' import { serializeProtobufToBuffer } from '../utils/protobuf' +import { serializeAvroToBuffer } from '../utils/avro' import { loadSimulator } from '../utils/simulate' /** @@ -25,18 +26,16 @@ import { loadSimulator } from '../utils/simulate' * Flow: * Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message * @param {string | Buffer} message - The message to be processed. - * @param {string} [protobufPath] - The path to the protobuf definition. - * @param {string} [protobufMessageName] - The name of the protobuf message. + * @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding * @param {FormatType} [format] - The format to convert the message to. * @returns {Buffer | string} - The processed message. */ const processPublishMessage = ( message: string | Buffer, - protobufPath?: string, - protobufMessageName?: string, + schemaOptions: SchemaOptions, format?: FormatType, ): Buffer | string => { - const convertMessageFormat = (msg: string | Buffer): Buffer | string => { + const convertMessageFormat = (msg: string | Buffer): string | Buffer => { if (!format) { return msg } @@ -44,16 +43,22 @@ const processPublishMessage = ( return convertPayload(bufferMsg, format, 'encode') } - const serializeProtobufMessage = (msg: string | Buffer): Buffer | string => { - if (protobufPath && protobufMessageName) { - return serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName) + const serializeWithSchema = (msg: string | Buffer): string | Buffer => { + switch (schemaOptions.type) { + case 'none': + return msg + + case 'protobuf': + return serializeProtobufToBuffer(msg, schemaOptions.protobufPath, schemaOptions.protobufMessageName) + + case 'avro': + return serializeAvroToBuffer(msg, schemaOptions.avscPath) } - return msg } - const pipeline = [convertMessageFormat, serializeProtobufMessage] + const pipeline = [convertMessageFormat, serializeWithSchema] - return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer + return pipeline.reduce((msg: string | Buffer, transformer) => transformer(msg), message) as Buffer } const send = ( @@ -62,8 +67,7 @@ const send = ( pubOpts: { topic: string message: string | Buffer - protobufPath: string | undefined - protobufMessageName: string | undefined + schemaOptions: SchemaOptions format: FormatType | undefined opts: IClientPublishOptions }, @@ -77,9 +81,9 @@ const send = ( client.on('connect', () => { retryTimes = 0 basicLog.connected() - const { topic, message, protobufPath, protobufMessageName, format } = pubOpts + const { topic, message, schemaOptions, format } = pubOpts basicLog.publishing() - const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format) + const publishMessage = processPublishMessage(message, schemaOptions, format) client.publish(topic, publishMessage, pubOpts.opts, (err) => { if (err) { basicLog.error(err) @@ -127,8 +131,7 @@ const multiSend = ( pubOpts: { topic: string message: string | Buffer - protobufPath: string | undefined - protobufMessageName: string | undefined + schemaOptions: SchemaOptions format: FormatType | undefined opts: IClientPublishOptions }, @@ -143,10 +146,10 @@ const multiSend = ( }) let count = 0 sender._write = (line, _enc, cb) => { - const { topic, opts, protobufPath, protobufMessageName, format } = pubOpts + const { topic, opts, schemaOptions, format } = pubOpts count++ let omitTopic = opts.properties?.topicAlias && count >= 2 - const publishMessage = processPublishMessage(line.trim(), protobufPath, protobufMessageName, format) + const publishMessage = processPublishMessage(line.trim(), schemaOptions, format) client.publish(omitTopic ? '' : topic, publishMessage, opts, cb) } diff --git a/cli/src/lib/sub.ts b/cli/src/lib/sub.ts index 3cf4d59d0..4a117c5d2 100644 --- a/cli/src/lib/sub.ts +++ b/cli/src/lib/sub.ts @@ -8,6 +8,7 @@ import { writeFile, appendFile, getPathExtname, createNextNumberedFileName } fro import { deserializeBufferToProtobuf } from '../utils/protobuf' import isSupportedBinaryFormatForMQTT from '../utils/binaryFormats' import * as Debug from 'debug' +import { deserializeBufferToAvro } from '../utils/avro' /** * @@ -17,27 +18,45 @@ import * as Debug from 'debug' * Flow: * payload -> [Protobuf Deserialization] -> [Format Conversion] -> Processed Message * @param payload - The message payload to be processed. - * @param protobufPath - The path to the Protobuf definition file. - * @param protobufMessageName - The name of the Protobuf message. + * @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding * @param format - The format of the payload. * @returns The processed message as a string or Buffer. */ const processReceivedMessage = ( payload: Buffer, - protobufPath?: string, - protobufMessageName?: string, + schemaOptions: SchemaOptions, format?: FormatType, ): string | Buffer => { let message: string | Buffer = payload - const pipeline = [ - (msg: Buffer) => - protobufPath && protobufMessageName - ? deserializeBufferToProtobuf(msg, protobufPath, protobufMessageName, format) - : msg, - (msg: Buffer) => (format ? convertPayload(msg, format, 'decode') : msg), - ] - message = pipeline.reduce((msg, transformer) => transformer(msg), message) + const convertMessageFormat = (msg: string | Buffer): string | Buffer => { + if (!format) { + return msg + } + return convertPayload(msg, format, 'decode') + } + + const deserializeWithSchema = (msg: string | Buffer): string | Buffer => { + switch (schemaOptions.type) { + case 'none': + return msg + + case 'protobuf': + return deserializeBufferToProtobuf( + payload, + schemaOptions.protobufPath, + schemaOptions.protobufMessageName, + format, + ) + + case 'avro': + return deserializeBufferToAvro(payload, schemaOptions.avscPath, format) + } + } + + const pipeline = [deserializeWithSchema, convertMessageFormat] + + message = pipeline.reduce((msg: string | Buffer, transformer) => transformer(msg), message) if (Buffer.isBuffer(message) && format !== 'binary') { message = message.toString('utf-8') @@ -139,11 +158,29 @@ const sub = (options: SubscribeOptions) => { client.on('connect', subscribeToTopics) client.on('message', (topic, payload, packet) => { - const { format, protobufPath, protobufMessageName, fileSave, fileWrite, delimiter } = options + const { format, protobufPath, protobufMessageName, avscPath, fileSave, fileWrite, delimiter } = options + + let schemaOptions: SchemaOptions + if (protobufPath && protobufMessageName) { + schemaOptions = { + type: 'protobuf', + protobufPath, + protobufMessageName, + } + } else if (avscPath) { + schemaOptions = { + type: 'avro', + avscPath, + } + } else { + schemaOptions = { + type: 'none', + } + } const msgData: MsgItem[] = [] - const receivedMessage = processReceivedMessage(payload, protobufPath, protobufMessageName, format) + const receivedMessage = processReceivedMessage(payload, schemaOptions, format) const savePath = fileSave ? createNextNumberedFileName(fileSave) : fileWrite if (savePath) { diff --git a/cli/src/types/global.d.ts b/cli/src/types/global.d.ts index 8a3e9344d..1786e18d4 100644 --- a/cli/src/types/global.d.ts +++ b/cli/src/types/global.d.ts @@ -56,6 +56,23 @@ declare global { debug?: boolean } + interface ProtobufSchemaOptions { + type: 'protobuf' + protobufPath: string + protobufMessageName: string + } + + interface AvroSchemaOptions { + type: 'avro' + avscPath: string + } + + interface NoSchema { + type: 'none' + } + + type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions | NoSchema + interface PublishOptions extends ConnectOptions { topic: string message: string | Buffer @@ -76,6 +93,7 @@ declare global { connUserProperties?: Record protobufPath?: string protobufMessageName?: string + avscPath?: string format?: FormatType } @@ -92,11 +110,12 @@ declare global { fileWrite?: string fileSave?: string delimiter?: string - format?: FormatType outputMode?: OutputMode verbose: boolean protobufPath?: string protobufMessageName?: string + avscPath?: string + format?: FormatType } type OmitConnectOptions = Omit diff --git a/cli/src/utils/avro.ts b/cli/src/utils/avro.ts new file mode 100644 index 000000000..263454d05 --- /dev/null +++ b/cli/src/utils/avro.ts @@ -0,0 +1,59 @@ +import * as avro from 'avsc' +import * as fs from 'fs' +import logWrapper from './logWrapper' + +const schemaCache: { [key: string]: avro.Type } = {} + +const getAvroType = (schemaPath: string): avro.Type => { + // first search from cache + if (schemaCache[schemaPath]) { + return schemaCache[schemaPath] + } + + try { + const schemaStr = fs.readFileSync(schemaPath, 'utf-8') + const type = avro.Type.forSchema(JSON.parse(schemaStr)) + + // cache the parsed schema + schemaCache[schemaPath] = type + + return type + } catch (err: unknown) { + logWrapper.fail(`Unable to load avro schema from ${schemaPath}: ${(err as Error).message}`) + process.exit(1) + } +} + +export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: string): Buffer => { + const type: avro.Type = getAvroType(avscSchemaPath) + + let rawMessage = raw.toString('utf-8') + + try { + const serializedMessage = type.toBuffer(JSON.parse(rawMessage)) + return Buffer.from(serializedMessage) + } catch (err: unknown) { + logWrapper.fail(`Unable to serialize message to avro buffer: ${err}`) + process.exit(1) + } +} + +export const deserializeBufferToAvro = ( + payload: Buffer, + avscSchemaPath: string, + needFormat?: FormatType, +): string | Buffer => { + const type: avro.Type = getAvroType(avscSchemaPath) + + try { + const message = type.fromBuffer(payload) + + if (needFormat) { + return Buffer.from(JSON.stringify(message)) + } + return JSON.stringify(message) + } catch (err: unknown) { + logWrapper.fail(`Unable to deserialize avro encoded buffer: ${(err as Error).message}`) + process.exit(1) + } +} diff --git a/cli/src/utils/parse.ts b/cli/src/utils/parse.ts index 44b2a3a0d..6af813cec 100644 --- a/cli/src/utils/parse.ts +++ b/cli/src/utils/parse.ts @@ -349,6 +349,7 @@ const parsePublishOptions = (options: PublishOptions) => { contentType, protobufPath, protobufMessageName, + avscPath, format, } = options @@ -358,6 +359,24 @@ const parsePublishOptions = (options: PublishOptions) => { dup, } + let schemaOptions: SchemaOptions + if (protobufPath && protobufMessageName) { + schemaOptions = { + type: 'protobuf', + protobufPath, + protobufMessageName, + } + } else if (avscPath) { + schemaOptions = { + type: 'avro', + avscPath, + } + } else { + schemaOptions = { + type: 'none', + } + } + if (options.mqttVersion === 5) { const properties = { payloadFormatIndicator, @@ -375,7 +394,7 @@ const parsePublishOptions = (options: PublishOptions) => { ) } - return { topic, message, protobufPath, protobufMessageName, format, opts: publishOptions } + return { topic, message, schemaOptions, format, opts: publishOptions } } const parseSubscribeOptions = (options: SubscribeOptions) => { diff --git a/cli/src/utils/protobuf.ts b/cli/src/utils/protobuf.ts index 1a071b984..a4d5c028e 100644 --- a/cli/src/utils/protobuf.ts +++ b/cli/src/utils/protobuf.ts @@ -32,7 +32,7 @@ export const deserializeBufferToProtobuf = ( protobufPath: string, protobufMessageName: string, needFormat: FormatType | undefined, -): any => { +): string | Buffer => { try { const root = protobuf.loadSync(protobufPath) const Message = root.lookupType(protobufMessageName) @@ -45,7 +45,7 @@ export const deserializeBufferToProtobuf = ( if (needFormat) { return Buffer.from(JSON.stringify(MessageData.toJSON())) } - return MessageData + return JSON.stringify(MessageData.toJSON()) } catch (error: unknown) { let err = transformPBJSError(error as Error) logWrapper.fail(err.message.split('\n')[0]) diff --git a/cli/yarn.lock b/cli/yarn.lock index 61277fab6..0004884f4 100644 --- a/cli/yarn.lock +++ b/cli/yarn.lock @@ -317,6 +317,11 @@ argparse@^2.0.1: resolved "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz" integrity sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q== +avsc@^5.7.7: + version "5.7.7" + resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.7.7.tgz#8d1b5fd85904cc96a1e439450633ff33f4aff57b" + integrity sha512-9cYNccliXZDByFsFliVwk5GvTq058Fj513CiR4E60ndDwmuXzTJEp/Bp8FyuRmGyYupLjHLs+JA9/CBoVS4/NQ== + balanced-match@^1.0.0: version "1.0.2" resolved "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz" From 123927bed37f7cbc45bf615f20ef6823d7d3bd45 Mon Sep 17 00:00:00 2001 From: Last Date: Thu, 22 Aug 2024 12:13:34 +0800 Subject: [PATCH 2/3] fix(cli): improve type definition add JSON validation fix typo & doc comment --- cli/src/index.ts | 2 +- cli/src/lib/pub.ts | 17 ++++++++--------- cli/src/lib/sub.ts | 15 +++++---------- cli/src/types/global.d.ts | 6 +----- cli/src/utils/avro.ts | 15 +++++++++++++++ cli/src/utils/parse.ts | 6 +----- 6 files changed, 31 insertions(+), 30 deletions(-) diff --git a/cli/src/index.ts b/cli/src/index.ts index 0bbd074bb..789941abb 100755 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -247,7 +247,7 @@ export class Commander { ) .option( '-Ap, --avsc-path ', - 'the path to the .avsc file that defines the avro schema for AVRO decoding', + 'the path to the .avsc file that defines the avro schema for AVRO encoding', parseFileRead, ) .option('--debug', 'enable debug mode for MQTT.js', false) diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index 4de8f2aa6..82334ab19 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -24,7 +24,7 @@ import { loadSimulator } from '../utils/simulate' * encapsulate the message into a protobuf format. If these settings are absent, * the message remains unchanged. * Flow: - * Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message + * Input Message -> [Format Conversion] -> Schema [Protobuf, Avro] -> Output Message * @param {string | Buffer} message - The message to be processed. * @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding * @param {FormatType} [format] - The format to convert the message to. @@ -32,7 +32,7 @@ import { loadSimulator } from '../utils/simulate' */ const processPublishMessage = ( message: string | Buffer, - schemaOptions: SchemaOptions, + schemaOptions?: SchemaOptions, format?: FormatType, ): Buffer | string => { const convertMessageFormat = (msg: string | Buffer): string | Buffer => { @@ -44,10 +44,9 @@ const processPublishMessage = ( } const serializeWithSchema = (msg: string | Buffer): string | Buffer => { - switch (schemaOptions.type) { - case 'none': - return msg + if (!schemaOptions) return msg + switch (schemaOptions.type) { case 'protobuf': return serializeProtobufToBuffer(msg, schemaOptions.protobufPath, schemaOptions.protobufMessageName) @@ -67,8 +66,8 @@ const send = ( pubOpts: { topic: string message: string | Buffer - schemaOptions: SchemaOptions - format: FormatType | undefined + schemaOptions?: SchemaOptions + format?: FormatType opts: IClientPublishOptions }, maximumReconnectTimes: number, @@ -131,8 +130,8 @@ const multiSend = ( pubOpts: { topic: string message: string | Buffer - schemaOptions: SchemaOptions - format: FormatType | undefined + schemaOptions?: SchemaOptions + format?: FormatType opts: IClientPublishOptions }, maximumReconnectTimes: number, diff --git a/cli/src/lib/sub.ts b/cli/src/lib/sub.ts index 4a117c5d2..4ca35b2e0 100644 --- a/cli/src/lib/sub.ts +++ b/cli/src/lib/sub.ts @@ -16,7 +16,7 @@ import { deserializeBufferToAvro } from '../utils/avro' * 1. Protobuf Deserialization --> Utilized if both protobuf path and message name are defined, otherwise message passes as is. * 2. Format Conversion --> Engaged if a format is defined, converting the message accordingly; if not defined, message passes unchanged. * Flow: - * payload -> [Protobuf Deserialization] -> [Format Conversion] -> Processed Message + * payload -> Schema [Protobuf, Avro] -> [Format Conversion] -> Processed Message * @param payload - The message payload to be processed. * @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding * @param format - The format of the payload. @@ -24,7 +24,7 @@ import { deserializeBufferToAvro } from '../utils/avro' */ const processReceivedMessage = ( payload: Buffer, - schemaOptions: SchemaOptions, + schemaOptions?: SchemaOptions, format?: FormatType, ): string | Buffer => { let message: string | Buffer = payload @@ -37,10 +37,9 @@ const processReceivedMessage = ( } const deserializeWithSchema = (msg: string | Buffer): string | Buffer => { - switch (schemaOptions.type) { - case 'none': - return msg + if (!schemaOptions) return msg + switch (schemaOptions.type) { case 'protobuf': return deserializeBufferToProtobuf( payload, @@ -160,7 +159,7 @@ const sub = (options: SubscribeOptions) => { client.on('message', (topic, payload, packet) => { const { format, protobufPath, protobufMessageName, avscPath, fileSave, fileWrite, delimiter } = options - let schemaOptions: SchemaOptions + let schemaOptions: SchemaOptions | undefined if (protobufPath && protobufMessageName) { schemaOptions = { type: 'protobuf', @@ -172,10 +171,6 @@ const sub = (options: SubscribeOptions) => { type: 'avro', avscPath, } - } else { - schemaOptions = { - type: 'none', - } } const msgData: MsgItem[] = [] diff --git a/cli/src/types/global.d.ts b/cli/src/types/global.d.ts index 1786e18d4..7afb5db73 100644 --- a/cli/src/types/global.d.ts +++ b/cli/src/types/global.d.ts @@ -67,11 +67,7 @@ declare global { avscPath: string } - interface NoSchema { - type: 'none' - } - - type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions | NoSchema + type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions interface PublishOptions extends ConnectOptions { topic: string diff --git a/cli/src/utils/avro.ts b/cli/src/utils/avro.ts index 263454d05..9aa658123 100644 --- a/cli/src/utils/avro.ts +++ b/cli/src/utils/avro.ts @@ -12,6 +12,14 @@ const getAvroType = (schemaPath: string): avro.Type => { try { const schemaStr = fs.readFileSync(schemaPath, 'utf-8') + + try { + JSON.parse(schemaStr) + } catch (err: unknown) { + logWrapper.fail(`Schema not following JSON format: ${(err as Error).message}`) + process.exit(1) + } + const type = avro.Type.forSchema(JSON.parse(schemaStr)) // cache the parsed schema @@ -29,6 +37,13 @@ export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: stri let rawMessage = raw.toString('utf-8') + try { + JSON.parse(rawMessage) + } catch (err: unknown) { + logWrapper.fail(`Invalid JSON input: ${(err as Error).message}`) + process.exit(1) + } + try { const serializedMessage = type.toBuffer(JSON.parse(rawMessage)) return Buffer.from(serializedMessage) diff --git a/cli/src/utils/parse.ts b/cli/src/utils/parse.ts index 6af813cec..9f202cba3 100644 --- a/cli/src/utils/parse.ts +++ b/cli/src/utils/parse.ts @@ -359,7 +359,7 @@ const parsePublishOptions = (options: PublishOptions) => { dup, } - let schemaOptions: SchemaOptions + let schemaOptions: SchemaOptions | undefined if (protobufPath && protobufMessageName) { schemaOptions = { type: 'protobuf', @@ -371,10 +371,6 @@ const parsePublishOptions = (options: PublishOptions) => { type: 'avro', avscPath, } - } else { - schemaOptions = { - type: 'none', - } } if (options.mqttVersion === 5) { From edab96fac12c900f4882355d5a39b41ddb46d9f4 Mon Sep 17 00:00:00 2001 From: Last Date: Fri, 23 Aug 2024 13:37:15 +0800 Subject: [PATCH 3/3] improve(cli): helper function for extracting schema options --- cli/src/lib/sub.ts | 16 ++-------------- cli/src/utils/parse.ts | 36 +++++++++++++++++++++++------------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/cli/src/lib/sub.ts b/cli/src/lib/sub.ts index 4ca35b2e0..09b5a2e9d 100644 --- a/cli/src/lib/sub.ts +++ b/cli/src/lib/sub.ts @@ -1,6 +1,6 @@ import * as mqtt from 'mqtt' import logWrapper, { Signale, msgLog, basicLog, benchLog, singaleConfig, signale } from '../utils/logWrapper' -import { parseConnectOptions, parseSubscribeOptions, checkTopicExists } from '../utils/parse' +import { parseConnectOptions, parseSubscribeOptions, parseSchemaOptions, checkTopicExists } from '../utils/parse' import delay from '../utils/delay' import convertPayload from '../utils/convertPayload' import { handleSaveOptions, handleLoadOptions } from '../utils/options' @@ -159,19 +159,7 @@ const sub = (options: SubscribeOptions) => { client.on('message', (topic, payload, packet) => { const { format, protobufPath, protobufMessageName, avscPath, fileSave, fileWrite, delimiter } = options - let schemaOptions: SchemaOptions | undefined - if (protobufPath && protobufMessageName) { - schemaOptions = { - type: 'protobuf', - protobufPath, - protobufMessageName, - } - } else if (avscPath) { - schemaOptions = { - type: 'avro', - avscPath, - } - } + const schemaOptions: SchemaOptions | undefined = parseSchemaOptions(protobufPath, protobufMessageName, avscPath) const msgData: MsgItem[] = [] diff --git a/cli/src/utils/parse.ts b/cli/src/utils/parse.ts index 9f202cba3..f37fefd9d 100644 --- a/cli/src/utils/parse.ts +++ b/cli/src/utils/parse.ts @@ -141,6 +141,27 @@ const parseFormat = (value: string) => { return value } +const parseSchemaOptions = ( + protobufPath?: string, + protobufMessageName?: string, + avscPath?: string, +): SchemaOptions | undefined => { + if (protobufPath && protobufMessageName) { + return { + type: 'protobuf', + protobufPath, + protobufMessageName, + } + } else if (avscPath) { + return { + type: 'avro', + avscPath, + } + } else { + return undefined + } +} + const parseOutputMode = (value: string) => { if (!['clean', 'default'].includes(value)) { logWrapper.fail('Not a valid output mode.') @@ -359,19 +380,7 @@ const parsePublishOptions = (options: PublishOptions) => { dup, } - let schemaOptions: SchemaOptions | undefined - if (protobufPath && protobufMessageName) { - schemaOptions = { - type: 'protobuf', - protobufPath, - protobufMessageName, - } - } else if (avscPath) { - schemaOptions = { - type: 'avro', - avscPath, - } - } + const schemaOptions: SchemaOptions | undefined = parseSchemaOptions(protobufPath, protobufMessageName, avscPath) if (options.mqttVersion === 5) { const properties = { @@ -446,6 +455,7 @@ export { parseFileSave, parseFileWrite, parseFormat, + parseSchemaOptions, parseOutputMode, parseConnectOptions, parsePublishOptions,