diff --git a/pom.xml b/pom.xml
index 7ff9de45..467a61bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
com.qunar.qmq
qmq-parent
- 1.1.43
+ 1.1.44-SNAPSHOT
pom
qmq
diff --git a/qmq-api/pom.xml b/qmq-api/pom.xml
index 20bacc68..f4440059 100644
--- a/qmq-api/pom.xml
+++ b/qmq-api/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-api
diff --git a/qmq-backup/pom.xml b/qmq-backup/pom.xml
index 9cb9f086..dbbebbfa 100644
--- a/qmq-backup/pom.xml
+++ b/qmq-backup/pom.xml
@@ -5,7 +5,7 @@
com.qunar.qmq
qmq-parent
- 1.1.43
+ 1.1.44-SNAPSHOT
4.0.0
diff --git a/qmq-client/pom.xml b/qmq-client/pom.xml
index 6d7f4714..87f1ce00 100644
--- a/qmq-client/pom.xml
+++ b/qmq-client/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-client
diff --git a/qmq-common/pom.xml b/qmq-common/pom.xml
index 697e373c..7863d076 100644
--- a/qmq-common/pom.xml
+++ b/qmq-common/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-common
diff --git a/qmq-delay-server/pom.xml b/qmq-delay-server/pom.xml
index 7f032a0d..0acd0e06 100644
--- a/qmq-delay-server/pom.xml
+++ b/qmq-delay-server/pom.xml
@@ -5,7 +5,7 @@
com.qunar.qmq
qmq-parent
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-delay-server
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java
index f162b1bb..cc63a7ad 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java
@@ -127,4 +127,40 @@ public static void appendFailedByMessageIllegal(String subject) {
public static void processTime(String subject, long time) {
Metrics.timer("processTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
}
+
+ public static void sendBatchExecutorAddFailed(String subject) {
+ countInc("sendBatchExecutorAddFailed", subject);
+ }
+
+ public static void hashedWheelTimerExpireError() {
+ Metrics.counter("hashed_wheel_timer_expire_error", EMPTY, EMPTY).inc();
+ }
+
+ public static void addWheelFailed(String subject) {
+ countInc("addWheelFailed", subject);
+ }
+
+ public static void sendProcessorFailed() {
+ Metrics.counter("sendProcessorFailed", EMPTY, EMPTY).inc();
+ }
+
+ public static void sendProcessorPartlyFailed() {
+ Metrics.counter("sendProcessorPartlyFailed", EMPTY, EMPTY).inc();
+ }
+
+ public static void appendScheduleLogTime(String subject, long time) {
+ Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
+ }
+
+ public static void retryNettySend() {
+ Metrics.counter("retryNettySend", EMPTY, EMPTY).inc();
+ }
+
+ public static void retryNettySendError() {
+ Metrics.counter("retryNettySendError", EMPTY, EMPTY).inc();
+ }
+
+ public static void syncSend(String subject) {
+ Metrics.counter("sync_send_count", SUBJECT_ARRAY, new String[] {subject}).inc();
+ }
}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java
index f54ed56d..44de6982 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java
@@ -73,7 +73,6 @@ ListenableFuture receive(List messages, RemotingComm
for (RawMessageExtend message : messages) {
final MessageHeader header = message.getHeader();
monitorMessageReceived(header.getCreateTime(), header.getSubject());
-
final ReceivedDelayMessage receivedDelayMessage = new ReceivedDelayMessage(message, cmd.getReceiveTime());
futures.add(receivedDelayMessage.getPromise());
invoker.invoke(receivedDelayMessage);
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java
index 30732c10..642b5708 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java
@@ -46,12 +46,14 @@ class SenderExecutor implements Disposable {
private final Sender sender;
private final DelayLogFacade store;
private final int sendThreads;
+ private DynamicConfig sendConfig;
SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) {
this.sender = sender;
this.store = store;
this.brokerLoadBalance = PollBrokerLoadBalance.getInstance();
this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD);
+ this.sendConfig = sendConfig;
}
void execute(final List indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
@@ -61,6 +63,17 @@ void execute(final List indexList, final SenderGroup.ResultHandle
}
}
+ public void syncExecute(final List indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
+ Map> groups = groupByBroker(indexList, brokerService);
+ for (Map.Entry> entry : groups.entrySet()) {
+ doSyncExecute(entry.getKey(), entry.getValue(), handler);
+ }
+ }
+
+ private void doSyncExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) {
+ group.sendSync(list, sender, handler);
+ }
+
private void doExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) {
group.send(list, sender, handler);
}
@@ -88,7 +101,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) {
String groupName = groupInfo.getGroupName();
SenderGroup senderGroup = groupSenders.get(groupName);
if (null == senderGroup) {
- senderGroup = new SenderGroup(groupInfo, sendThreads, store);
+ senderGroup = new SenderGroup(groupInfo, sendThreads, store, sendConfig);
SenderGroup currentSenderGroup = groupSenders.putIfAbsent(groupName, senderGroup);
senderGroup = null != currentSenderGroup ? currentSenderGroup : senderGroup;
} else {
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java
index 1aaa8892..90e841e5 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.common.Disposable;
+import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.monitor.QMon;
@@ -36,10 +37,7 @@
import qunar.tc.qmq.protocol.producer.SendResult;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import static qunar.tc.qmq.delay.monitor.QMon.delayBrokerSendMsgCount;
@@ -52,12 +50,16 @@
public class SenderGroup implements Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderGroup.class);
private static final int MAX_SEND_BATCH_SIZE = 50;
+ private static final int RETRY_FALL_BACK = 1;//refresh再来重试
+ private static final int RETRY_WITHOUT_FALL_BACK = 2;//不再refresh,直接在netty端重试
private final AtomicReference groupInfo;
private final DelayLogFacade store;
+ private DynamicConfig sendConfig;
private final ThreadPoolExecutor executorService;
private final RateLimiter LOG_LIMITER = RateLimiter.create(2);
- SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store) {
+ SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store, DynamicConfig sendConfig) {
+ this.sendConfig = sendConfig;
this.groupInfo = new AtomicReference<>(groupInfo);
this.store = store;
final LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(1);
@@ -69,7 +71,8 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
boolean success = false;
while (!success) {
try {
- success = workQueue.add(r);
+ success = workQueue.offer(r);
+ //success = workQueue.add(r); 这里会频繁抛异常,微秒级别性能会有问题。
} catch (Throwable ignore) {
}
@@ -78,27 +81,49 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
});
}
+ /**
+ * 绕过线程池直接执行
+ * @param records
+ * @param sender
+ * @param handler
+ */
+ public void sendSync(final List records, final Sender sender, final ResultHandler handler) {
+ monitorSyncSend(records);
+ doSend(records, sender, handler, RETRY_WITHOUT_FALL_BACK);
+ }
+
+ private static void monitorSyncSend(List records) {
+ try {
+ for (int i = 0; i < records.size(); i++) {
+ ScheduleIndex scheduleIndex = records.get(i);
+ QMon.syncSend(scheduleIndex.getSubject());
+ }
+ } catch (Throwable throwable) {
+
+ }
+ }
+
public void send(final List records, final Sender sender, final ResultHandler handler) {
- executorService.execute(() -> doSend(records, sender, handler));
+ executorService.execute(() -> doSend(records, sender, handler, RETRY_FALL_BACK));
}
- private void doSend(final List batch, final Sender sender, final ResultHandler handler) {
+ private void doSend(final List batch, final Sender sender, final ResultHandler handler, int retryStrategy) {
BrokerGroupInfo groupInfo = this.groupInfo.get();
String groupName = groupInfo.getGroupName();
List> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE);
for (List partition : partitions) {
- send(sender, handler, groupInfo, groupName, partition);
+ send(sender, handler, groupInfo, groupName, partition, retryStrategy);
}
}
- private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list) {
+ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list, int retryStrategy) {
try {
long start = System.currentTimeMillis();
List records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);
- Datagram response = sendMessages(records, sender);
+ Datagram response = sendMessages(records, sender, retryStrategy);
release(records);
monitor(list, groupName);
if (response == null) {
@@ -182,21 +207,77 @@ private Map getSendResult(Datagram response) {
}
}
- private Datagram sendMessages(final List records, final Sender sender) {
+ private Datagram sendMessages(final List records, final Sender sender, int retryStrategy) {
long start = System.currentTimeMillis();
-
try {
- return sender.send(records, this);
+ //return sender.send(records, this);
+ return doSendDatagram(records, sender);
} catch (ClientSendException e) {
ClientSendException.SendErrorCode errorCode = e.getSendErrorCode();
- monitorSendError(records, groupInfo.get(), errorCode.ordinal());
- } catch (Exception e) {
- monitorSendError(records, groupInfo.get(), -1);
+ LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e);
+ monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e);
+ } catch (Throwable e) {
+ LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e);
+ monitorSendError(records, groupInfo.get(), -1, e);
} finally {
QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start);
}
+ return retrySendMessages(records, sender, retryStrategy);
+ }
+
+ private Datagram doSendDatagram(List records, Sender sender) throws Exception {
+ return sender.send(records, this);
+ }
+
+ /**
+ * 如果想更优雅,可以参考guava-retry
+ * @param records
+ * @param sender
+ * @param retryStrategy
+ * @return
+ */
+ private Datagram retrySendMessages(List records, Sender sender, int retryStrategy) {
+ Datagram result = null;
+ int maxTimes = sendConfig.getInt("netty.send.message.retry.max.times", 20);
+ /**
+ * 如果是Refresh重试过来的,那就在这里一直重试netty
+ * 如果delay机器本身出问题,refresh也没用,就在这里卡住
+ */
+ if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
+ maxTimes = sendConfig.getInt("netty.send.message.retry.without.fallback.max.times", 36000);
+ }
+ for (int count = 0; count < maxTimes; count++) {
+ try {
+ QMon.retryNettySend();
+ result = doSendDatagram(records, sender);
+ if (result != null) {
+ break;
+ }
+ } catch (Throwable e) {
+ if (LOG_LIMITER.tryAcquire()) {
+ for (ScheduleSetRecord record : records) {
+ LOGGER.error("retry senderGroup sendMessages error, retry_count={}, subject={}, brokerGroup={}", count, record.getSubject(), groupInfo.get(), e);
+ }
+ }
+ QMon.retryNettySendError();
+ retryRandomSleep(retryStrategy);
+ }
+ }
+ return result;
+ }
+
+ private void retryRandomSleep(int retryStrategy) {
+ long randomMinSleepMillis = sendConfig.getLong("netty.send.message.retry.min.sleep.millis", 1000L);
+ long randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.max.sleep.millis", 10000L);
+ if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
+ randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.without.fallback.max.sleep.millis", 30000L);
+ }
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+ try {
+ Thread.sleep(random.nextLong(randomMinSleepMillis, randomMaxSleepMillis));
+ } catch (InterruptedException ex) {
- return null;
+ }
}
private void monitorSendFail(List indexList, String groupName) {
@@ -210,13 +291,15 @@ private void monitorSendFail(String subject, String groupName) {
QMon.nettySendMessageFailCount(subject, groupName);
}
- private void monitorSendError(List records, BrokerGroupInfo group, int errorCode) {
- records.parallelStream().forEach(record -> monitorSendError(record.getSubject(), group, errorCode));
+ private void monitorSendError(List records, BrokerGroupInfo group, int errorCode, Throwable throwable) {
+ for (ScheduleSetRecord record : records) {
+ monitorSendError(record.getSubject(), group, errorCode, throwable);
+ }
}
- private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode) {
+ private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode, Throwable throwable) {
if (LOG_LIMITER.tryAcquire()) {
- LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode);
+ LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode, throwable);
}
QMon.nettySendMessageFailCount(subject, group.getGroupName());
}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java
index f8ef3455..a281dc78 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java
@@ -16,6 +16,7 @@
package qunar.tc.qmq.delay.sender;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,11 +28,13 @@
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.meta.BrokerRoleManager;
+import qunar.tc.qmq.delay.monitor.QMon;
import qunar.tc.qmq.delay.store.model.DispatchLogRecord;
import qunar.tc.qmq.delay.store.model.ScheduleSetRecord;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
@@ -73,26 +76,38 @@ public void init() {
this.batchExecutor.init();
}
+ private void sendSync(ScheduleIndex index) {
+ if (!BrokerRoleManager.isDelayMaster()) {
+ return;
+ }
+ syncProcess(Lists.newArrayList(index));
+ }
+
@Override
public void send(ScheduleIndex index) {
if (!BrokerRoleManager.isDelayMaster()) {
return;
}
-
boolean add;
- try {
- long waitTime = Math.abs(sendWaitTime);
- if (waitTime > 0) {
- add = batchExecutor.addItem(index, waitTime, TimeUnit.MILLISECONDS);
- } else {
- add = batchExecutor.addItem(index);
+ do {
+ try {
+ long waitTime = Math.abs(sendWaitTime);
+ if (waitTime > 0) {
+ add = batchExecutor.addItem(index, waitTime, TimeUnit.MILLISECONDS);
+ } else {
+ add = batchExecutor.addItem(index);
+ }
+ if (!add) {
+ QMon.sendBatchExecutorAddFailed(index.getSubject());
+ long sleepBaseMillis = config.getLong("delay.send.batch.executor.sleep.min.millis", 500L);
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+ long sleepMillis = sleepBaseMillis + random.nextInt(500);
+ Thread.sleep(sleepMillis);
+ }
+ } catch (InterruptedException e) {
+ return;
}
- } catch (InterruptedException e) {
- return;
- }
- if (!add) {
- reject(index);
- }
+ } while (!add);
}
@Override
@@ -105,8 +120,13 @@ public void process(List indexList) {
}
}
- private void reject(ScheduleIndex index) {
- send(index);
+ private void syncProcess(List indexList) {
+ try {
+ senderExecutor.syncExecute(indexList, this, brokerService);
+ } catch (Exception e) {
+ LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e);
+ retry(indexList);
+ }
}
private void success(ScheduleSetRecord record) {
@@ -117,6 +137,7 @@ private void retry(List records, Set messageIds) {
final Set refreshSubject = Sets.newHashSet();
for (ScheduleSetRecord record : records) {
if (messageIds.contains(record.getMessageId())) {
+ QMon.sendProcessorPartlyFailed();
ScheduleIndex index = new ScheduleIndex(record.getSubject(), record.getScheduleTime(), record.getStartWroteOffset(), record.getRecordSize(), record.getSequence());
refresh(index, refreshSubject);
send(index);
@@ -134,7 +155,7 @@ private void retry(List indexList) {
final Set refreshSubject = Sets.newHashSet();
for (ScheduleIndex index : indexList) {
refresh(index, refreshSubject);
- send(index);
+ sendSync(index);
}
}
@@ -153,6 +174,7 @@ public void success(List indexList, Set messageIds) {
@Override
public void fail(List indexList) {
+ QMon.sendProcessorFailed();
retry(indexList);
}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java
index b9a9a063..ba500817 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/startup/ServerWrapper.java
@@ -76,19 +76,19 @@ public class ServerWrapper implements Disposable {
private DynamicConfig config;
private DefaultStoreConfiguration storeConfig;
- public void start(boolean autoOnline) {
- init();
- register();
- startServer();
- sync();
- if (autoOnline)
- online();
- }
-
- public void online() {
- BrokerConfig.markAsWritable();
- brokerRegisterService.healthSwitch(true);
- }
+ public void start(boolean autoOnline) {
+ init();
+ register();
+ startServer();
+ sync();
+ if (autoOnline)
+ online();
+ }
+
+ public void online() {
+ BrokerConfig.markAsWritable();
+ brokerRegisterService.healthSwitch(true);
+ }
private void init() {
this.config = DynamicConfigLoader.load("delay.properties");
@@ -122,7 +122,6 @@ private boolean iterateCallback(final ScheduleIndex index) {
wheelTickManager.addWHeel(index);
return true;
}
-
return false;
}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java
index 479ff2db..9f40b3f7 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/store/log/ScheduleLog.java
@@ -23,6 +23,7 @@
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.base.LongHashSet;
import qunar.tc.qmq.delay.config.StoreConfiguration;
+import qunar.tc.qmq.delay.monitor.QMon;
import qunar.tc.qmq.delay.store.DefaultDelaySegmentValidator;
import qunar.tc.qmq.delay.store.ScheduleLogValidatorSupport;
import qunar.tc.qmq.delay.store.appender.ScheduleSetAppender;
@@ -71,25 +72,30 @@ private void reValidate(int singleMessageLimitSize) {
@Override
public AppendLogResult append(LogRecord record) {
- if (!open.get()) {
- return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "schedule log closed");
- }
- AppendLogResult> result = scheduleSet.append(record);
- int code = result.getCode();
- if (MessageProducerCode.SUCCESS != code) {
- LOGGER.error("appendMessageLog schedule set error,log:{} {},code:{}", record.getSubject(), record.getMessageId(), code);
- return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "appendScheduleSetError");
- }
+ long start = System.currentTimeMillis();
+ try {
+ if (!open.get()) {
+ return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "schedule log closed");
+ }
+ AppendLogResult> result = scheduleSet.append(record);
+ int code = result.getCode();
+ if (MessageProducerCode.SUCCESS != code) {
+ LOGGER.error("appendMessageLog schedule set error,log:{} {},code:{}", record.getSubject(), record.getMessageId(), code);
+ return new AppendLogResult<>(MessageProducerCode.STORE_ERROR, "appendScheduleSetError");
+ }
- RecordResult recordResult = result.getAdditional();
- ScheduleIndex index = new ScheduleIndex(
- record.getSubject(),
- record.getScheduleTime(),
- recordResult.getResult().getWroteOffset(),
- recordResult.getResult().getWroteBytes(),
- recordResult.getResult().getAdditional().getSequence());
+ RecordResult recordResult = result.getAdditional();
+ ScheduleIndex index = new ScheduleIndex(
+ record.getSubject(),
+ record.getScheduleTime(),
+ recordResult.getResult().getWroteOffset(),
+ recordResult.getResult().getWroteBytes(),
+ recordResult.getResult().getAdditional().getSequence());
- return new AppendLogResult<>(MessageProducerCode.SUCCESS, "", index);
+ return new AppendLogResult<>(MessageProducerCode.SUCCESS, "", index);
+ } finally {
+ QMon.appendScheduleLogTime(record.getSubject(), System.currentTimeMillis() - start);
+ }
}
@Override
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java
index 00ab41de..d734471c 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sync/slave/SlaveSynchronizer.java
@@ -104,8 +104,8 @@ void shutdown() {
@Override
public void run() {
- final long start = System.currentTimeMillis();
while (running) {
+ final long start = System.currentTimeMillis();
try {
sync();
} catch (Throwable e) {
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java
index 919614f3..6f914d29 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/HashedWheelTimer.java
@@ -17,7 +17,10 @@
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import qunar.tc.qmq.delay.ScheduleIndex;
+import qunar.tc.qmq.delay.monitor.QMon;
import java.util.Collections;
import java.util.HashSet;
@@ -30,6 +33,9 @@
@SuppressWarnings("all")
public class HashedWheelTimer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HashedWheelTimer.class);
+
private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER;
static {
@@ -168,6 +174,9 @@ public Set stop() {
return worker.unprocessedTimeouts();
}
+ /**
+ * java.lang.IllegalStateException: Queue full---队列满了会抛这个异常
+ */
public void newTimeout(ScheduleIndex index, long delay, TimeUnit unit) {
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
HashedWheelTimeout timeout = new HashedWheelTimeout(this, index, deadline);
@@ -340,7 +349,13 @@ public void expire() {
return;
}
- timer.expire(entity);
+ try {
+ timer.expire(entity);
+ } catch (Throwable throwable) {
+ LOGGER.error("timer expire op error, entityInfo:subject={},offset={},scheduleTime={}",
+ entity.getSubject(), entity.getOffset(), entity.getScheduleTime(), throwable);
+ QMon.hashedWheelTimerExpireError();
+ }
}
@Override
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java
index 8feb867f..c8b34394 100644
--- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/wheel/WheelTickManager.java
@@ -119,6 +119,11 @@ private void doRecover(DispatchLogSegment dispatchLogSegment) {
loadedCursor.shiftCursor(baseOffset);
}
+ /**
+ * 看看哪些已经dispatch过了,recover的时候就忽略
+ * @param currentDispatchLog
+ * @return
+ */
private LongHashSet loadDispatchLog(final DispatchLogSegment currentDispatchLog) {
LogVisitor visitor = currentDispatchLog.newVisitor(0);
final LongHashSet recordSet = new LongHashSet(currentDispatchLog.entries());
@@ -138,6 +143,9 @@ private boolean isStarted() {
return started.get();
}
+ /**
+ * 周期性的兜底---每隔30分钟执行一次,
+ */
private void load() {
long next = System.currentTimeMillis() + config.getLoadInAdvanceTimesInMillis();
long prepareLoadBaseOffset = resolveSegment(next, segmentScale);
@@ -266,9 +274,10 @@ private void refresh(ScheduleIndex index) {
try {
scheduleTime = index.getScheduleTime();
timer.newTimeout(index, scheduleTime - now, TimeUnit.MILLISECONDS);
- } catch (Throwable e) {
- LOGGER.error("wheel refresh error, scheduleTime:{}, delay:{}", scheduleTime, scheduleTime - now);
- throw Throwables.propagate(e);
+ } catch (Throwable throwable) {
+ LOGGER.error("wheel refresh error, scheduleTime:{}, delay:{}", scheduleTime, scheduleTime - now, throwable);
+ QMon.addWheelFailed(index.getSubject());
+ throw Throwables.propagate(throwable);
}
}
@@ -284,7 +293,22 @@ public void shutdown() {
}
public void addWHeel(ScheduleIndex index) {
- refresh(index);
+ boolean result = false;
+ do {
+ try {
+ //Wheel里面的队列满了,需要等待重试,这里卡住
+ refresh(index);
+ result = true;
+ } catch (Throwable throwable) {
+ LOGGER.error("add wheel temp error, {}, {}, {}", index.getSubject(), index.getScheduleTime(), index.getOffset(), throwable);
+ try {
+ long sleepMillis = config.getConfig().getLong("add.wheel.exception.sleep.millis", 500L);
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ LOGGER.error("add wheel temp sleep error", e);
+ }
+ }
+ } while (!result);
}
public boolean canAdd(long scheduleTime, long offset) {
diff --git a/qmq-deploy/pom.xml b/qmq-deploy/pom.xml
index b33155a0..3588da0b 100644
--- a/qmq-deploy/pom.xml
+++ b/qmq-deploy/pom.xml
@@ -4,7 +4,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
4.0.0
diff --git a/qmq-dist/pom.xml b/qmq-dist/pom.xml
index 95685fe7..beb21904 100644
--- a/qmq-dist/pom.xml
+++ b/qmq-dist/pom.xml
@@ -6,7 +6,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-dist
diff --git a/qmq-gateway/pom.xml b/qmq-gateway/pom.xml
index 66cb2842..ab61c780 100644
--- a/qmq-gateway/pom.xml
+++ b/qmq-gateway/pom.xml
@@ -7,7 +7,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-gateway
diff --git a/qmq-metaserver/pom.xml b/qmq-metaserver/pom.xml
index cc2c459d..7e047b7a 100644
--- a/qmq-metaserver/pom.xml
+++ b/qmq-metaserver/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-metaserver
diff --git a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java
index 1551a52d..161bd571 100644
--- a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java
+++ b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/ClientMetaInfoStoreImpl.java
@@ -36,7 +36,7 @@ public class ClientMetaInfoStoreImpl implements ClientMetaInfoStore {
public List queryConsumer(String subject) {
return jdbcTemplate.query("SELECT id,subject_info,client_type,consumer_group,client_id,app_code,room FROM client_meta_info WHERE subject_info=? AND client_type=?", (rs, rowNum) -> {
final ClientMetaInfo meta = new ClientMetaInfo();
- meta.setId(rs.getInt("id"));
+ meta.setId(rs.getLong("id"));
meta.setSubject(rs.getString("subject_info"));
meta.setClientTypeCode(rs.getInt("client_type"));
meta.setConsumerGroup(rs.getString("consumer_group"));
diff --git a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java
index f03038be..316a710b 100644
--- a/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java
+++ b/qmq-metaserver/src/main/java/qunar/tc/qmq/meta/store/impl/DatabaseStore.java
@@ -231,7 +231,7 @@ public void insertClientMetaInfo(MetaInfoRequest request) {
private static final RowMapper CLIENT_META_INFO_ROW_MAPPER= (rs, rowNum) -> {
final ClientMetaInfo clientMetaInfo = new ClientMetaInfo();
- clientMetaInfo.setId(rs.getInt("id"));
+ clientMetaInfo.setId(rs.getLong("id"));
clientMetaInfo.setSubject(rs.getString("subject_info"));
clientMetaInfo.setClientTypeCode(rs.getInt("client_type"));
clientMetaInfo.setConsumerGroup(rs.getString("consumer_group"));
diff --git a/qmq-metrics-prometheus/pom.xml b/qmq-metrics-prometheus/pom.xml
index 90ad14eb..f681248e 100644
--- a/qmq-metrics-prometheus/pom.xml
+++ b/qmq-metrics-prometheus/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-metrics-prometheus
diff --git a/qmq-remoting/pom.xml b/qmq-remoting/pom.xml
index 388a5fc0..273b0bec 100644
--- a/qmq-remoting/pom.xml
+++ b/qmq-remoting/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-remoting
diff --git a/qmq-server-common/pom.xml b/qmq-server-common/pom.xml
index f81feb39..2abd99ea 100644
--- a/qmq-server-common/pom.xml
+++ b/qmq-server-common/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-server-common
diff --git a/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java b/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java
index b76c12be..d67b1532 100644
--- a/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java
+++ b/qmq-server-common/src/main/java/qunar/tc/qmq/concurrent/ActorSystem.java
@@ -128,6 +128,7 @@ public static class Actor implements Runnable, Comparable {
private final String name;
private long total;
private volatile long submitTs;
+ private volatile long executeTs;
//通过Unsafe操作
private volatile int status;
@@ -148,6 +149,7 @@ boolean dispatch(E message) {
@Override
public void run() {
long start = System.currentTimeMillis();
+ executeTs = start;
String old = Thread.currentThread().getName();
try {
Thread.currentThread().setName(systemName + "-" + name);
@@ -228,8 +230,16 @@ private boolean updateStatus(int oldStatus, int newStatus) {
@Override
public int compareTo(Actor o) {
+ /*
+ 原策略"TotalDuration"会存在如下问题:
+ 当一个突发性的高QPS消息过来,由于之前量小,total值很低,所以每次都会优先执行这个消息,导致CPU被全部拿走,进而导致其他消息无法被拉取
int result = Long.compare(total, o.total);
return result == 0 ? Long.compare(submitTs, o.submitTs) : result;
+
+ 改进策略"LastExecuteTimestamp":
+ 记录executeTs“上次执行时间戳”,使用“上次执行时间戳”进行比较,如果“上次执行时间戳”相同,再降级到total
+ */
+ return ActorCompareWay.LastExecuteTimestamp.compare(this, o);
}
@Override
@@ -247,6 +257,26 @@ public int hashCode() {
}
}
+ private enum ActorCompareWay {
+
+ LastExecuteTimestamp {
+ @Override
+ public int compare(Actor a1, Actor a2) {
+ int result = Long.compare(a1.executeTs, a2.executeTs);
+ return result == 0 ? Long.compare(a1.total, a2.total) : result;
+ }
+ },
+ TotalDuration {
+ @Override
+ public int compare(Actor a1, Actor a2) {
+ int result = Long.compare(a1.total, a2.total);
+ return result == 0 ? Long.compare(a1.submitTs, a2.submitTs) : result;
+ }
+ };
+
+ public abstract int compare(Actor a1, Actor a2);
+ }
+
/**
* Copyright (C) 2009-2016 Lightbend Inc.
*/
diff --git a/qmq-server/pom.xml b/qmq-server/pom.xml
index 22da86be..1fb676fa 100644
--- a/qmq-server/pom.xml
+++ b/qmq-server/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-server
diff --git a/qmq-store/pom.xml b/qmq-store/pom.xml
index 52123dc1..de26e4ff 100644
--- a/qmq-store/pom.xml
+++ b/qmq-store/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-store
diff --git a/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java b/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java
index e5e5bb59..5a54990f 100644
--- a/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java
+++ b/qmq-store/src/main/java/qunar/tc/qmq/store/PullLogManager.java
@@ -17,6 +17,7 @@
package qunar.tc.qmq.store;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,7 +130,11 @@ public Table getLogs() {
public void flush() {
final long start = System.currentTimeMillis();
try {
- for (final PullLog log : logs.values()) {
+ /**
+ * 报java.util.ConcurrentModificationException,需要copy
+ */
+ Collection pullLogs = ImmutableList.copyOf(logs.values());
+ for (final PullLog log : pullLogs) {
log.flush();
}
} finally {
@@ -171,7 +176,8 @@ private void remove(String subject, String group, String consumerId) {
@Override
public void close() {
- for (final PullLog log : logs.values()) {
+ Collection pullLogs = ImmutableList.copyOf(logs.values());
+ for (final PullLog log : pullLogs) {
log.close();
}
}
diff --git a/qmq-sync/pom.xml b/qmq-sync/pom.xml
index a858fdf1..bbab2997 100644
--- a/qmq-sync/pom.xml
+++ b/qmq-sync/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-sync
diff --git a/qmq-tools/pom.xml b/qmq-tools/pom.xml
index 3d40c83f..37e1fa91 100644
--- a/qmq-tools/pom.xml
+++ b/qmq-tools/pom.xml
@@ -5,7 +5,7 @@
qmq-parent
com.qunar.qmq
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-tools
diff --git a/qmq-watchdog/pom.xml b/qmq-watchdog/pom.xml
index f748655b..d187f6eb 100644
--- a/qmq-watchdog/pom.xml
+++ b/qmq-watchdog/pom.xml
@@ -5,7 +5,7 @@
com.qunar.qmq
qmq-parent
- 1.1.43
+ 1.1.44-SNAPSHOT
qmq-watchdog