Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DelayQueue重试优化-完善监控和日志 #177

Merged
merged 8 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
<packaging>pom</packaging>

<name>qmq</name>
Expand Down
2 changes: 1 addition & 1 deletion qmq-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-backup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion qmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-delay-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-delay-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,40 @@ public static void appendFailedByMessageIllegal(String subject) {
public static void processTime(String subject, long time) {
Metrics.timer("processTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
}

public static void sendBatchExecutorAddFailed(String subject) {
countInc("sendBatchExecutorAddFailed", subject);
}

public static void hashedWheelTimerExpireError() {
Metrics.counter("hashed_wheel_timer_expire_error", EMPTY, EMPTY).inc();
}

public static void addWheelFailed(String subject) {
countInc("addWheelFailed", subject);
}

public static void sendProcessorFailed() {
Metrics.counter("sendProcessorFailed", EMPTY, EMPTY).inc();
}

public static void sendProcessorPartlyFailed() {
Metrics.counter("sendProcessorPartlyFailed", EMPTY, EMPTY).inc();
}

public static void appendScheduleLogTime(String subject, long time) {
Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
}

public static void retryNettySend() {
Metrics.counter("retryNettySend", EMPTY, EMPTY).inc();
}

public static void retryNettySendError() {
Metrics.counter("retryNettySendError", EMPTY, EMPTY).inc();
}

public static void syncSend(String subject) {
Metrics.counter("sync_send_count", SUBJECT_ARRAY, new String[] {subject}).inc();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ ListenableFuture<Datagram> receive(List<RawMessageExtend> messages, RemotingComm
for (RawMessageExtend message : messages) {
final MessageHeader header = message.getHeader();
monitorMessageReceived(header.getCreateTime(), header.getSubject());

final ReceivedDelayMessage receivedDelayMessage = new ReceivedDelayMessage(message, cmd.getReceiveTime());
futures.add(receivedDelayMessage.getPromise());
invoker.invoke(receivedDelayMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class SenderExecutor implements Disposable {
private final Sender sender;
private final DelayLogFacade store;
private final int sendThreads;
private DynamicConfig sendConfig;

SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) {
this.sender = sender;
this.store = store;
this.brokerLoadBalance = PollBrokerLoadBalance.getInstance();
this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD);
this.sendConfig = sendConfig;
}

void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
Expand All @@ -61,6 +63,17 @@ void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandle
}
}

public void syncExecute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
Map<SenderGroup, List<ScheduleIndex>> groups = groupByBroker(indexList, brokerService);
for (Map.Entry<SenderGroup, List<ScheduleIndex>> entry : groups.entrySet()) {
doSyncExecute(entry.getKey(), entry.getValue(), handler);
}
}

private void doSyncExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.sendSync(list, sender, handler);
}

private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.send(list, sender, handler);
}
Expand Down Expand Up @@ -88,7 +101,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) {
String groupName = groupInfo.getGroupName();
SenderGroup senderGroup = groupSenders.get(groupName);
if (null == senderGroup) {
senderGroup = new SenderGroup(groupInfo, sendThreads, store);
senderGroup = new SenderGroup(groupInfo, sendThreads, store, sendConfig);
SenderGroup currentSenderGroup = groupSenders.putIfAbsent(groupName, senderGroup);
senderGroup = null != currentSenderGroup ? currentSenderGroup : senderGroup;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.common.Disposable;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.monitor.QMon;
Expand All @@ -36,10 +37,7 @@
import qunar.tc.qmq.protocol.producer.SendResult;

import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import static qunar.tc.qmq.delay.monitor.QMon.delayBrokerSendMsgCount;
Expand All @@ -52,12 +50,16 @@
public class SenderGroup implements Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderGroup.class);
private static final int MAX_SEND_BATCH_SIZE = 50;
private static final int RETRY_FALL_BACK = 1;//refresh再来重试
private static final int RETRY_WITHOUT_FALL_BACK = 2;//不再refresh,直接在netty端重试
private final AtomicReference<BrokerGroupInfo> groupInfo;
private final DelayLogFacade store;
private DynamicConfig sendConfig;
private final ThreadPoolExecutor executorService;
private final RateLimiter LOG_LIMITER = RateLimiter.create(2);

SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store) {
SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store, DynamicConfig sendConfig) {
this.sendConfig = sendConfig;
this.groupInfo = new AtomicReference<>(groupInfo);
this.store = store;
final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
Expand All @@ -69,7 +71,8 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
boolean success = false;
while (!success) {
try {
success = workQueue.add(r);
success = workQueue.offer(r);
//success = workQueue.add(r); 这里会频繁抛异常,微秒级别性能会有问题。
} catch (Throwable ignore) {

}
Expand All @@ -78,27 +81,49 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
});
}

/**
* 绕过线程池直接执行
* @param records
* @param sender
* @param handler
*/
public void sendSync(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
monitorSyncSend(records);
doSend(records, sender, handler, RETRY_WITHOUT_FALL_BACK);
}

