From ae6e9384dc9f588886d394cb9ed5dc25a49c8c75 Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Mon, 12 Jun 2023 22:12:51 +0800 Subject: [PATCH 1/8] =?UTF-8?q?1.=20=E5=AE=8C=E5=96=84=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E5=92=8C=E9=94=99=E8=AF=AF=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= =?UTF-8?q?=202.=20pullLog=E9=81=8D=E5=8E=86=E7=9A=84=E6=97=B6=E5=80=99?= =?UTF-8?q?=EF=BC=8C=E9=9C=80=E8=A6=81copy=203.=20batchExecutor=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=BB=A1=E4=BA=86=EF=BC=8C=E4=BC=98=E5=8C=96=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E9=80=BB=E8=BE=91=EF=BC=8C=E9=87=8D=E8=AF=95=E5=89=8D?= =?UTF-8?q?=E9=9A=8F=E6=9C=BA=E7=AD=89=E5=BE=85=204.=20Wheel=E9=87=8C?= =?UTF-8?q?=E7=9A=84=E9=98=9F=E5=88=97=E6=BB=A1=E4=BA=86=EF=BC=8Ccatch?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E5=B9=B6=E8=BF=9B=E8=A1=8C=E5=9B=BA=E5=AE=9A?= =?UTF-8?q?=E7=AD=89=E5=BE=85=E9=87=8D=E8=AF=95=205.=20Wheel=E9=87=8Cexpir?= =?UTF-8?q?e=E9=80=BB=E8=BE=91=E6=8A=9BException=E6=88=96=E8=80=85Error?= =?UTF-8?q?=E9=9C=80=E8=A6=81catch=E4=BD=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/qunar/tc/qmq/delay/monitor/QMon.java | 24 ++++++++++++++ .../tc/qmq/delay/sender/SenderGroup.java | 21 ++++++------ .../tc/qmq/delay/sender/SenderProcessor.java | 27 +++++++++++++++- .../tc/qmq/delay/startup/ServerWrapper.java | 27 ++++++++-------- .../tc/qmq/delay/wheel/HashedWheelTimer.java | 17 +++++++++- .../tc/qmq/delay/wheel/WheelTickManager.java | 32 ++++++++++++++++--- .../qunar/tc/qmq/store/PullLogManager.java | 10 ++++-- 7 files changed, 127 insertions(+), 31 deletions(-) diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java index f162b1bb..f544a5a6 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java @@ -127,4 +127,28 @@ public static void appendFailedByMessageIllegal(String subject) { public static void processTime(String subject, long time) { Metrics.timer("processTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS); } + + public static void sendBatchExecutorAddFailed(String subject) { + countInc("sendBatchExecutorAddFailed", subject); + } + + public static void hashedWheelTimerExpireError() { + Metrics.counter("hashed_wheel_timer_expire_error", EMPTY, EMPTY).inc(); + } + + public static void sendBatchExecutorAddFailedRetryFailed(String subject) { + countInc("sendBatchExecutorAddFailedRetryFailed", subject); + } + + public static void addWheelFailed(String subject) { + countInc("addWheelFailed", subject); + } + + public static void sendProcessorFailed() { + Metrics.counter("sendProcessorFailed", EMPTY, EMPTY).inc(); + } + + public static void sendProcessorPartlyFailed() { + Metrics.counter("sendProcessorPartlyFailed", EMPTY, EMPTY).inc(); + } } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java index 1aaa8892..56a7cd9a 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java @@ -69,7 +69,8 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { boolean success = false; while (!success) { try { - success = workQueue.add(r); + success = workQueue.offer(r); + //success = workQueue.add(r); 这里会频繁抛异常,微秒级别性能会有问题。 } catch (Throwable ignore) { } @@ -184,18 +185,18 @@ private Map getSendResult(Datagram response) { private Datagram sendMessages(final List records, final Sender sender) { long start = System.currentTimeMillis(); - try { return sender.send(records, this); } catch (ClientSendException e) { ClientSendException.SendErrorCode errorCode = e.getSendErrorCode(); - monitorSendError(records, groupInfo.get(), errorCode.ordinal()); + LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e); + monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e); } catch (Exception e) { - monitorSendError(records, groupInfo.get(), -1); + LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e); + monitorSendError(records, groupInfo.get(), -1, e); } finally { QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start); } - return null; } @@ -210,13 +211,15 @@ private void monitorSendFail(String subject, String groupName) { QMon.nettySendMessageFailCount(subject, groupName); } - private void monitorSendError(List records, BrokerGroupInfo group, int errorCode) { - records.parallelStream().forEach(record -> monitorSendError(record.getSubject(), group, errorCode)); + private void monitorSendError(List records, BrokerGroupInfo group, int errorCode, Throwable throwable) { + for (ScheduleSetRecord record : records) { + monitorSendError(record.getSubject(), group, errorCode, throwable); + } } - private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode) { + private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode, Throwable throwable) { if (LOG_LIMITER.tryAcquire()) { - LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode); + LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode, throwable); } QMon.nettySendMessageFailCount(subject, group.getGroupName()); } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java index f8ef3455..05c17e17 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java @@ -27,11 +27,13 @@ import qunar.tc.qmq.delay.DelayLogFacade; import qunar.tc.qmq.delay.ScheduleIndex; import qunar.tc.qmq.delay.meta.BrokerRoleManager; +import qunar.tc.qmq.delay.monitor.QMon; import qunar.tc.qmq.delay.store.model.DispatchLogRecord; import qunar.tc.qmq.delay.store.model.ScheduleSetRecord; import java.util.List; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** @@ -90,6 +92,7 @@ public void send(ScheduleIndex index) { } catch (InterruptedException e) { return; } + //这里卡住重试 if (!add) { reject(index); } @@ -106,7 +109,27 @@ public void process(List indexList) { } private void reject(ScheduleIndex index) { - send(index); + QMon.sendBatchExecutorAddFailed(index.getSubject()); + boolean result = false; + int retryTimes = config.getInt("delay.send.batch.executor.retry.times", 50); + long sleepBaseMillis = config.getLong("delay.send.batch.executor.sleep.min.millis", 1000L); + for (int i = 0; i < retryTimes; i++) { + try { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + long sleepMillis = sleepBaseMillis + random.nextInt(1000); + Thread.sleep(sleepMillis); + send(index); + result = true; + break; + } catch (InterruptedException e) { + LOGGER.error("send processor reject InterruptedException", e); + } catch (Throwable throwable) { + LOGGER.error("send processor reject error, subject={}, scheduleTime={}, offset={}", index.getSubject(), index.getScheduleTime(), index.getOffset(), throwable); + } + } + if (!result) { + QMon.sendBatchExecutorAddFailedRetryFailed(index.getSubject()); + } } private void success(ScheduleSetRecord record) { @@ -117,6 +140,7 @@ private void retry(List records, Set messageIds) { final Set refreshSubject = Sets.newHashSet(); for (ScheduleSetRecord record : records) { if (messageIds.contains(record.getMessageId())) { + QMon.sendProcessorPartlyFailed(); ScheduleIndex index = new ScheduleIndex(record.getSubject(), record.getScheduleTime(), record.getStartWroteOffset(), record.getRecordSize(), record.getSequence()); refresh(index, refreshSubject); send(index); @@ -153,6 +177,7 @@ public void success(List indexList, Set messageIds) { @Override public void fail(List indexList) { + QMon.sendProcessorFailed(); retry(indexList); } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java index b9a9a063..ba500817 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java @@ -76,19 +76,19 @@ public class ServerWrapper implements Disposable { private DynamicConfig config; private DefaultStoreConfiguration storeConfig; - public void start(boolean autoOnline) { - init(); - register(); - startServer(); - sync(); - if (autoOnline) - online(); - } - - public void online() { - BrokerConfig.markAsWritable(); - brokerRegisterService.healthSwitch(true); - } + public void start(boolean autoOnline) { + init(); + register(); + startServer(); + sync(); + if (autoOnline) + online(); + } + + public void online() { + BrokerConfig.markAsWritable(); + brokerRegisterService.healthSwitch(true); + } private void init() { this.config = DynamicConfigLoader.load("delay.properties"); @@ -122,7 +122,6 @@ private boolean iterateCallback(final ScheduleIndex index) { wheelTickManager.addWHeel(index); return true; } - return false; } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java index 919614f3..6f914d29 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java @@ -17,7 +17,10 @@ import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import qunar.tc.qmq.delay.ScheduleIndex; +import qunar.tc.qmq.delay.monitor.QMon; import java.util.Collections; import java.util.HashSet; @@ -30,6 +33,9 @@ @SuppressWarnings("all") public class HashedWheelTimer { + + private static final Logger LOGGER = LoggerFactory.getLogger(HashedWheelTimer.class); + private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER; static { @@ -168,6 +174,9 @@ public Set stop() { return worker.unprocessedTimeouts(); } + /** + * java.lang.IllegalStateException: Queue full---队列满了会抛这个异常 + */ public void newTimeout(ScheduleIndex index, long delay, TimeUnit unit) { long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; HashedWheelTimeout timeout = new HashedWheelTimeout(this, index, deadline); @@ -340,7 +349,13 @@ public void expire() { return; } - timer.expire(entity); + try { + timer.expire(entity); + } catch (Throwable throwable) { + LOGGER.error("timer expire op error, entityInfo:subject={},offset={},scheduleTime={}", + entity.getSubject(), entity.getOffset(), entity.getScheduleTime(), throwable); + QMon.hashedWheelTimerExpireError(); + } } @Override diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java index 8feb867f..c8b34394 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java @@ -119,6 +119,11 @@ private void doRecover(DispatchLogSegment dispatchLogSegment) { loadedCursor.shiftCursor(baseOffset); } + /** + * 看看哪些已经dispatch过了,recover的时候就忽略 + * @param currentDispatchLog + * @return + */ private LongHashSet loadDispatchLog(final DispatchLogSegment currentDispatchLog) { LogVisitor visitor = currentDispatchLog.newVisitor(0); final LongHashSet recordSet = new LongHashSet(currentDispatchLog.entries()); @@ -138,6 +143,9 @@ private boolean isStarted() { return started.get(); } + /** + * 周期性的兜底---每隔30分钟执行一次, + */ private void load() { long next = System.currentTimeMillis() + config.getLoadInAdvanceTimesInMillis(); long prepareLoadBaseOffset = resolveSegment(next, segmentScale); @@ -266,9 +274,10 @@ private void refresh(ScheduleIndex index) { try { scheduleTime = index.getScheduleTime(); timer.newTimeout(index, scheduleTime - now, TimeUnit.MILLISECONDS); - } catch (Throwable e) { - LOGGER.error("wheel refresh error, scheduleTime:{}, delay:{}", scheduleTime, scheduleTime - now); - throw Throwables.propagate(e); + } catch (Throwable throwable) { + LOGGER.error("wheel refresh error, scheduleTime:{}, delay:{}", scheduleTime, scheduleTime - now, throwable); + QMon.addWheelFailed(index.getSubject()); + throw Throwables.propagate(throwable); } } @@ -284,7 +293,22 @@ public void shutdown() { } public void addWHeel(ScheduleIndex index) { - refresh(index); + boolean result = false; + do { + try { + //Wheel里面的队列满了,需要等待重试,这里卡住 + refresh(index); + result = true; + } catch (Throwable throwable) { + LOGGER.error("add wheel temp error, {}, {}, {}", index.getSubject(), index.getScheduleTime(), index.getOffset(), throwable); + try { + long sleepMillis = config.getConfig().getLong("add.wheel.exception.sleep.millis", 500L); + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + LOGGER.error("add wheel temp sleep error", e); + } + } + } while (!result); } public boolean canAdd(long scheduleTime, long offset) { diff --git a/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java b/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java index e5e5bb59..5a54990f 100644 --- a/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java +++ b/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java @@ -17,6 +17,7 @@ package qunar.tc.qmq.store; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,7 +130,11 @@ public Table getLogs() { public void flush() { final long start = System.currentTimeMillis(); try { - for (final PullLog log : logs.values()) { + /** + * 报java.util.ConcurrentModificationException,需要copy + */ + Collection pullLogs = ImmutableList.copyOf(logs.values()); + for (final PullLog log : pullLogs) { log.flush(); } } finally { @@ -171,7 +176,8 @@ private void remove(String subject, String group, String consumerId) { @Override public void close() { - for (final PullLog log : logs.values()) { + Collection pullLogs = ImmutableList.copyOf(logs.values()); + for (final PullLog log : pullLogs) { log.close(); } } From 30ec8d3a5751cc70f9e276bb5f0496a652052cf2 Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Mon, 12 Jun 2023 23:44:35 +0800 Subject: [PATCH 2/8] =?UTF-8?q?reject=E4=B8=8D=E9=9C=80=E8=A6=81=EF=BC=8Cb?= =?UTF-8?q?atchExecutor=E6=B7=BB=E5=8A=A0=E9=87=8D=E8=AF=95=E7=9B=B4?= =?UTF-8?q?=E5=88=B0=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/qunar/tc/qmq/delay/monitor/QMon.java | 4 -- .../tc/qmq/delay/sender/SenderProcessor.java | 56 ++++++------------- 2 files changed, 18 insertions(+), 42 deletions(-) diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java index f544a5a6..f52007ce 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java @@ -136,10 +136,6 @@ public static void hashedWheelTimerExpireError() { Metrics.counter("hashed_wheel_timer_expire_error", EMPTY, EMPTY).inc(); } - public static void sendBatchExecutorAddFailedRetryFailed(String subject) { - countInc("sendBatchExecutorAddFailedRetryFailed", subject); - } - public static void addWheelFailed(String subject) { countInc("addWheelFailed", subject); } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java index 05c17e17..90f90893 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java @@ -80,22 +80,26 @@ public void send(ScheduleIndex index) { if (!BrokerRoleManager.isDelayMaster()) { return; } - boolean add; - try { - long waitTime = Math.abs(sendWaitTime); - if (waitTime > 0) { - add = batchExecutor.addItem(index, waitTime, TimeUnit.MILLISECONDS); - } else { - add = batchExecutor.addItem(index); + do { + try { + long waitTime = Math.abs(sendWaitTime); + if (waitTime > 0) { + add = batchExecutor.addItem(index, waitTime, TimeUnit.MILLISECONDS); + } else { + add = batchExecutor.addItem(index); + } + if (!add) { + QMon.sendBatchExecutorAddFailed(index.getSubject()); + long sleepBaseMillis = config.getLong("delay.send.batch.executor.sleep.min.millis", 500L); + final ThreadLocalRandom random = ThreadLocalRandom.current(); + long sleepMillis = sleepBaseMillis + random.nextInt(500); + Thread.sleep(sleepMillis); + } + } catch (InterruptedException e) { + return; } - } catch (InterruptedException e) { - return; - } - //这里卡住重试 - if (!add) { - reject(index); - } + } while (!add); } @Override @@ -108,30 +112,6 @@ public void process(List indexList) { } } - private void reject(ScheduleIndex index) { - QMon.sendBatchExecutorAddFailed(index.getSubject()); - boolean result = false; - int retryTimes = config.getInt("delay.send.batch.executor.retry.times", 50); - long sleepBaseMillis = config.getLong("delay.send.batch.executor.sleep.min.millis", 1000L); - for (int i = 0; i < retryTimes; i++) { - try { - final ThreadLocalRandom random = ThreadLocalRandom.current(); - long sleepMillis = sleepBaseMillis + random.nextInt(1000); - Thread.sleep(sleepMillis); - send(index); - result = true; - break; - } catch (InterruptedException e) { - LOGGER.error("send processor reject InterruptedException", e); - } catch (Throwable throwable) { - LOGGER.error("send processor reject error, subject={}, scheduleTime={}, offset={}", index.getSubject(), index.getScheduleTime(), index.getOffset(), throwable); - } - } - if (!result) { - QMon.sendBatchExecutorAddFailedRetryFailed(index.getSubject()); - } - } - private void success(ScheduleSetRecord record) { facade.appendDispatchLog(new DispatchLogRecord(record.getSubject(), record.getMessageId(), record.getScheduleTime(), record.getSequence())); } From e9539ae225769273fd1c50ef6b07554119e402bb Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Tue, 13 Jun 2023 10:58:48 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E7=89=88=E6=9C=AC=E5=8F=B7=E5=8D=87?= =?UTF-8?q?=E7=BA=A7=E5=88=B01.1.44-SNAPSHOT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- qmq-api/pom.xml | 2 +- qmq-backup/pom.xml | 2 +- qmq-client/pom.xml | 2 +- qmq-common/pom.xml | 2 +- qmq-delay-server/pom.xml | 2 +- qmq-deploy/pom.xml | 2 +- qmq-dist/pom.xml | 2 +- qmq-gateway/pom.xml | 2 +- qmq-metaserver/pom.xml | 2 +- qmq-metrics-prometheus/pom.xml | 2 +- qmq-remoting/pom.xml | 2 +- qmq-server-common/pom.xml | 2 +- qmq-server/pom.xml | 2 +- qmq-store/pom.xml | 2 +- qmq-sync/pom.xml | 2 +- qmq-tools/pom.xml | 2 +- qmq-watchdog/pom.xml | 2 +- 18 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pom.xml b/pom.xml index 7ff9de45..467a61bd 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.qunar.qmq qmq-parent - 1.1.43 + 1.1.44-SNAPSHOT pom qmq diff --git a/qmq-api/pom.xml b/qmq-api/pom.xml index 20bacc68..f4440059 100644 --- a/qmq-api/pom.xml +++ b/qmq-api/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-api diff --git a/qmq-backup/pom.xml b/qmq-backup/pom.xml index 9cb9f086..dbbebbfa 100644 --- a/qmq-backup/pom.xml +++ b/qmq-backup/pom.xml @@ -5,7 +5,7 @@ com.qunar.qmq qmq-parent - 1.1.43 + 1.1.44-SNAPSHOT 4.0.0 diff --git a/qmq-client/pom.xml b/qmq-client/pom.xml index 6d7f4714..87f1ce00 100644 --- a/qmq-client/pom.xml +++ b/qmq-client/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-client diff --git a/qmq-common/pom.xml b/qmq-common/pom.xml index 697e373c..7863d076 100644 --- a/qmq-common/pom.xml +++ b/qmq-common/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-common diff --git a/qmq-delay-server/pom.xml b/qmq-delay-server/pom.xml index 7f032a0d..0acd0e06 100644 --- a/qmq-delay-server/pom.xml +++ b/qmq-delay-server/pom.xml @@ -5,7 +5,7 @@ com.qunar.qmq qmq-parent - 1.1.43 + 1.1.44-SNAPSHOT qmq-delay-server diff --git a/qmq-deploy/pom.xml b/qmq-deploy/pom.xml index b33155a0..3588da0b 100644 --- a/qmq-deploy/pom.xml +++ b/qmq-deploy/pom.xml @@ -4,7 +4,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT 4.0.0 diff --git a/qmq-dist/pom.xml b/qmq-dist/pom.xml index 95685fe7..beb21904 100644 --- a/qmq-dist/pom.xml +++ b/qmq-dist/pom.xml @@ -6,7 +6,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-dist diff --git a/qmq-gateway/pom.xml b/qmq-gateway/pom.xml index 66cb2842..ab61c780 100644 --- a/qmq-gateway/pom.xml +++ b/qmq-gateway/pom.xml @@ -7,7 +7,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-gateway diff --git a/qmq-metaserver/pom.xml b/qmq-metaserver/pom.xml index cc2c459d..7e047b7a 100644 --- a/qmq-metaserver/pom.xml +++ b/qmq-metaserver/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-metaserver diff --git a/qmq-metrics-prometheus/pom.xml b/qmq-metrics-prometheus/pom.xml index 90ad14eb..f681248e 100644 --- a/qmq-metrics-prometheus/pom.xml +++ b/qmq-metrics-prometheus/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-metrics-prometheus diff --git a/qmq-remoting/pom.xml b/qmq-remoting/pom.xml index 388a5fc0..273b0bec 100644 --- a/qmq-remoting/pom.xml +++ b/qmq-remoting/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-remoting diff --git a/qmq-server-common/pom.xml b/qmq-server-common/pom.xml index f81feb39..2abd99ea 100644 --- a/qmq-server-common/pom.xml +++ b/qmq-server-common/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-server-common diff --git a/qmq-server/pom.xml b/qmq-server/pom.xml index 22da86be..1fb676fa 100644 --- a/qmq-server/pom.xml +++ b/qmq-server/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-server diff --git a/qmq-store/pom.xml b/qmq-store/pom.xml index 52123dc1..de26e4ff 100644 --- a/qmq-store/pom.xml +++ b/qmq-store/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-store diff --git a/qmq-sync/pom.xml b/qmq-sync/pom.xml index a858fdf1..bbab2997 100644 --- a/qmq-sync/pom.xml +++ b/qmq-sync/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-sync diff --git a/qmq-tools/pom.xml b/qmq-tools/pom.xml index 3d40c83f..37e1fa91 100644 --- a/qmq-tools/pom.xml +++ b/qmq-tools/pom.xml @@ -5,7 +5,7 @@ qmq-parent com.qunar.qmq - 1.1.43 + 1.1.44-SNAPSHOT qmq-tools diff --git a/qmq-watchdog/pom.xml b/qmq-watchdog/pom.xml index f748655b..d187f6eb 100644 --- a/qmq-watchdog/pom.xml +++ b/qmq-watchdog/pom.xml @@ -5,7 +5,7 @@ com.qunar.qmq qmq-parent - 1.1.43 + 1.1.44-SNAPSHOT qmq-watchdog From 56594cecb5cb41ac18973a231b134ae330a7ff10 Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Tue, 13 Jun 2023 13:42:52 +0800 Subject: [PATCH 4/8] =?UTF-8?q?scheduleLog=E6=B7=BB=E5=8A=A0append?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/qunar/tc/qmq/delay/monitor/QMon.java | 4 ++ .../tc/qmq/delay/store/log/ScheduleLog.java | 40 +++++++++++-------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java index f52007ce..f8c44fa0 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java @@ -147,4 +147,8 @@ public static void sendProcessorFailed() { public static void sendProcessorPartlyFailed() { Metrics.counter("sendProcessorPartlyFailed", EMPTY, EMPTY).inc(); } + + public static void appendScheduleLogTime(String subject, long time) { + Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS); + } } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java index 479ff2db..9f40b3f7 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java @@ -23,6 +23,7 @@ import qunar.tc.qmq.delay.ScheduleIndex; import qunar.tc.qmq.delay.base.LongHashSet; import qunar.tc.qmq.delay.config.StoreConfiguration; +import qunar.tc.qmq.delay.monitor.QMon; import qunar.tc.qmq.delay.store.DefaultDelaySegmentValidator; import qunar.tc.qmq.delay.store.ScheduleLogValidatorSupport; import qunar.tc.qmq.delay.store.appender.ScheduleSetAppender; @@ -71,25 +72,30 @@ private void reValidate(int singleMessageLimitSize) { @Override public AppendLogResult append(LogRecord record) { - if (!open.get()) { - return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "schedule log closed"); - } - AppendLogResult> result = scheduleSet.append(record); - int code = result.getCode(); - if (MessageProducerCode.SUCCESS != code) { - LOGGER.error("appendMessageLog schedule set error,log:{} {},code:{}", record.getSubject(), record.getMessageId(), code); - return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "appendScheduleSetError"); - } + long start = System.currentTimeMillis(); + try { + if (!open.get()) { + return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "schedule log closed"); + } + AppendLogResult> result = scheduleSet.append(record); + int code = result.getCode(); + if (MessageProducerCode.SUCCESS != code) { + LOGGER.error("appendMessageLog schedule set error,log:{} {},code:{}", record.getSubject(), record.getMessageId(), code); + return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "appendScheduleSetError"); + } - RecordResult recordResult = result.getAdditional(); - ScheduleIndex index = new ScheduleIndex( - record.getSubject(), - record.getScheduleTime(), - recordResult.getResult().getWroteOffset(), - recordResult.getResult().getWroteBytes(), - recordResult.getResult().getAdditional().getSequence()); + RecordResult recordResult = result.getAdditional(); + ScheduleIndex index = new ScheduleIndex( + record.getSubject(), + record.getScheduleTime(), + recordResult.getResult().getWroteOffset(), + recordResult.getResult().getWroteBytes(), + recordResult.getResult().getAdditional().getSequence()); - return new AppendLogResult<>(MessageProducerCode.SUCCESS, "", index); + return new AppendLogResult<>(MessageProducerCode.SUCCESS, "", index); + } finally { + QMon.appendScheduleLogTime(record.getSubject(), System.currentTimeMillis() - start); + } } @Override From a9512a22be8b3aaef330ee6fded57abd0496777b Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Wed, 14 Jun 2023 15:59:26 +0800 Subject: [PATCH 5/8] =?UTF-8?q?slave=E5=90=8C=E6=AD=A5=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=9A=84=E7=9B=91=E6=8E=A7=E4=BD=8D=E7=BD=AE=E5=8A=A0=E9=94=99?= =?UTF-8?q?=E4=BA=86=EF=BC=8C=E4=BF=AE=E6=AD=A3=E4=B8=80=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java index 00ab41de..d734471c 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java @@ -104,8 +104,8 @@ void shutdown() { @Override public void run() { - final long start = System.currentTimeMillis(); while (running) { + final long start = System.currentTimeMillis(); try { sync(); } catch (Throwable e) { From df357f9b06e22423b17b7b7df501f998962d3c8f Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Wed, 12 Jul 2023 20:38:46 +0800 Subject: [PATCH 6/8] =?UTF-8?q?ClientMetaInfo=E7=9A=84id=E5=9C=A8sql?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E4=BD=BF?= =?UTF-8?q?=E7=94=A8getLong=E8=80=8C=E4=B8=8D=E6=98=AFgetInt=EF=BC=8CBLF?= =?UTF-8?q?=E5=B7=B2=E7=BB=8F=E5=87=BA=E7=8E=B0=E6=BA=A2=E5=87=BA=E7=9A=84?= =?UTF-8?q?=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java | 2 +- .../main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java index 1551a52d..161bd571 100644 --- a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java +++ b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java @@ -36,7 +36,7 @@ public class ClientMetaInfoStoreImpl implements ClientMetaInfoStore { public List queryConsumer(String subject) { return jdbcTemplate.query("SELECT id,subject_info,client_type,consumer_group,client_id,app_code,room FROM client_meta_info WHERE subject_info=? AND client_type=?", (rs, rowNum) -> { final ClientMetaInfo meta = new ClientMetaInfo(); - meta.setId(rs.getInt("id")); + meta.setId(rs.getLong("id")); meta.setSubject(rs.getString("subject_info")); meta.setClientTypeCode(rs.getInt("client_type")); meta.setConsumerGroup(rs.getString("consumer_group")); diff --git a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java index f03038be..316a710b 100644 --- a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java +++ b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java @@ -231,7 +231,7 @@ public void insertClientMetaInfo(MetaInfoRequest request) { private static final RowMapper CLIENT_META_INFO_ROW_MAPPER= (rs, rowNum) -> { final ClientMetaInfo clientMetaInfo = new ClientMetaInfo(); - clientMetaInfo.setId(rs.getInt("id")); + clientMetaInfo.setId(rs.getLong("id")); clientMetaInfo.setSubject(rs.getString("subject_info")); clientMetaInfo.setClientTypeCode(rs.getInt("client_type")); clientMetaInfo.setConsumerGroup(rs.getString("consumer_group")); From 121d8f5831cfa19cfc5f2d21d48a5d01e189eb4c Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Thu, 30 Nov 2023 16:21:24 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8DActor=E7=9A=84=E6=AF=94?= =?UTF-8?q?=E8=BE=83=E7=AD=96=E7=95=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qunar/tc/qmq/concurrent/ActorSystem.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java b/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java index b76c12be..d67b1532 100644 --- a/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java +++ b/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java @@ -128,6 +128,7 @@ public static class Actor implements Runnable, Comparable { private final String name; private long total; private volatile long submitTs; + private volatile long executeTs; //通过Unsafe操作 private volatile int status; @@ -148,6 +149,7 @@ boolean dispatch(E message) { @Override public void run() { long start = System.currentTimeMillis(); + executeTs = start; String old = Thread.currentThread().getName(); try { Thread.currentThread().setName(systemName + "-" + name); @@ -228,8 +230,16 @@ private boolean updateStatus(int oldStatus, int newStatus) { @Override public int compareTo(Actor o) { + /* + 原策略"TotalDuration"会存在如下问题: + 当一个突发性的高QPS消息过来,由于之前量小,total值很低,所以每次都会优先执行这个消息,导致CPU被全部拿走,进而导致其他消息无法被拉取 int result = Long.compare(total, o.total); return result == 0 ? Long.compare(submitTs, o.submitTs) : result; + + 改进策略"LastExecuteTimestamp": + 记录executeTs“上次执行时间戳”,使用“上次执行时间戳”进行比较,如果“上次执行时间戳”相同,再降级到total + */ + return ActorCompareWay.LastExecuteTimestamp.compare(this, o); } @Override @@ -247,6 +257,26 @@ public int hashCode() { } } + private enum ActorCompareWay { + + LastExecuteTimestamp { + @Override + public int compare(Actor a1, Actor a2) { + int result = Long.compare(a1.executeTs, a2.executeTs); + return result == 0 ? Long.compare(a1.total, a2.total) : result; + } + }, + TotalDuration { + @Override + public int compare(Actor a1, Actor a2) { + int result = Long.compare(a1.total, a2.total); + return result == 0 ? Long.compare(a1.submitTs, a2.submitTs) : result; + } + }; + + public abstract int compare(Actor a1, Actor a2); + } + /** * Copyright (C) 2009-2016 Lightbend Inc. */ From 5c0bfaf810b7e6af342ed2a44d2a2230061953d9 Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Thu, 30 Nov 2023 16:52:12 +0800 Subject: [PATCH 8/8] =?UTF-8?q?delay=20queue=E9=97=AE=E9=A2=98=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/qunar/tc/qmq/delay/monitor/QMon.java | 12 ++ .../qunar/tc/qmq/delay/receiver/Receiver.java | 1 - .../tc/qmq/delay/sender/SenderExecutor.java | 15 ++- .../tc/qmq/delay/sender/SenderGroup.java | 108 +++++++++++++++--- .../tc/qmq/delay/sender/SenderProcessor.java | 19 ++- 5 files changed, 138 insertions(+), 17 deletions(-) diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java index f8c44fa0..cc63a7ad 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java @@ -151,4 +151,16 @@ public static void sendProcessorPartlyFailed() { public static void appendScheduleLogTime(String subject, long time) { Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS); } + + public static void retryNettySend() { + Metrics.counter("retryNettySend", EMPTY, EMPTY).inc(); + } + + public static void retryNettySendError() { + Metrics.counter("retryNettySendError", EMPTY, EMPTY).inc(); + } + + public static void syncSend(String subject) { + Metrics.counter("sync_send_count", SUBJECT_ARRAY, new String[] {subject}).inc(); + } } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java index f54ed56d..44de6982 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java @@ -73,7 +73,6 @@ ListenableFuture receive(List messages, RemotingComm for (RawMessageExtend message : messages) { final MessageHeader header = message.getHeader(); monitorMessageReceived(header.getCreateTime(), header.getSubject()); - final ReceivedDelayMessage receivedDelayMessage = new ReceivedDelayMessage(message, cmd.getReceiveTime()); futures.add(receivedDelayMessage.getPromise()); invoker.invoke(receivedDelayMessage); diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java index 30732c10..642b5708 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java @@ -46,12 +46,14 @@ class SenderExecutor implements Disposable { private final Sender sender; private final DelayLogFacade store; private final int sendThreads; + private DynamicConfig sendConfig; SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) { this.sender = sender; this.store = store; this.brokerLoadBalance = PollBrokerLoadBalance.getInstance(); this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD); + this.sendConfig = sendConfig; } void execute(final List indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) { @@ -61,6 +63,17 @@ void execute(final List indexList, final SenderGroup.ResultHandle } } + public void syncExecute(final List indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) { + Map> groups = groupByBroker(indexList, brokerService); + for (Map.Entry> entry : groups.entrySet()) { + doSyncExecute(entry.getKey(), entry.getValue(), handler); + } + } + + private void doSyncExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) { + group.sendSync(list, sender, handler); + } + private void doExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) { group.send(list, sender, handler); } @@ -88,7 +101,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) { String groupName = groupInfo.getGroupName(); SenderGroup senderGroup = groupSenders.get(groupName); if (null == senderGroup) { - senderGroup = new SenderGroup(groupInfo, sendThreads, store); + senderGroup = new SenderGroup(groupInfo, sendThreads, store, sendConfig); SenderGroup currentSenderGroup = groupSenders.putIfAbsent(groupName, senderGroup); senderGroup = null != currentSenderGroup ? currentSenderGroup : senderGroup; } else { diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java index 56a7cd9a..90e841e5 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import qunar.tc.qmq.broker.BrokerGroupInfo; import qunar.tc.qmq.common.Disposable; +import qunar.tc.qmq.configuration.DynamicConfig; import qunar.tc.qmq.delay.DelayLogFacade; import qunar.tc.qmq.delay.ScheduleIndex; import qunar.tc.qmq.delay.monitor.QMon; @@ -36,10 +37,7 @@ import qunar.tc.qmq.protocol.producer.SendResult; import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import static qunar.tc.qmq.delay.monitor.QMon.delayBrokerSendMsgCount; @@ -52,12 +50,16 @@ public class SenderGroup implements Disposable { private static final Logger LOGGER = LoggerFactory.getLogger(SenderGroup.class); private static final int MAX_SEND_BATCH_SIZE = 50; + private static final int RETRY_FALL_BACK = 1;//refresh再来重试 + private static final int RETRY_WITHOUT_FALL_BACK = 2;//不再refresh,直接在netty端重试 private final AtomicReference groupInfo; private final DelayLogFacade store; + private DynamicConfig sendConfig; private final ThreadPoolExecutor executorService; private final RateLimiter LOG_LIMITER = RateLimiter.create(2); - SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store) { + SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store, DynamicConfig sendConfig) { + this.sendConfig = sendConfig; this.groupInfo = new AtomicReference<>(groupInfo); this.store = store; final LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(1); @@ -79,27 +81,49 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { }); } + /** + * 绕过线程池直接执行 + * @param records + * @param sender + * @param handler + */ + public void sendSync(final List records, final Sender sender, final ResultHandler handler) { + monitorSyncSend(records); + doSend(records, sender, handler, RETRY_WITHOUT_FALL_BACK); + } + + private static void monitorSyncSend(List records) { + try { + for (int i = 0; i < records.size(); i++) { + ScheduleIndex scheduleIndex = records.get(i); + QMon.syncSend(scheduleIndex.getSubject()); + } + } catch (Throwable throwable) { + + } + } + public void send(final List records, final Sender sender, final ResultHandler handler) { - executorService.execute(() -> doSend(records, sender, handler)); + executorService.execute(() -> doSend(records, sender, handler, RETRY_FALL_BACK)); } - private void doSend(final List batch, final Sender sender, final ResultHandler handler) { + private void doSend(final List batch, final Sender sender, final ResultHandler handler, int retryStrategy) { BrokerGroupInfo groupInfo = this.groupInfo.get(); String groupName = groupInfo.getGroupName(); List> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE); for (List partition : partitions) { - send(sender, handler, groupInfo, groupName, partition); + send(sender, handler, groupInfo, groupName, partition, retryStrategy); } } - private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list) { + private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list, int retryStrategy) { try { long start = System.currentTimeMillis(); List records = store.recoverLogRecord(list); QMon.loadMsgTime(System.currentTimeMillis() - start); - Datagram response = sendMessages(records, sender); + Datagram response = sendMessages(records, sender, retryStrategy); release(records); monitor(list, groupName); if (response == null) { @@ -183,21 +207,77 @@ private Map getSendResult(Datagram response) { } } - private Datagram sendMessages(final List records, final Sender sender) { + private Datagram sendMessages(final List records, final Sender sender, int retryStrategy) { long start = System.currentTimeMillis(); try { - return sender.send(records, this); + //return sender.send(records, this); + return doSendDatagram(records, sender); } catch (ClientSendException e) { ClientSendException.SendErrorCode errorCode = e.getSendErrorCode(); LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e); monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e); monitorSendError(records, groupInfo.get(), -1, e); } finally { QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start); } - return null; + return retrySendMessages(records, sender, retryStrategy); + } + + private Datagram doSendDatagram(List records, Sender sender) throws Exception { + return sender.send(records, this); + } + + /** + * 如果想更优雅,可以参考guava-retry + * @param records + * @param sender + * @param retryStrategy + * @return + */ + private Datagram retrySendMessages(List records, Sender sender, int retryStrategy) { + Datagram result = null; + int maxTimes = sendConfig.getInt("netty.send.message.retry.max.times", 20); + /** + * 如果是Refresh重试过来的,那就在这里一直重试netty + * 如果delay机器本身出问题,refresh也没用,就在这里卡住 + */ + if (retryStrategy == RETRY_WITHOUT_FALL_BACK) { + maxTimes = sendConfig.getInt("netty.send.message.retry.without.fallback.max.times", 36000); + } + for (int count = 0; count < maxTimes; count++) { + try { + QMon.retryNettySend(); + result = doSendDatagram(records, sender); + if (result != null) { + break; + } + } catch (Throwable e) { + if (LOG_LIMITER.tryAcquire()) { + for (ScheduleSetRecord record : records) { + LOGGER.error("retry senderGroup sendMessages error, retry_count={}, subject={}, brokerGroup={}", count, record.getSubject(), groupInfo.get(), e); + } + } + QMon.retryNettySendError(); + retryRandomSleep(retryStrategy); + } + } + return result; + } + + private void retryRandomSleep(int retryStrategy) { + long randomMinSleepMillis = sendConfig.getLong("netty.send.message.retry.min.sleep.millis", 1000L); + long randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.max.sleep.millis", 10000L); + if (retryStrategy == RETRY_WITHOUT_FALL_BACK) { + randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.without.fallback.max.sleep.millis", 30000L); + } + final ThreadLocalRandom random = ThreadLocalRandom.current(); + try { + Thread.sleep(random.nextLong(randomMinSleepMillis, randomMaxSleepMillis)); + } catch (InterruptedException ex) { + + } } private void monitorSendFail(List indexList, String groupName) { diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java index 90f90893..a281dc78 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java @@ -16,6 +16,7 @@ package qunar.tc.qmq.delay.sender; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +76,13 @@ public void init() { this.batchExecutor.init(); } + private void sendSync(ScheduleIndex index) { + if (!BrokerRoleManager.isDelayMaster()) { + return; + } + syncProcess(Lists.newArrayList(index)); + } + @Override public void send(ScheduleIndex index) { if (!BrokerRoleManager.isDelayMaster()) { @@ -112,6 +120,15 @@ public void process(List indexList) { } } + private void syncProcess(List indexList) { + try { + senderExecutor.syncExecute(indexList, this, brokerService); + } catch (Exception e) { + LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e); + retry(indexList); + } + } + private void success(ScheduleSetRecord record) { facade.appendDispatchLog(new DispatchLogRecord(record.getSubject(), record.getMessageId(), record.getScheduleTime(), record.getSequence())); } @@ -138,7 +155,7 @@ private void retry(List indexList) { final Set refreshSubject = Sets.newHashSet(); for (ScheduleIndex index : indexList) { refresh(index, refreshSubject); - send(index); + sendSync(index); } }