
feat(cli): add avro support #1735

Merged 3 commits on Aug 23, 2024
1 change: 1 addition & 0 deletions cli/package.json
@@ -22,6 +22,7 @@
},
"dependencies": {
"@inquirer/prompts": "^5.0.3",
"avsc": "^5.7.7",
"cbor": "^9.0.1",
"chalk": "~4.1.2",
"cli-table3": "^0.6.3",
10 changes: 10 additions & 0 deletions cli/src/index.ts
@@ -245,6 +245,11 @@ export class Commander {
'-Pmn, --protobuf-message-name <NAME>',
'the name of the protobuf message type (must exist in the .proto file)',
)
.option(
'-Ap, --avsc-path <PATH>',
'the path to the .avsc file that defines the avro schema for AVRO encoding',
parseFileRead,
)
.option('--debug', 'enable debug mode for MQTT.js', false)
.allowUnknownOption(false)
.action(pub)
@@ -367,6 +372,11 @@
'-Pmn, --protobuf-message-name <NAME>',
'the name of the protobuf message type (must exist in the .proto file)',
)
.option(
'-Ap, --avsc-path <PATH>',
'the path to the .avsc file that defines the avro schema for AVRO decoding',
parseFileRead,
)
.option('--debug', 'enable debug mode for MQTT.js', false)
.allowUnknownOption(false)
.action(sub)
46 changes: 24 additions & 22 deletions cli/src/lib/pub.ts
@@ -13,6 +13,7 @@ import logWrapper, { basicLog, benchLog, Signale, signale, simulateLog, singaleC
import { handleLoadOptions, handleSaveOptions } from '../utils/options'
import { checkScenarioExists, checkTopicExists, parseConnectOptions, parsePublishOptions } from '../utils/parse'
import { serializeProtobufToBuffer } from '../utils/protobuf'
import { serializeAvroToBuffer } from '../utils/avro'
import { loadSimulator } from '../utils/simulate'

/**
@@ -23,37 +24,40 @@ import { loadSimulator } from '../utils/simulate'
* encapsulate the message into a protobuf format. If these settings are absent,
* the message remains unchanged.
* Flow:
* Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message
* Input Message -> [Format Conversion] -> Schema [Protobuf, Avro] -> Output Message
* @param {string | Buffer} message - The message to be processed.
* @param {string} [protobufPath] - The path to the protobuf definition.
* @param {string} [protobufMessageName] - The name of the protobuf message.
* @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding
* @param {FormatType} [format] - The format to convert the message to.
* @returns {Buffer | string} - The processed message.
*/
const processPublishMessage = (
message: string | Buffer,
protobufPath?: string,
protobufMessageName?: string,
schemaOptions?: SchemaOptions,
format?: FormatType,
): Buffer | string => {
@Red-Asuka (Member) commented on Aug 20, 2024:

I believe that schemaOptions should be an optional parameter, and that schemaOptions and format should be combined into a single options object.

The code could be refactored like this:

const processReceivedMessage = (
  payload: Buffer,
  options?: {
    format?: "base64" | "json" | "hex" | "cbor" | "binary"
    schema?: 'protobuf' | 'avro'
    protobufPath?: string
    protobufMessageName?: string
    avscPath?: string
  },
): string | Buffer => {

The rationale is that these parameters are all related to processing the payload and are optional. By merging them into a single options object, we eliminate concerns about argument order and make the code more intuitive and easier to maintain.

The PR author (Collaborator) replied:

Thank you for reviewing my code, but I have a different opinion on the design of this parameter.

  1. format and schemaOptions serve different purposes within the processing pipeline (they are consumed by different functions).
  2. Merging them into one parameter would bypass the restriction enforced by the SchemaOptions type, which I believe is unsafe.

That said, I understand your suggestion to simplify this function's signature; maybe we can find a middle ground that retains type safety while simplifying it.

const convertMessageFormat = (msg: string | Buffer): Buffer | string => {
const convertMessageFormat = (msg: string | Buffer): string | Buffer => {
if (!format) {
return msg
}
const bufferMsg = Buffer.isBuffer(msg) ? msg : Buffer.from(msg.toString())
return convertPayload(bufferMsg, format, 'encode')
}

const serializeProtobufMessage = (msg: string | Buffer): Buffer | string => {
if (protobufPath && protobufMessageName) {
return serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
const serializeWithSchema = (msg: string | Buffer): string | Buffer => {
if (!schemaOptions) return msg

switch (schemaOptions.type) {
case 'protobuf':
return serializeProtobufToBuffer(msg, schemaOptions.protobufPath, schemaOptions.protobufMessageName)

case 'avro':
return serializeAvroToBuffer(msg, schemaOptions.avscPath)
}
return msg
}

const pipeline = [convertMessageFormat, serializeProtobufMessage]
const pipeline = [convertMessageFormat, serializeWithSchema]

return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer
return pipeline.reduce((msg: string | Buffer, transformer) => transformer(msg), message) as Buffer
}

const send = (
@@ -62,9 +66,8 @@ const send = (
pubOpts: {
topic: string
message: string | Buffer
protobufPath: string | undefined
protobufMessageName: string | undefined
format: FormatType | undefined
schemaOptions?: SchemaOptions
format?: FormatType
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
@@ -77,9 +80,9 @@
client.on('connect', () => {
retryTimes = 0
basicLog.connected()
const { topic, message, protobufPath, protobufMessageName, format } = pubOpts
const { topic, message, schemaOptions, format } = pubOpts
basicLog.publishing()
const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format)
const publishMessage = processPublishMessage(message, schemaOptions, format)
client.publish(topic, publishMessage, pubOpts.opts, (err) => {
if (err) {
basicLog.error(err)
@@ -127,9 +130,8 @@ const multiSend = (
pubOpts: {
topic: string
message: string | Buffer
protobufPath: string | undefined
protobufMessageName: string | undefined
format: FormatType | undefined
schemaOptions?: SchemaOptions
format?: FormatType
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
@@ -143,10 +145,10 @@
})
let count = 0
sender._write = (line, _enc, cb) => {
const { topic, opts, protobufPath, protobufMessageName, format } = pubOpts
const { topic, opts, schemaOptions, format } = pubOpts
count++
let omitTopic = opts.properties?.topicAlias && count >= 2
const publishMessage = processPublishMessage(line.trim(), protobufPath, protobufMessageName, format)
const publishMessage = processPublishMessage(line.trim(), schemaOptions, format)
client.publish(omitTopic ? '' : topic, publishMessage, opts, cb)
}

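The publish helpers above thread the message through a two-stage pipeline: format conversion first, then schema serialization, via a `reduce` over transformer functions. A standalone sketch of that shape; `toBase64` and `toBuffer` are illustrative stand-ins, not the CLI's actual `convertPayload` or schema serializers:

```typescript
// Sketch of the publish pipeline: each stage takes and returns
// string | Buffer, and reduce threads the message through them in order.
type Transformer = (msg: string | Buffer) => string | Buffer

const makePipeline =
  (transformers: Transformer[]) =>
  (message: string | Buffer): string | Buffer =>
    transformers.reduce((msg, transform) => transform(msg), message)

// Stand-in for convertPayload(msg, 'base64', 'encode')
const toBase64: Transformer = (msg) => Buffer.from(msg.toString()).toString('base64')

// Stand-in for a schema serializer, which must ultimately emit a Buffer
const toBuffer: Transformer = (msg) => (Buffer.isBuffer(msg) ? msg : Buffer.from(msg.toString()))

const processMessage = makePipeline([toBase64, toBuffer])
```

For example, `processMessage('hello')` yields a Buffer whose UTF-8 content is the base64 string `aGVsbG8=`; adding a new stage means appending one transformer rather than rewriting the function.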
52 changes: 36 additions & 16 deletions cli/src/lib/sub.ts
@@ -1,43 +1,61 @@
import * as mqtt from 'mqtt'
import logWrapper, { Signale, msgLog, basicLog, benchLog, singaleConfig, signale } from '../utils/logWrapper'
import { parseConnectOptions, parseSubscribeOptions, checkTopicExists } from '../utils/parse'
import { parseConnectOptions, parseSubscribeOptions, parseSchemaOptions, checkTopicExists } from '../utils/parse'
import delay from '../utils/delay'
import convertPayload from '../utils/convertPayload'
import { handleSaveOptions, handleLoadOptions } from '../utils/options'
import { writeFile, appendFile, getPathExtname, createNextNumberedFileName } from '../utils/fileUtils'
import { deserializeBufferToProtobuf } from '../utils/protobuf'
import isSupportedBinaryFormatForMQTT from '../utils/binaryFormats'
import * as Debug from 'debug'
import { deserializeBufferToAvro } from '../utils/avro'

/**
*
* Pipeline for processing incoming messages, following two potential steps:
* 1. Protobuf Deserialization --> Utilized if both protobuf path and message name are defined, otherwise message passes as is.
* 2. Format Conversion --> Engaged if a format is defined, converting the message accordingly; if not defined, message passes unchanged.
* Flow:
* payload -> [Protobuf Deserialization] -> [Format Conversion] -> Processed Message
* payload -> Schema [Protobuf, Avro] -> [Format Conversion] -> Processed Message
* @param payload - The message payload to be processed.
* @param protobufPath - The path to the Protobuf definition file.
* @param protobufMessageName - The name of the Protobuf message.
* @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding
* @param format - The format of the payload.
* @returns The processed message as a string or Buffer.
*/
const processReceivedMessage = (
payload: Buffer,
protobufPath?: string,
protobufMessageName?: string,
schemaOptions?: SchemaOptions,
format?: FormatType,
): string | Buffer => {
let message: string | Buffer = payload
const pipeline = [
(msg: Buffer) =>
protobufPath && protobufMessageName
? deserializeBufferToProtobuf(msg, protobufPath, protobufMessageName, format)
: msg,
(msg: Buffer) => (format ? convertPayload(msg, format, 'decode') : msg),
]

message = pipeline.reduce((msg, transformer) => transformer(msg), message)
const convertMessageFormat = (msg: string | Buffer): string | Buffer => {
if (!format) {
return msg
}
return convertPayload(msg, format, 'decode')
}

const deserializeWithSchema = (msg: string | Buffer): string | Buffer => {
if (!schemaOptions) return msg

switch (schemaOptions.type) {
case 'protobuf':
return deserializeBufferToProtobuf(
payload,
schemaOptions.protobufPath,
schemaOptions.protobufMessageName,
format,
)

case 'avro':
return deserializeBufferToAvro(payload, schemaOptions.avscPath, format)
}
}

const pipeline = [deserializeWithSchema, convertMessageFormat]

message = pipeline.reduce((msg: string | Buffer, transformer) => transformer(msg), message)

if (Buffer.isBuffer(message) && format !== 'binary') {
message = message.toString('utf-8')
@@ -139,11 +157,13 @@ const sub = (options: SubscribeOptions) => {
client.on('connect', subscribeToTopics)

client.on('message', (topic, payload, packet) => {
const { format, protobufPath, protobufMessageName, fileSave, fileWrite, delimiter } = options
const { format, protobufPath, protobufMessageName, avscPath, fileSave, fileWrite, delimiter } = options

const schemaOptions: SchemaOptions | undefined = parseSchemaOptions(protobufPath, protobufMessageName, avscPath)

const msgData: MsgItem[] = []

const receivedMessage = processReceivedMessage(payload, protobufPath, protobufMessageName, format)
const receivedMessage = processReceivedMessage(payload, schemaOptions, format)

const savePath = fileSave ? createNextNumberedFileName(fileSave) : fileWrite
if (savePath) {
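`parseSchemaOptions` itself lives in `cli/src/utils/parse.ts` and is not part of the diff shown here. A plausible sketch of what it does, given the `SchemaOptions` union and the three optional CLI flags (the protobuf-first precedence is my assumption):

```typescript
// Hypothetical sketch of parseSchemaOptions: map the optional CLI flags
// onto the SchemaOptions discriminated union, or undefined if no schema
// flags were supplied.
type SchemaOptions =
  | { type: 'protobuf'; protobufPath: string; protobufMessageName: string }
  | { type: 'avro'; avscPath: string }

const parseSchemaOptions = (
  protobufPath?: string,
  protobufMessageName?: string,
  avscPath?: string,
): SchemaOptions | undefined => {
  // Protobuf requires both the .proto path and a message name.
  if (protobufPath && protobufMessageName) {
    return { type: 'protobuf', protobufPath, protobufMessageName }
  }
  if (avscPath) {
    return { type: 'avro', avscPath }
  }
  return undefined
}
```

With no schema flags the result is `undefined`, so `processReceivedMessage` passes the payload through untouched.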
17 changes: 16 additions & 1 deletion cli/src/types/global.d.ts
@@ -56,6 +56,19 @@ declare global {
debug?: boolean
}

interface ProtobufSchemaOptions {
type: 'protobuf'
protobufPath: string
protobufMessageName: string
}

interface AvroSchemaOptions {
type: 'avro'
avscPath: string
}

type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions

interface PublishOptions extends ConnectOptions {
topic: string
message: string | Buffer
@@ -76,6 +89,7 @@
connUserProperties?: Record<string, string | string[]>
protobufPath?: string
protobufMessageName?: string
avscPath?: string
format?: FormatType
}

@@ -92,11 +106,12 @@
fileWrite?: string
fileSave?: string
delimiter?: string
format?: FormatType
outputMode?: OutputMode
verbose: boolean
protobufPath?: string
protobufMessageName?: string
avscPath?: string
format?: FormatType
}

type OmitConnectOptions = Omit<ConnectOptions, 'debug'>
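The `SchemaOptions` type added above is a discriminated union: switching on the `type` field narrows which schema-specific fields are visible in each branch, which is the type-safety restriction the author defends in the review thread. A minimal demonstration; the `describe` helper is hypothetical:

```typescript
// The discriminated union from global.d.ts, reproduced for a
// self-contained example.
interface ProtobufSchemaOptions {
  type: 'protobuf'
  protobufPath: string
  protobufMessageName: string
}

interface AvroSchemaOptions {
  type: 'avro'
  avscPath: string
}

type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions

// Switching on the discriminant narrows opts in each case, so
// opts.avscPath only type-checks in the 'avro' branch, and the
// compiler knows the switch is exhaustive.
const describe = (opts: SchemaOptions): string => {
  switch (opts.type) {
    case 'protobuf':
      return `protobuf ${opts.protobufMessageName} from ${opts.protobufPath}`
    case 'avro':
      return `avro schema from ${opts.avscPath}`
  }
}
```

Collapsing the union into one flat options bag would lose this narrowing: nothing would stop a caller from supplying an `avscPath` together with a `protobufMessageName`.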
74 changes: 74 additions & 0 deletions cli/src/utils/avro.ts
@@ -0,0 +1,74 @@
import * as avro from 'avsc'
import * as fs from 'fs'
import logWrapper from './logWrapper'

const schemaCache: { [key: string]: avro.Type } = {}

const getAvroType = (schemaPath: string): avro.Type => {
// first search from cache
if (schemaCache[schemaPath]) {
return schemaCache[schemaPath]
}

try {
const schemaStr = fs.readFileSync(schemaPath, 'utf-8')

try {
JSON.parse(schemaStr)
} catch (err: unknown) {
logWrapper.fail(`Schema not following JSON format: ${(err as Error).message}`)
process.exit(1)
}

const type = avro.Type.forSchema(JSON.parse(schemaStr))

// cache the parsed schema
schemaCache[schemaPath] = type

return type
} catch (err: unknown) {
logWrapper.fail(`Unable to load avro schema from ${schemaPath}: ${(err as Error).message}`)
process.exit(1)
}
}
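`getAvroType` memoizes parsed schemas by file path, so repeated publishes or received messages do not re-read and re-parse the same `.avsc` file. The same pattern in isolation, with `JSON.parse` standing in for the file read plus `avro.Type.forSchema`:

```typescript
// Memoize an expensive parse by cache key, as schemaCache does above.
const cache: { [key: string]: unknown } = {}
let parseCount = 0 // counts actual parses, to show cache hits skip work

const getParsed = (key: string, source: string): unknown => {
  if (cache[key]) return cache[key] // cache hit: no re-parse
  parseCount++
  const parsed = JSON.parse(source) // stand-in for avro.Type.forSchema
  cache[key] = parsed
  return parsed
}
```

Keying by path means the cache is per-process only; editing the `.avsc` file mid-run would not be picked up, which is fine for a short-lived CLI invocation.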

export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: string): Buffer => {
const type: avro.Type = getAvroType(avscSchemaPath)

let rawMessage = raw.toString('utf-8')

try {
JSON.parse(rawMessage)
} catch (err: unknown) {
logWrapper.fail(`Invalid JSON input: ${(err as Error).message}`)
process.exit(1)
}

try {
const serializedMessage = type.toBuffer(JSON.parse(rawMessage))
return Buffer.from(serializedMessage)
} catch (err: unknown) {
logWrapper.fail(`Unable to serialize message to avro buffer: ${err}`)
process.exit(1)
}
}

export const deserializeBufferToAvro = (
payload: Buffer,
avscSchemaPath: string,
needFormat?: FormatType,
): string | Buffer => {
const type: avro.Type = getAvroType(avscSchemaPath)

try {
const message = type.fromBuffer(payload)

if (needFormat) {
return Buffer.from(JSON.stringify(message))
}
return JSON.stringify(message)
} catch (err: unknown) {
logWrapper.fail(`Unable to deserialize avro encoded buffer: ${(err as Error).message}`)
process.exit(1)
}
}
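For intuition about the bytes `type.toBuffer` emits: in Avro's binary encoding a string is a zigzag varint length followed by its UTF-8 bytes, and a record is simply its encoded fields concatenated in schema order. A hand-rolled sketch for a record with a single string field; this follows the Avro specification, not the avsc library, and only handles 32-bit-safe lengths:

```typescript
// Encode an integer as Avro's zigzag varint (32-bit-safe values only).
const zigzagVarint = (n: number): Buffer => {
  let z = (n << 1) ^ (n >> 31) // zigzag: small magnitudes -> small codes
  const bytes: number[] = []
  do {
    let b = z & 0x7f
    z >>>= 7
    if (z !== 0) b |= 0x80 // continuation bit
    bytes.push(b)
  } while (z !== 0)
  return Buffer.from(bytes)
}

// Encode one string field of a record, e.g. {"name": "Alice"} against
// a schema whose only field is a string: length varint, then UTF-8 bytes.
const encodeStringField = (value: string): Buffer => {
  const utf8 = Buffer.from(value, 'utf-8')
  return Buffer.concat([zigzagVarint(utf8.length), utf8])
}
```

So `"Alice"` encodes as `0a` (zigzag of length 5) followed by the five UTF-8 bytes, which is why Avro payloads are compact but unreadable without the schema that produced them.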