Skip to content

Commit

Permalink
fix(cli): improve type definition
Browse files Browse the repository at this point in the history
add JSON validation
fix typo & doc comment
  • Loading branch information
LAST7 authored and Red-Asuka committed Aug 23, 2024
1 parent da06148 commit d28945b
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ export class Commander {
)
.option(
'-Ap, --avsc-path <PATH>',
'the path to the .avsc file that defines the avro schema for AVRO decoding',
'the path to the .avsc file that defines the avro schema for AVRO encoding',
parseFileRead,
)
.option('--debug', 'enable debug mode for MQTT.js', false)
Expand Down
17 changes: 8 additions & 9 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import { loadSimulator } from '../utils/simulate'
* encapsulate the message into a protobuf format. If these settings are absent,
* the message remains unchanged.
* Flow:
* Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message
* Input Message -> [Format Conversion] -> Schema [Protobuf, Avro] -> Output Message
* @param {string | Buffer} message - The message to be processed.
* @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding
* @param {FormatType} [format] - The format to convert the message to.
* @returns {Buffer | string} - The processed message.
*/
const processPublishMessage = (
message: string | Buffer,
schemaOptions: SchemaOptions,
schemaOptions?: SchemaOptions,
format?: FormatType,
): Buffer | string => {
const convertMessageFormat = (msg: string | Buffer): string | Buffer => {
Expand All @@ -44,10 +44,9 @@ const processPublishMessage = (
}

const serializeWithSchema = (msg: string | Buffer): string | Buffer => {
switch (schemaOptions.type) {
case 'none':
return msg
if (!schemaOptions) return msg

switch (schemaOptions.type) {
case 'protobuf':
return serializeProtobufToBuffer(msg, schemaOptions.protobufPath, schemaOptions.protobufMessageName)

Expand All @@ -67,8 +66,8 @@ const send = (
pubOpts: {
topic: string
message: string | Buffer
schemaOptions: SchemaOptions
format: FormatType | undefined
schemaOptions?: SchemaOptions
format?: FormatType
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
Expand Down Expand Up @@ -131,8 +130,8 @@ const multiSend = (
pubOpts: {
topic: string
message: string | Buffer
schemaOptions: SchemaOptions
format: FormatType | undefined
schemaOptions?: SchemaOptions
format?: FormatType
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
Expand Down
15 changes: 5 additions & 10 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ import { deserializeBufferToAvro } from '../utils/avro'
* 1. Protobuf Deserialization --> Utilized if both protobuf path and message name are defined, otherwise message passes as is.
* 2. Format Conversion --> Engaged if a format is defined, converting the message accordingly; if not defined, message passes unchanged.
* Flow:
* payload -> [Protobuf Deserialization] -> [Format Conversion] -> Processed Message
* payload -> Schema [Protobuf, Avro] -> [Format Conversion] -> Processed Message
* @param payload - The message payload to be processed.
* @param {SchemaOptions} [schemaOptions] - Options for schema-based encoding
* @param format - The format of the payload.
* @returns The processed message as a string or Buffer.
*/
const processReceivedMessage = (
payload: Buffer,
schemaOptions: SchemaOptions,
schemaOptions?: SchemaOptions,
format?: FormatType,
): string | Buffer => {
let message: string | Buffer = payload
Expand All @@ -37,10 +37,9 @@ const processReceivedMessage = (
}

const deserializeWithSchema = (msg: string | Buffer): string | Buffer => {
switch (schemaOptions.type) {
case 'none':
return msg
if (!schemaOptions) return msg

switch (schemaOptions.type) {
case 'protobuf':
return deserializeBufferToProtobuf(
payload,
Expand Down Expand Up @@ -160,7 +159,7 @@ const sub = (options: SubscribeOptions) => {
client.on('message', (topic, payload, packet) => {
const { format, protobufPath, protobufMessageName, avscPath, fileSave, fileWrite, delimiter } = options

let schemaOptions: SchemaOptions
let schemaOptions: SchemaOptions | undefined
if (protobufPath && protobufMessageName) {
schemaOptions = {
type: 'protobuf',
Expand All @@ -172,10 +171,6 @@ const sub = (options: SubscribeOptions) => {
type: 'avro',
avscPath,
}
} else {
schemaOptions = {
type: 'none',
}
}

const msgData: MsgItem[] = []
Expand Down
6 changes: 1 addition & 5 deletions cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ declare global {
avscPath: string
}

interface NoSchema {
type: 'none'
}

type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions | NoSchema
type SchemaOptions = ProtobufSchemaOptions | AvroSchemaOptions

interface PublishOptions extends ConnectOptions {
topic: string
Expand Down
15 changes: 15 additions & 0 deletions cli/src/utils/avro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ const getAvroType = (schemaPath: string): avro.Type => {

try {
const schemaStr = fs.readFileSync(schemaPath, 'utf-8')

try {
JSON.parse(schemaStr)
} catch (err: unknown) {
logWrapper.fail(`Schema not following JSON format: ${(err as Error).message}`)
process.exit(1)
}

const type = avro.Type.forSchema(JSON.parse(schemaStr))

// cache the parsed schema
Expand All @@ -29,6 +37,13 @@ export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: stri

let rawMessage = raw.toString('utf-8')

try {
JSON.parse(rawMessage)
} catch (err: unknown) {
logWrapper.fail(`Invalid JSON input: ${(err as Error).message}`)
process.exit(1)
}

try {
const serializedMessage = type.toBuffer(JSON.parse(rawMessage))
return Buffer.from(serializedMessage)
Expand Down
6 changes: 1 addition & 5 deletions cli/src/utils/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ const parsePublishOptions = (options: PublishOptions) => {
dup,
}

let schemaOptions: SchemaOptions
let schemaOptions: SchemaOptions | undefined
if (protobufPath && protobufMessageName) {
schemaOptions = {
type: 'protobuf',
Expand All @@ -371,10 +371,6 @@ const parsePublishOptions = (options: PublishOptions) => {
type: 'avro',
avscPath,
}
} else {
schemaOptions = {
type: 'none',
}
}

if (options.mqttVersion === 5) {
Expand Down

0 comments on commit d28945b

Please sign in to comment.