Skip to content

Commit

Permalink
fix(cli): support pub muliti lines on pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
ysfscream committed Dec 4, 2024
1 parent 4039ead commit dcf74a8
Showing 1 changed file with 49 additions and 4 deletions.
53 changes: 49 additions & 4 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,46 @@ const multiSend = (
})
}

const handlePipedMultiline = (connOpts: IClientOptions, pubOpts: { topic: string; opts: IClientPublishOptions }) => {
const client = mqtt.connect(connOpts)
let messageQueue: string[] = []
let publishedCount = 0

process.stdin.pipe(split2()).on('data', (chunk) => {
const message = chunk.toString()
if (message.length > 0) {
messageQueue.push(message)
}
})

client.on('connect', () => {
basicLog.connected()
if (messageQueue.length > 0) {
logWrapper.await(`Publishing ${messageQueue.length} messages...`)
messageQueue.forEach((message) => {
client.publish(pubOpts.topic, Buffer.from(message), pubOpts.opts, (err) => {
if (err) {
basicLog.error(err)
} else {
publishedCount++
if (publishedCount === messageQueue.length) {
logWrapper.success(`Successfully published ${publishedCount} messages`)
}
}
})
})
setTimeout(() => {
client.end()
}, 1000)
}
})

client.on('error', (err) => {
basicLog.error(err)
client.end()
})
}

const handleFileRead = (filePath: string) => {
try {
basicLog.fileReading()
Expand Down Expand Up @@ -231,16 +271,21 @@ const pub = (options: PublishOptions) => {
const pubOpts = parsePublishOptions(options)

const handleStdin = () => {
if (options.multiline) {
multiSend(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
} else {
process.stdin.pipe(
// One line mode
if (!options.multiline) {
return process.stdin.pipe(
concat((data) => {
pubOpts.message = data
send(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
}),
)
}

// Multiline mode
const isPiped = !process.stdin.isTTY
return isPiped
? handlePipedMultiline(connOpts, pubOpts)
: multiSend(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
}

if (options.fileRead) {
Expand Down

0 comments on commit dcf74a8

Please sign in to comment.