Skip to content

Commit

Permalink
feat(cil): support for reading messages from files in bench
Browse files Browse the repository at this point in the history
  • Loading branch information
DM1-1 committed Apr 23, 2024
1 parent 25412b0 commit a6cb136
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,10 @@ export class Commander {
'--config [PATH]',
'load the parameters from the local configuration file, which supports json and yaml format, default path is ./mqttx-cli-config.json',
)
.option(
'--file-read <PATH>',
'read the message body from the file',
)
.allowUnknownOption(false)
.action(simulatePub)

Expand Down
34 changes: 32 additions & 2 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ const pub = (options: PublishOptions) => {
}

try {
signale.pending('Reading file...')
basicLog.fileReading()
const bufferData = readFile(filePath)
if (bufferData.length >= MQTT_SINGLE_MESSAGE_BYTE_LIMIT) {
signale.error('File size over 256MB not supported by MQTT.')
process.exit(1)
}
signale.success('File read successfully')
basicLog.fileReadSuccess()

pubOpts.message = bufferData
send(config, connOpts, pubOpts)
Expand Down Expand Up @@ -237,10 +237,37 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
topic,
clientId,
message,
fileRead,
verbose,
maximumReconnectTimes,
} = options

const handleFileRead = () => {
const filePath = processPath(fileRead!)
if (!filePath) {
signale.error('File path is required when reading from file.')
process.exit(1)
}

try {
basicLog.fileReading()
const bufferData = readFile(filePath)
if (bufferData.length >= MQTT_SINGLE_MESSAGE_BYTE_LIMIT) {
signale.error('File size over 256MB not supported by MQTT.')
process.exit(1)
}
basicLog.fileReadSuccess()
return bufferData
} catch(error) {
signale.error('Failed to read file:', error)
process.exit(1)
}
}
let fileData: Buffer | string
if(fileRead) {
fileData = handleFileRead()
}

checkTopicExists(topic, commandType)

const connOpts = parseConnectOptions(options, 'pub')
Expand Down Expand Up @@ -327,6 +354,9 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
}
publishMessage = simulationResult.message
}
if(fileRead) {
publishMessage = fileData
}
client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
inFlightMessageCount -= 1
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ declare global {

type OmitSubscribeOptions = Omit<
SubscribeOptions,
'format' | 'outputMode' | 'protobufPath' | 'protobufMessageName' | 'debug'
'format' | 'outputMode' | 'protobufPath' | 'protobufMessageName' | 'debug' | 'fileWrite' | 'fileSave'
>

interface BenchSubscribeOptions extends OmitSubscribeOptions {
Expand Down
4 changes: 3 additions & 1 deletion cli/src/utils/signale.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ const basicLog = {
const { reasonCode } = packet
const reason = reasonCode === 0 ? 'Normal disconnection' : getErrorReason(reasonCode)
signale.warn(`${clientId ? `Client ID: ${clientId}, ` : ''}The Broker has actively disconnected, Reason: ${reason} (Code: ${reasonCode})`)
}
},
fileReading: () => signale.await('Reading file...'),
fileReadSuccess: () => signale.success('Read file successfully'),
}

const benchLog = {
Expand Down

0 comments on commit a6cb136

Please sign in to comment.