diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java index 8771339b767..c4a6162a5f5 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java @@ -74,10 +74,11 @@ public static void main(String[] args) throws MQClientException { final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false); final boolean aclEnable = getOptionValue(commandLine, 'a', false); final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c')); + final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000; System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " + - "aclEnable: %s%n compressEnable: %s%n", - topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress); + "aclEnable: %s%n compressEnable: %s, reportInterval: %d%n", + topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress, reportInterval); StringBuilder sb = new StringBuilder(messageSize); for (int i = 0; i < messageSize; i++) { @@ -85,7 +86,7 @@ public static void main(String[] args) throws MQClientException { } msgBody = sb.toString().getBytes(StandardCharsets.UTF_8); - final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(); + final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(reportInterval); statsBenchmark.start(); RPCHook rpcHook = null; @@ -253,6 +254,10 @@ public static Options buildCommandlineOptions(final Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -359,6 +364,12 @@ class StatsBenchmarkBatchProducer { private final LinkedList snapshotList = new LinkedList<>(); + private final int reportInterval; + + public StatsBenchmarkBatchProducer(int reportInterval) { + this.reportInterval = reportInterval; + } + public Long[] createSnapshot() { Long[] snap = new Long[] { System.currentTimeMillis(), @@ -432,7 +443,7 @@ public void run() { e.printStackTrace(); } } - }, 10000, 10000, TimeUnit.MILLISECONDS); + }, reportInterval, reportInterval, TimeUnit.MILLISECONDS); } public void shutdown() { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 87388edc9a6..57270fcd006 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -68,14 +68,15 @@ public static void main(String[] args) throws MQClientException, IOException { final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); final boolean clientRebalanceEnable = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : true; + final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000; String group = groupPrefix; if (Boolean.parseBoolean(isSuffixEnable)) { group = groupPrefix + "_" + (System.currentTimeMillis() % 100); } - System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n", - topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable); + System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s, reportInterval: %d%n", + topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable, reportInterval); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); @@ -124,7 +125,7 @@ public void run() { e.printStackTrace(); } } - }, 10000, 10000, TimeUnit.MILLISECONDS); + }, reportInterval, reportInterval, TimeUnit.MILLISECONDS); RPCHook rpcHook = null; if (aclEnable) { @@ -235,6 +236,10 @@ public static Options buildCommandlineOptions(final Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000"); + opt.setRequired(false); + options.addOption(opt); + return options; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index ab474fcf4fd..480d16b7581 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -82,12 +82,13 @@ public static void main(String[] args) throws MQClientException { final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y')); final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c')); + final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000; System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " + "traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " + - "asyncEnable: %s%n compressEnable: %s%n", + "asyncEnable: %s%n compressEnable: %s, reportInterval: %d%n", topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, - delayEnable, delayLevel, asyncEnable, enableCompress); + delayEnable, delayLevel, asyncEnable, enableCompress, reportInterval); StringBuilder sb = new StringBuilder(messageSize); for (int i = 0; i < messageSize; i++) { @@ -139,7 +140,7 @@ public void run() { e.printStackTrace(); } } - }, 10000, 10000, TimeUnit.MILLISECONDS); + }, reportInterval, reportInterval, TimeUnit.MILLISECONDS); RPCHook rpcHook = null; if (aclEnable) { @@ -370,6 +371,10 @@ public static Options buildCommandlineOptions(final Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000"); + opt.setRequired(false); + options.addOption(opt); + return options; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index ebe3e01fdc1..34cdeb49dbb 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -79,6 +79,7 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0; config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); + config.reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000; final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount); @@ -105,8 +106,7 @@ private void printStats() { Snapshot begin = snapshotList.getFirst(); Snapshot end = snapshotList.getLast(); - final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount) - + (end.sendRequestFailedCount - begin.sendRequestFailedCount); + final long sendCount = end.sendRequestSuccessCount - begin.sendRequestSuccessCount; final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime); final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount); @@ -131,7 +131,7 @@ public void run() { e.printStackTrace(); } } - }, 10000, 10000, TimeUnit.MILLISECONDS); + }, config.reportInterval, config.reportInterval, TimeUnit.MILLISECONDS); RPCHook rpcHook = null; if (config.aclEnable) { @@ -291,6 +291,10 @@ public static Options buildCommandlineOptions(final Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000"); + opt.setRequired(false); + options.addOption(opt); + return options; } } @@ -475,6 +479,7 @@ class TxSendConfig { int sendInterval; boolean aclEnable; boolean msgTraceEnable; + int reportInterval; } class LRUMap extends LinkedHashMap {