private static void monitorSyncSend(List<ScheduleIndex> records) {
try {
for (int i = 0; i < records.size(); i++) {
ScheduleIndex scheduleIndex = records.get(i);
QMon.syncSend(scheduleIndex.getSubject());
}
} catch (Throwable throwable) {

}
}

public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
executorService.execute(() -> doSend(records, sender, handler));
executorService.execute(() -> doSend(records, sender, handler, RETRY_FALL_BACK));
}

private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler) {
private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler, int retryStrategy) {
BrokerGroupInfo groupInfo = this.groupInfo.get();
String groupName = groupInfo.getGroupName();
List<List<ScheduleIndex>> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE);

for (List<ScheduleIndex> partition : partitions) {
send(sender, handler, groupInfo, groupName, partition);
send(sender, handler, groupInfo, groupName, partition, retryStrategy);
}
}

private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list, int retryStrategy) {
try {
long start = System.currentTimeMillis();
List<ScheduleSetRecord> records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);

Datagram response = sendMessages(records, sender);
Datagram response = sendMessages(records, sender, retryStrategy);
release(records);
monitor(list, groupName);
if (response == null) {
Expand Down Expand Up @@ -182,21 +207,77 @@ private Map<String, SendResult> getSendResult(Datagram response) {
}
}

private Datagram sendMessages(final List<ScheduleSetRecord> records, final Sender sender) {
private Datagram sendMessages(final List<ScheduleSetRecord> records, final Sender sender, int retryStrategy) {
long start = System.currentTimeMillis();

try {
return sender.send(records, this);
//return sender.send(records, this);
return doSendDatagram(records, sender);
} catch (ClientSendException e) {
ClientSendException.SendErrorCode errorCode = e.getSendErrorCode();
monitorSendError(records, groupInfo.get(), errorCode.ordinal());
} catch (Exception e) {
monitorSendError(records, groupInfo.get(), -1);
LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e);
monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e);
} catch (Throwable e) {
LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e);
monitorSendError(records, groupInfo.get(), -1, e);
} finally {
QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start);
}
return retrySendMessages(records, sender, retryStrategy);
}

private Datagram doSendDatagram(List<ScheduleSetRecord> records, Sender sender) throws Exception {
return sender.send(records, this);
}

/**
* 如果想更优雅,可以参考guava-retry
* @param records
* @param sender
* @param retryStrategy
* @return
*/
private Datagram retrySendMessages(List<ScheduleSetRecord> records, Sender sender, int retryStrategy) {
Datagram result = null;
int maxTimes = sendConfig.getInt("netty.send.message.retry.max.times", 20);
/**
* 如果是Refresh重试过来的,那就在这里一直重试netty
* 如果delay机器本身出问题,refresh也没用,就在这里卡住
*/
if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
maxTimes = sendConfig.getInt("netty.send.message.retry.without.fallback.max.times", 36000);
}
for (int count = 0; count < maxTimes; count++) {
try {
QMon.retryNettySend();
result = doSendDatagram(records, sender);
if (result != null) {
break;
}
} catch (Throwable e) {
if (LOG_LIMITER.tryAcquire()) {
for (ScheduleSetRecord record : records) {
LOGGER.error("retry senderGroup sendMessages error, retry_count={}, subject={}, brokerGroup={}", count, record.getSubject(), groupInfo.get(), e);
}
}
QMon.retryNettySendError();
retryRandomSleep(retryStrategy);
}
}
return result;
}

private void retryRandomSleep(int retryStrategy) {
long randomMinSleepMillis = sendConfig.getLong("netty.send.message.retry.min.sleep.millis", 1000L);
long randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.max.sleep.millis", 10000L);
if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.without.fallback.max.sleep.millis", 30000L);
}
final ThreadLocalRandom random = ThreadLocalRandom.current();
try {
Thread.sleep(random.nextLong(randomMinSleepMillis, randomMaxSleepMillis));
} catch (InterruptedException ex) {

return null;
}
}

private void monitorSendFail(List<ScheduleIndex> indexList, String groupName) {
Expand All @@ -210,13 +291,15 @@ private void monitorSendFail(String subject, String groupName) {
QMon.nettySendMessageFailCount(subject, groupName);
}

private void monitorSendError(List<ScheduleSetRecord> records, BrokerGroupInfo group, int errorCode) {
records.parallelStream().forEach(record -> monitorSendError(record.getSubject(), group, errorCode));
private void monitorSendError(List<ScheduleSetRecord> records, BrokerGroupInfo group, int errorCode, Throwable throwable) {
for (ScheduleSetRecord record : records) {
monitorSendError(record.getSubject(), group, errorCode, throwable);
}
}

private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode) {
private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode, Throwable throwable) {
if (LOG_LIMITER.tryAcquire()) {
LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode);
LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode, throwable);
}
QMon.nettySendMessageFailCount(subject, group.getGroupName());
}
Expand Down
Loading