Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): support multi-threading for the bench conn command #1573

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cli/src/index.ts
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import {
parsePubTopic,
parseFormat,
parseOutputMode,
parseThreads,
} from './utils/parse'
import { conn, benchConn } from './lib/conn'
import { pub, benchPub, simulatePub } from './lib/pub'
@@ -321,6 +322,12 @@ export class Commander {
.description('Create a custom number of connections.')
.option('-c, --count <NUMBER>', 'the number of connections', parseNumber, 1000)
.option('-i, --interval <MILLISECONDS>', 'interval of connecting to the broker', parseNumber, 10)
.option(
'--cores <number>',
'the number of CPU cores to use, default is 1, maximum is the number of CPU cores',
parseThreads,
1,
)
.option('-V, --mqtt-version <5/3.1.1/3.1>', 'the MQTT version', parseMQTTVersion, 5)
.option('-h, --hostname <HOST>', 'the broker host', 'localhost')
.option('-p, --port <PORT>', 'the broker port', parseNumber)
@@ -391,7 +398,7 @@ export class Commander {
.option('-im, --message-interval <MILLISECONDS>', 'interval of publishing messages', parseNumber, 1000)
.option(
'-L, --limit <NUMBER>',
'The maximum number of messages to publish. A value of 0 means no limit on the number of messages',
'the maximum number of messages to publish. A value of 0 means no limit on the number of messages',
parseNumber,
0,
)
188 changes: 133 additions & 55 deletions cli/src/lib/conn.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as mqtt from 'mqtt'
import cluster from 'cluster'
import { Signale, signale, basicLog, benchLog } from '../utils/signale'
import { parseConnectOptions } from '../utils/parse'
import delay from '../utils/delay'
@@ -55,81 +56,158 @@ const conn = (options: ConnectOptions) => {
}

const benchConn = async (options: BenchConnectOptions) => {
const { save, config } = options
if (cluster.isPrimary) {
const { save, config } = options

config && (options = loadConfig('benchConn', config))
config && (options = loadConfig('benchConn', config))

save && saveConfig('benchConn', options)
save && saveConfig('benchConn', options)

const { count, interval, hostname, port, clientId, maximumReconnectTimes } = options
const { count, interval, cores, hostname, port, clientId, maximumReconnectTimes } = options

const connOpts = parseConnectOptions(options, 'conn')
const connOpts = parseConnectOptions(options, 'conn')

let connectedCount = 0
let connectedCount = 0

const isNewConnArray = Array(count).fill(true)
const isNewConnArray = Array(count).fill(true)

const retryTimesArray = Array(count).fill(0)
const retryTimesArray = Array(count).fill(0)

const interactive = new Signale({ interactive: true })
const interactive = new Signale({ interactive: true })

benchLog.start.conn(config, count, interval, hostname, port)
benchLog.start.conn(config, count, interval, hostname, port)

const start = Date.now()
const start = Date.now()

for (let i = 1; i <= count; i++) {
;((i: number, connOpts: mqtt.IClientOptions) => {
const opts = { ...connOpts }
let startConnIndex = 1

opts.clientId = clientId.includes('%i') ? clientId.replaceAll('%i', i.toString()) : `${clientId}_${i}`
for (let i = 0; i < cores; i++) {
const worker = cluster.fork()

const client = mqtt.connect(opts)
let masterMessage: ConnMasterMessage
const connCount = Math.floor(count / cores) + (i < count % cores ? 1 : 0)
const endConnIndex = startConnIndex + connCount - 1

interactive.await('[%d/%d] - Connecting...', connectedCount, count)
masterMessage = {
type: 'init',
data: {
startConnIndex,
endConnIndex,
options,
connOpts,
},
}
worker.send(masterMessage)

client.on('connect', () => {
connectedCount += 1
retryTimesArray[i - 1] = 0
if (isNewConnArray[i - 1]) {
interactive.success('[%d/%d] - Connected', connectedCount, count)
startConnIndex += connCount

if (connectedCount === count) {
const end = Date.now()
signale.info(`Done, total time: ${(end - start) / 1000}s`)
worker.on('message', (msg: ConnWorkerMessage) => {
const { type } = msg
if (type === 'connecting') {
interactive.await('[%d/%d] - Connecting...', connectedCount, count)
}
if (type === 'connected') {
const { clientIndex } = msg.data
connectedCount += 1
retryTimesArray[clientIndex - 1] = 0
if (isNewConnArray[clientIndex - 1]) {
interactive.success('[%d/%d] - Connected', connectedCount, count)

if (connectedCount === count) {
const end = Date.now()
signale.info(`Done, total time: ${(end - start) / 1000}s`)
}
} else {
benchLog.reconnected(connectedCount, count, clientId)
}
} else {
benchLog.reconnected(connectedCount, count, opts.clientId!)
}
})

client.on('error', (err) => {
benchLog.error(connectedCount, count, opts.clientId!, err)
client.end()
})

client.on('reconnect', () => {
retryTimesArray[i - 1] += 1
if (retryTimesArray[i - 1] > maximumReconnectTimes) {
client.end(false, {}, () => {
benchLog.reconnectTimesLimit(connectedCount, count, opts.clientId!)
})
} else {
benchLog.reconnecting(connectedCount, count, opts.clientId!)
isNewConnArray[i - 1] = false
if (type === 'error') {
const {
opts: { clientId },
error,
} = msg.data
benchLog.error(connectedCount, count, clientId!, error)
}
if (type === 'reconnect') {
const {
opts: { clientId },
} = msg.data
retryTimesArray[i - 1] += 1
if (retryTimesArray[i - 1] > maximumReconnectTimes) {
// TODO: Force close connection
// client.end(false, {}, () => {
// benchLog.reconnectTimesLimit(connectedCount, count, clientId!)
// })
} else {
benchLog.reconnecting(connectedCount, count, clientId!)
isNewConnArray[i - 1] = false
}
}
if (type === 'close') {
const {
opts: { clientId },
} = msg.data
connectedCount > 0 && (connectedCount -= 1)
benchLog.close(connectedCount, count, clientId!)
}
if (type === 'disconnect') {
const {
opts: { clientId },
} = msg.data
basicLog.disconnect(clientId!)
}
})

client.on('close', () => {
connectedCount > 0 && (connectedCount -= 1)
benchLog.close(connectedCount, count, opts.clientId!)
})

client.on('disconnect', () => {
basicLog.disconnect(opts.clientId!)
})
})(i, connOpts)

await delay(interval)
}
} else if (cluster.isWorker) {
process.on('message', async (msg: ConnMasterMessage) => {
const { type, data } = msg
if (type === 'init') {
let workerMessage: ConnWorkerMessage
const { startConnIndex, endConnIndex, options, connOpts } = data
const { interval, clientId } = options
for (let i = startConnIndex; i <= endConnIndex; i++) {
;((i: number, connOpts: mqtt.IClientOptions) => {
const opts = { ...connOpts }

opts.clientId = clientId.includes('%i') ? clientId.replaceAll('%i', i.toString()) : `${clientId}_${i}`

const client = mqtt.connect(opts)

workerMessage = { type: 'connecting' }
process.send!(workerMessage)

client.on('connect', () => {
workerMessage = { type: 'connected', data: { clientIndex: i } }
process.send!(workerMessage)
})

client.on('error', (err) => {
workerMessage = { type: 'error', data: { opts, error: err } }
process.send!(workerMessage, undefined, undefined, () => {
client.end()
})
})

client.on('reconnect', () => {
workerMessage = { type: 'reconnect', data: { opts } }
process.send!(workerMessage)
})

client.on('close', () => {
workerMessage = { type: 'close', data: { opts } }
process.send!(workerMessage)
})

client.on('disconnect', () => {
workerMessage = { type: 'disconnect', data: { opts } }
process.send!(workerMessage)
})
})(i, connOpts)

await delay(interval)
}
}
})
}
}

61 changes: 60 additions & 1 deletion cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Faker } from '@faker-js/faker'
import type { IClientOptions } from 'mqtt'

declare global {
type CommandType = 'conn' | 'pub' | 'sub' | 'benchConn' | 'benchPub' | 'benchSub' | 'simulate'
@@ -100,6 +100,7 @@ declare global {
interface BenchConnectOptions extends OmitConnectOptions {
count: number
interval: number
cores: number
}

type OmitPublishOptions = Omit<
@@ -157,6 +158,64 @@ declare global {
interface LsOptions {
scenarios: boolean
}

interface ConnMasterMessage {
type: 'init'
data: {
startConnIndex: number
endConnIndex: number
options: BenchConnectOptions
connOpts: IClientOptions
}
}

interface ConnWorkerMessageConnecting {
type: 'connecting'
}

interface ConnWorkerMessageConnected {
type: 'connected'
data: {
clientIndex: number
}
}

interface ConnWorkerMessageError {
type: 'error'
data: {
opts: IClientOptions
error: Error
}
}

interface ConnWorkerMessageReconnect {
type: 'reconnect'
data: {
opts: IClientOptions
}
}

interface ConnWorkerMessageClose {
type: 'close'
data: {
opts: IClientOptions
}
}

interface ConnWorkerMessageDisconnect {
type: 'disconnect'
data: {
opts: IClientOptions
}
}

type ConnWorkerMessage =
| ConnWorkerMessageConnecting
| ConnWorkerMessageConnected
| ConnWorkerMessageError
| ConnWorkerMessageReconnect
| ConnWorkerMessageClose
| ConnWorkerMessageDisconnect
}

export {}
13 changes: 13 additions & 0 deletions cli/src/utils/parse.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as fs from 'fs'
import os from 'os'
import signale from '../utils/signale'
import { getSpecialTypesOption } from '../utils/generator'

@@ -113,6 +114,17 @@ const parseOutputMode = (value: string) => {
return value
}

const parseThreads = (value: string) => {
const threads = parseInt(value, 10)
const cpuCount = os.cpus().length

if (isNaN(threads) || threads < 1 || threads > cpuCount) {
throw new Error(`Invalid number of threads. Please provide a value between 1 and ${cpuCount}.`)
}

return threads
}

const checkScenarioExists = (name?: string, file?: string) => {
if (!name && !file) {
console.log(
@@ -382,6 +394,7 @@ export {
parsePubTopic,
parseFormat,
parseOutputMode,
parseThreads,
parseConnectOptions,
parsePublishOptions,
parseSubscribeOptions,
Loading