From 15e3d7b7715d49242423b3b5061aa67fb7d02578 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Fri, 6 Dec 2019 16:34:42 +0800 Subject: [PATCH 1/5] refactor(rebalance): use smart_ptr to manage pull request --- include/DefaultMQPullConsumer.h | 14 +- include/DefaultMQPushConsumer.h | 10 +- include/MQConsumer.h | 2 +- src/MQClientFactory.cpp | 4 +- src/consumer/AllocateMQStrategy.h | 2 +- .../ConsumeMessageConcurrentlyService.cpp | 25 +- src/consumer/ConsumeMessageOrderlyService.cpp | 31 +- src/consumer/ConsumeMsgService.h | 16 +- src/consumer/DefaultMQPullConsumer.cpp | 6 +- src/consumer/DefaultMQPushConsumer.cpp | 138 +++++---- src/consumer/FindBrokerResult.h | 2 +- src/consumer/OffsetStore.cpp | 2 +- src/consumer/OffsetStore.h | 2 +- src/consumer/PullAPIWrapper.cpp | 2 +- src/consumer/PullAPIWrapper.h | 2 +- src/consumer/PullRequest.cpp | 2 +- src/consumer/PullRequest.h | 2 +- src/consumer/PullResult.cpp | 2 +- src/consumer/PullResultExt.h | 2 +- src/consumer/Rebalance.cpp | 283 ++++++++++-------- src/consumer/Rebalance.h | 29 +- src/consumer/SubscriptionData.cpp | 4 +- src/consumer/SubscriptionData.h | 2 +- 23 files changed, 349 insertions(+), 235 deletions(-) diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h index 4f6ef92d8..006c42a1c 100644 --- a/include/DefaultMQPullConsumer.h +++ b/include/DefaultMQPullConsumer.h @@ -54,7 +54,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer { virtual void getSubscriptions(std::vector&); virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset); virtual void removeConsumeOffset(const MQMessageQueue& mq); - virtual void producePullMsgTask(PullRequest*); + virtual bool producePullMsgTask(boost::weak_ptr pullRequest); virtual Rebalance* getRebalance() const; // mqs); diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 61fcde18b..5c5f4cad7 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -86,13 +86,13 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { virtual Rebalance* getRebalance() const; ConsumeMsgService* getConsumerMsgService() const; - virtual void producePullMsgTask(PullRequest*); - void triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest* request); + virtual bool producePullMsgTask(boost::weak_ptr); + void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr); void runPullMsgQueue(TaskQueue* pTaskQueue); - void pullMessage(PullRequest* pullrequest); // sync pullMsg - void pullMessageAsync(PullRequest* pullrequest); // async pullMsg + void pullMessage(boost::weak_ptr pullrequest); // sync pullMsg + void pullMessageAsync(boost::weak_ptr pullrequest); // async pullMsg void setAsyncPull(bool asyncFlag); - AsyncPullCallback* getAsyncPullCallBack(PullRequest* request, MQMessageQueue msgQueue); + AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr, MQMessageQueue msgQueue); void shutdownAsyncPullCallBack(); /* diff --git a/include/MQConsumer.h b/include/MQConsumer.h index cc25327ab..b6cd613bf 100644 --- a/include/MQConsumer.h +++ b/include/MQConsumer.h @@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient { virtual ConsumeType getConsumeType() = 0; virtual ConsumeFromWhere getConsumeFromWhere() = 0; virtual void getSubscriptions(std::vector&) = 0; - virtual void producePullMsgTask(PullRequest*) = 0; + virtual bool producePullMsgTask(boost::weak_ptr) = 0; virtual Rebalance* getRebalance() const = 0; virtual PullResult pull(const MQMessageQueue& 
mq, const std::string& subExpression, int64 offset, int maxNums) = 0; virtual void pull(const MQMessageQueue& mq, diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index 99c4ffde8..1c00f8039 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -1040,7 +1040,9 @@ void MQClientFactory::resetOffset(const string& group, for (; it != offsetTable.end(); ++it) { MQMessageQueue mq = it->first; - PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq); + boost::weak_ptr pullRequest = pConsumer->getRebalance()->getPullRequest(mq); + boost::shared_ptr pullreq = pullRequest.lock(); + // PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq); if (pullreq) { pullreq->setDroped(true); LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data()); diff --git a/src/consumer/AllocateMQStrategy.h b/src/consumer/AllocateMQStrategy.h index 4613040e9..e24966c84 100644 --- a/src/consumer/AllocateMQStrategy.h +++ b/src/consumer/AllocateMQStrategy.h @@ -92,5 +92,5 @@ class AllocateMQAveragely : public AllocateMQStrategy { }; //getMessageListenerType(); } -void ConsumeMessageConcurrentlyService::submitConsumeRequest(PullRequest* request, vector& msgs) { +void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr pullRequest, + vector& msgs) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_WARN("Pull request has been released"); + return; + } + if (request->isDroped()) { + LOG_INFO("Pull request for %s is dropped, which will be released in next re-balance.", + request->m_messageQueue.toString().c_str()); + return; + } m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs)); } -void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vector& msgs) { +void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr pullRequest, + vector& msgs) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_WARN("Pull request has been released"); + return; + } if (!request || request->isDroped()) { LOG_WARN("the pull result is NULL or Had been dropped"); request->clearAllMsgs(); // add clear operation to avoid bad state when @@ -72,7 +89,7 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vec return; } - //m_messageQueue).toString().c_str()); return; @@ -144,4 +161,4 @@ void ConsumeMessageConcurrentlyService::resetRetryTopic(vector& ms } //getMessageListenerType(); } -void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request, vector& msgs) { +void ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr pullRequest, + vector& msgs) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_WARN("Pull request has been released"); + return; + } m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest, this, request)); } void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* context, - PullRequest* request, + boost::weak_ptr pullRequest, bool tryLockMQ, boost::asio::deadline_timer* t) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_WARN("Pull request has been released"); + return; + } LOG_INFO("submit consumeRequest later for mq:%s", request->m_messageQueue.toString().c_str()); vector msgs; ConsumeMessageOrderlyService* orderlyService = (ConsumeMessageOrderlyService*)context; @@ -122,7 +133,12 @@ void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex deleteAndZero(t); } 
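The smart-pointer idiom applied throughout these hunks: callers keep a boost::weak_ptr<PullRequest> and promote it with lock(), bailing out early when the rebalance table (the only shared_ptr owner) has already released the request. A minimal self-contained sketch of that idiom, with illustrative names rather than code from this patch:

#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <iostream>

struct Request {
  explicit Request(int id) : id(id) {}
  int id;
};

// workers hold only a weak reference; the request table owns the object
void process(boost::weak_ptr<Request> weakReq) {
  boost::shared_ptr<Request> req = weakReq.lock();  // try to promote
  if (!req) {
    std::cout << "request released, skip\n";  // owner already dropped it
    return;
  }
  std::cout << "processing request " << req->id << "\n";  // safe while req is held
}

int main() {
  boost::shared_ptr<Request> owner = boost::make_shared<Request>(1);
  boost::weak_ptr<Request> weakReq(owner);
  process(weakReq);  // prints "processing request 1"
  owner.reset();     // rebalance removes the request from its table
  process(weakReq);  // prints "request released, skip"
  return 0;
}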
-void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { +void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr pullRequest) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_WARN("Pull request has been released"); + return; + } bool bGetMutex = false; boost::unique_lock lock(request->getPullRequestCriticalSection(), boost::try_to_lock); if (!lock.owns_lock()) { @@ -189,11 +205,16 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { } } } -void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ) { +void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr pullRequest, bool tryLockMQ) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_WARN("Pull request has been released"); + return; + } int retryTimer = tryLockMQ ? 500 : 100; boost::asio::deadline_timer* t = new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer)); t->async_wait( boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), this, request, tryLockMQ, t)); } -} +} // namespace rocketmq diff --git a/src/consumer/ConsumeMsgService.h b/src/consumer/ConsumeMsgService.h index 2bb79794f..a5d3ce7c0 100644 --- a/src/consumer/ConsumeMsgService.h +++ b/src/consumer/ConsumeMsgService.h @@ -38,7 +38,7 @@ class ConsumeMsgService { virtual void start() {} virtual void shutdown() {} virtual void stopThreadPool() {} - virtual void submitConsumeRequest(PullRequest* request, vector& msgs) {} + virtual void submitConsumeRequest(boost::weak_ptr request, vector& msgs) {} virtual MessageListenerType getConsumeMsgSerivceListenerType() { return messageListenerDefaultly; } }; @@ -48,11 +48,11 @@ class ConsumeMessageConcurrentlyService : public ConsumeMsgService { virtual ~ConsumeMessageConcurrentlyService(); virtual void start(); virtual void shutdown(); - virtual void submitConsumeRequest(PullRequest* request, vector& msgs); + virtual void submitConsumeRequest(boost::weak_ptr request, vector& msgs); virtual MessageListenerType getConsumeMsgSerivceListenerType(); virtual void stopThreadPool(); - void ConsumeRequest(PullRequest* request, vector& msgs); + void ConsumeRequest(boost::weak_ptr request, vector& msgs); private: void resetRetryTopic(vector& msgs); @@ -71,17 +71,17 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService { virtual ~ConsumeMessageOrderlyService(); virtual void start(); virtual void shutdown(); - virtual void submitConsumeRequest(PullRequest* request, vector& msgs); + virtual void submitConsumeRequest(boost::weak_ptr request, vector& msgs); virtual void stopThreadPool(); virtual MessageListenerType getConsumeMsgSerivceListenerType(); void boost_asio_work(); - void tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ); + void tryLockLaterAndReconsume(boost::weak_ptr request, bool tryLockMQ); static void static_submitConsumeRequestLater(void* context, - PullRequest* request, + boost::weak_ptr request, bool tryLockMQ, boost::asio::deadline_timer* t); - void ConsumeRequest(PullRequest* request); + void ConsumeRequest(boost::weak_ptr request); void lockMQPeriodically(boost::system::error_code& ec, boost::asio::deadline_timer* t); void unlockAllMQ(); bool lockOneMQ(const MQMessageQueue& mq); @@ -99,6 +99,6 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService { }; //& result) { } } -void DefaultMQPullConsumer::producePullMsgTask(PullRequest*) {} +bool 
DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr pullRequest) { + return true; +} Rebalance* DefaultMQPullConsumer::getRebalance() const { return NULL; } // request) : m_callbackOwner(pushConsumer), m_pullRequest(request), m_bShutdown(false) {} - virtual ~AsyncPullCallback() { - m_callbackOwner = NULL; - m_pullRequest = NULL; - } + virtual ~AsyncPullCallback() { m_callbackOwner = NULL; } virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool bProducePullRequest) { + boost::shared_ptr pullRequest = m_pullRequest.lock(); + if (!pullRequest) { + LOG_WARN("Pull request for[%s] has been released", mq.toString().c_str()); + return; + } if (m_bShutdown == true) { - LOG_INFO("pullrequest for:%s in shutdown, return", (m_pullRequest->m_messageQueue).toString().c_str()); - m_pullRequest->removePullMsgEvent(); + LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str()); + pullRequest->removePullMsgEvent(); return; } switch (result.pullStatus) { case FOUND: { - if (!m_pullRequest->isDroped()) // if request is setted to dropped, - // don't add msgFoundList to - // m_msgTreeMap and don't call - // producePullMsgTask - { // avoid issue: pullMsg is sent out, rebalance is doing concurrently - // and this request is dropped, and then received pulled msgs. - m_pullRequest->setNextOffset(result.nextBeginOffset); - m_pullRequest->putMessage(result.msgFoundList); + if (!pullRequest->isDroped()) // if request is setted to dropped, + // don't add msgFoundList to + // m_msgTreeMap and don't call + // producePullMsgTask + { // avoid issue: pullMsg is sent out, rebalance is doing concurrently + // and this request is dropped, and then received pulled msgs. + pullRequest->setNextOffset(result.nextBeginOffset); + pullRequest->putMessage(result.msgFoundList); - m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(m_pullRequest, result.msgFoundList); + m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList); if (bProducePullRequest) - m_callbackOwner->producePullMsgTask(m_pullRequest); + m_callbackOwner->producePullMsgTask(pullRequest); else - m_pullRequest->removePullMsgEvent(); + pullRequest->removePullMsgEvent(); LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld", - (m_pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(), + (pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(), result.nextBeginOffset); } else { - LOG_INFO("remove pullmsg event of mq:%s", (m_pullRequest->m_messageQueue).toString().c_str()); - m_pullRequest->removePullMsgEvent(); + LOG_INFO("remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); + pullRequest->removePullMsgEvent(); } break; } case NO_NEW_MSG: { - m_pullRequest->setNextOffset(result.nextBeginOffset); + pullRequest->setNextOffset(result.nextBeginOffset); vector msgs; - m_pullRequest->getMessage(msgs); + pullRequest->getMessage(msgs); if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) { /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset is kept, then consumer will enter following situation: @@ -98,12 +100,12 @@ class AsyncPullCallback : public PullCallback { LOG_INFO("maybe misMatch between broker and client happens, update consumerOffset to nextBeginOffset indicated by broker");*/ - m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, result.nextBeginOffset); + m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, 
result.nextBeginOffset); } if (bProducePullRequest) - m_callbackOwner->producePullMsgTask(m_pullRequest); + m_callbackOwner->producePullMsgTask(pullRequest); else - m_pullRequest->removePullMsgEvent(); + pullRequest->removePullMsgEvent(); /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld", (m_pullRequest->m_messageQueue).toString().c_str(), @@ -111,10 +113,10 @@ class AsyncPullCallback : public PullCallback { break; } case NO_MATCHED_MSG: { - m_pullRequest->setNextOffset(result.nextBeginOffset); + pullRequest->setNextOffset(result.nextBeginOffset); vector msgs; - m_pullRequest->getMessage(msgs); + pullRequest->getMessage(msgs); if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) { /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset is kept, then consumer will enter following situation: @@ -129,23 +131,23 @@ class AsyncPullCallback : public PullCallback { LOG_INFO("maybe misMatch between broker and client happens, update consumerOffset to nextBeginOffset indicated by broker");*/ - m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, result.nextBeginOffset); + m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset); } if (bProducePullRequest) - m_callbackOwner->producePullMsgTask(m_pullRequest); + m_callbackOwner->producePullMsgTask(pullRequest); else - m_pullRequest->removePullMsgEvent(); + pullRequest->removePullMsgEvent(); /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", (m_pullRequest->m_messageQueue).toString().c_str(), result.nextBeginOffset);*/ break; } case OFFSET_ILLEGAL: { - m_pullRequest->setNextOffset(result.nextBeginOffset); + pullRequest->setNextOffset(result.nextBeginOffset); if (bProducePullRequest) - m_callbackOwner->producePullMsgTask(m_pullRequest); + m_callbackOwner->producePullMsgTask(pullRequest); else - m_pullRequest->removePullMsgEvent(); + pullRequest->removePullMsgEvent(); /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", (m_pullRequest->m_messageQueue).toString().c_str(), @@ -156,31 +158,40 @@ class AsyncPullCallback : public PullCallback { // will not returns this status, so this case // could not be entered. 
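A worked example of the broker/client mismatch handled in the NO_NEW_MSG and NO_MATCHED_MSG branches above (numbers illustrative): if the broker has cleared a queue's messages but kept brokerOffset = 800, rebalance computes a pull offset of 0, every subsequent pull returns nextBeginOffset = 800 with an empty msgFoundList, and request->getMessage(msgs) stays empty; the msgs.size() == 0 && nextBeginOffset > 0 check therefore fast-forwards the consumer offset to 800 instead of re-pulling from 0 forever.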
LOG_ERROR("impossible BROKER_TIMEOUT Occurs"); - m_pullRequest->setNextOffset(result.nextBeginOffset); + pullRequest->setNextOffset(result.nextBeginOffset); if (bProducePullRequest) - m_callbackOwner->producePullMsgTask(m_pullRequest); + m_callbackOwner->producePullMsgTask(pullRequest); else - m_pullRequest->removePullMsgEvent(); + pullRequest->removePullMsgEvent(); break; } } } virtual void onException(MQException& e) { + boost::shared_ptr pullRequest = m_pullRequest.lock(); + if (!pullRequest) { + LOG_WARN("Pull request for has been released"); + return; + } + std::string queueName = pullRequest->m_messageQueue.toString(); if (m_bShutdown == true) { - LOG_INFO("pullrequest for:%s in shutdown, return", (m_pullRequest->m_messageQueue).toString().c_str()); - m_pullRequest->removePullMsgEvent(); + LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str()); + pullRequest->removePullMsgEvent(); return; } - LOG_WARN("pullrequest for:%s occurs exception, reproduce it", (m_pullRequest->m_messageQueue).toString().c_str()); - m_callbackOwner->producePullMsgTask(m_pullRequest); + LOG_WARN("pullrequest for:%s occurs exception, reproduce it", (pullRequest->m_messageQueue).toString().c_str()); + m_callbackOwner->producePullMsgTask(pullRequest); } void setShutdownStatus() { m_bShutdown = true; } + const boost::weak_ptr& getPullRequest() const { return m_pullRequest; } + + void setPullRequest(boost::weak_ptr& pullRequest) { m_pullRequest = pullRequest; } private: DefaultMQPushConsumer* m_callbackOwner; - PullRequest* m_pullRequest; + boost::weak_ptr m_pullRequest; bool m_bShutdown; }; @@ -524,14 +535,23 @@ void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) { m_pOffsetStore->removeOffset(mq); } -void DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest* request) { +void DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t, + boost::weak_ptr pullRequest) { // LOG_INFO("trigger pullrequest for:%s", // (request->m_messageQueue).toString().c_str()); + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + return; + } producePullMsgTask(request); deleteAndZero(t); } -void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) { +bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr pullRequest) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + return false; + } if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) { request->addPullMsgEvent(); if (m_asyncPull) { @@ -542,13 +562,18 @@ void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) { } else { LOG_WARN("produce pullmsg of mq:%s failed", request->m_messageQueue.toString().c_str()); } + return true; } void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) { pTaskQueue->run(); } -void DefaultMQPushConsumer::pullMessage(PullRequest* request) { +void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + return; + } if (request == NULL) { LOG_ERROR("Pull request is NULL, return"); return; @@ -699,7 +724,12 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) { } } -AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest* request, MQMessageQueue msgQueue) { +AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(boost::weak_ptr pullRequest, + MQMessageQueue msgQueue) { + boost::shared_ptr request = pullRequest.lock(); + if 
(!request) { + return NULL; + } boost::lock_guard lock(m_asyncCallbackLock); if (m_asyncPull && request) { PullMAP::iterator it = m_PullCallback.find(msgQueue); @@ -707,7 +737,11 @@ AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest* requ LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str()); m_PullCallback[msgQueue] = new AsyncPullCallback(this, request); } - return m_PullCallback[msgQueue]; + AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue]; + if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) { + asyncPullCallback->setPullRequest(pullRequest); + } + return asyncPullCallback; } return NULL; @@ -727,7 +761,11 @@ void DefaultMQPushConsumer::shutdownAsyncPullCallBack() { } } -void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) { +void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRequest) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + return; + } if (request == NULL) { LOG_ERROR("Pull request is NULL, return"); return; @@ -868,7 +906,7 @@ ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() { getSubscriptions(result); info->setSubscriptionSet(result); - std::map requestTable = m_pRebalance->getPullRequestTable(); + std::map> requestTable = m_pRebalance->getPullRequestTable(); for (const auto& it : requestTable) { if (!it.second->isDroped()) { @@ -892,4 +930,4 @@ ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() { } // m_bPullMsgEventInprogress; }; //::iterator it = m_subscriptionData.begin(); - for (; it != m_subscriptionData.end(); ++it) + for (; it != m_subscriptionData.end(); ++it) { deleteAndZero(it->second); + } m_subscriptionData.clear(); } { - MQ2PULLREQ::iterator it = m_requestQueueTable.begin(); - for (; it != m_requestQueueTable.end(); ++it) { - delete it->second; - it->second = NULL; - } - m_requestQueueTable.clear(); + /* + MQ2PULLREQ::iterator it = m_requestQueueTable.begin(); + for (; it != m_requestQueueTable.end(); ++it) { + delete it->second; + it->second = NULL; + } + m_requestQueueTable.clear();*/ } m_topicSubscribeInfoTable.clear(); m_pConsumer = NULL; @@ -56,15 +58,21 @@ void Rebalance::doRebalance() { map::iterator it = m_subscriptionData.begin(); for (; it != m_subscriptionData.end(); ++it) { string topic = (it->first); - LOG_INFO("current topic is:%s", topic.c_str()); + LOG_DEBUG("current topic is:%s", topic.c_str()); // mqs vector mqAll; if (!getTopicSubscribeInfo(topic, mqAll)) { continue; } if (mqAll.empty()) { - if (!UtilAll::startsWith_retry(topic)) - THROW_MQEXCEPTION(MQClientException, "doRebalance the topic is empty", -1); + if (!UtilAll::startsWith_retry(topic)) { + std::string msg("#doRebalance. mqAll for topic:"); + msg.append(topic); + msg.append(" is empty"); + LOG_ERROR("Queues to allocate are empty. 
Msg: %s", msg.c_str()); + // to check, return error or throw exception + THROW_MQEXCEPTION(MQClientException, msg, -1); + } } //getSessionCredentials()); if (cidAll.empty()) { - /*remove the droping pullRequest changes for recovery consume fastly - from network broken - //drop all pullRequest - MQ2PULLREQ::iterator it = m_requestQueueTable.begin(); - for (; it != m_requestQueueTable.end(); ++it) - { - if(!(it->second->isDroped())) - { - MQMessageQueue mqtemp = it->first; - it->second->setDroped(true); - removeUnnecessaryMessageQueue(mqtemp); - it->second->clearAllMsgs();//add clear operation to - avoid bad - state when dropped pullRequest returns normal - LOG_INFO("find consumer failed, drop undropped mq:%s", - mqtemp.toString().c_str()); - } - }*/ - + LOG_ERROR("[ERROR] Get empty consumer IDs. Consumer Group: %s, Topic: %s", + m_pConsumer->getGroupName().c_str(), topic.c_str()); + // Should skip this round of re-balance immediately if consumer ID set is empty. THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is empty", -1); } // log for (int i = 0; i < (int)cidAll.size(); ++i) { - LOG_INFO("client id:%s of topic:%s", cidAll[i].c_str(), topic.c_str()); + LOG_DEBUG("client id:%s of topic:%s", cidAll[i].c_str(), topic.c_str()); } //allocate(m_pConsumer->getMQClientId(), mqAll, cidAll, allocateResult); } catch (MQException& e) { - THROW_MQEXCEPTION(MQClientException, "allocate error", -1); + std::string errMsg("Allocate message queue for ConsumerGroup["); + errMsg.append(m_pConsumer->getGroupName()); + errMsg.append("],Topic["); + errMsg.append(topic); + errMsg.append("] failed. "); + LOG_ERROR(errMsg.c_str()); + THROW_MQEXCEPTION(MQClientException, errMsg, -1); } // log for (int i = 0; i < (int)allocateResult.size(); ++i) { - LOG_INFO("allocate mq:%s", allocateResult[i].toString().c_str()); + LOG_DEBUG("allocate mq:%s", allocateResult[i].toString().c_str()); } //getGroupName() << ", Topic: " << topic + << ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n " + << "Total Queue #: " << mqAll.size() << ", Total Consumer #: " << cidAll.size() + << " Allocated Queues are: \n"; + + for (vector::size_type i = 0; i < allocateResult.size(); ++i) { + ss << allocateResult[i].toString() << "\n"; + } + // Log allocation result. 
+ LOG_INFO(ss.str().c_str()); messageQueueChanged(topic, mqAll, allocateResult); break; } @@ -173,7 +182,7 @@ void Rebalance::persistConsumerOffsetByResetOffset() { MQ2PULLREQ::iterator it = m_requestQueueTable.begin(); for (; it != m_requestQueueTable.end(); ++it) { if (it->second) { // even if it was dropped, also need update offset when - // rcv resetOffset cmd + // rcv resetOffset cmd mqs.push_back(it->first); } } @@ -225,25 +234,38 @@ bool Rebalance::getTopicSubscribeInfo(const string& topic, vector pPullRequest) { boost::lock_guard lock(m_requestTableMutex); m_requestQueueTable[mq] = pPullRequest; } -PullRequest* Rebalance::getPullRequest(MQMessageQueue mq) { +void Rebalance::removePullRequest(MQMessageQueue mq) { + boost::lock_guard lock(m_requestTableMutex); + if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) { + m_requestQueueTable.erase(mq); + } +} +bool Rebalance::isPullRequestExist(MQMessageQueue mq) { + boost::lock_guard lock(m_requestTableMutex); + if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) { + return true; + } + return false; +} +boost::weak_ptr Rebalance::getPullRequest(MQMessageQueue mq) { boost::lock_guard lock(m_requestTableMutex); if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) { return m_requestQueueTable[mq]; } - return NULL; + return boost::weak_ptr(); } -map Rebalance::getPullRequestTable() { +map> Rebalance::getPullRequestTable() { boost::lock_guard lock(m_requestTableMutex); return m_requestQueueTable; } -void Rebalance::unlockAll(bool oneway) { +void Rebalance::unlockAll(bool oneWay) { map*> brokerMqs; MQ2PULLREQ requestQueueTable = getPullRequestTable(); for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) { @@ -274,10 +296,10 @@ void Rebalance::unlockAll(bool oneway) { m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000, m_pConsumer->getSessionCredentials()); for (unsigned int i = 0; i != mqs.size(); ++i) { - PullRequest* pullreq = getPullRequest(mqs[i]); - if (pullreq) { + boost::weak_ptr pullreq = getPullRequest(mqs[i]); + if (!pullreq.expired()) { LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str()); - pullreq->setLocked(false); + pullreq.lock()->setLocked(false); } else { LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str()); } @@ -308,10 +330,10 @@ void Rebalance::unlock(MQMessageQueue mq) { m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000, m_pConsumer->getSessionCredentials()); for (unsigned int i = 0; i != mqs.size(); ++i) { - PullRequest* pullreq = getPullRequest(mqs[i]); - if (pullreq) { + boost::weak_ptr pullreq = getPullRequest(mqs[i]); + if (!pullreq.expired()) { LOG_INFO("unlock success of mq:%s", mqs[i].toString().c_str()); - pullreq->setLocked(false); + pullreq.lock()->setLocked(false); } else { LOG_ERROR("unlock fails of mq:%s", mqs[i].toString().c_str()); } @@ -355,11 +377,11 @@ void Rebalance::lockAll() { m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(), messageQueues, 1000, m_pConsumer->getSessionCredentials()); for (unsigned int i = 0; i != messageQueues.size(); ++i) { - PullRequest* pullreq = getPullRequest(messageQueues[i]); - if (pullreq) { + boost::weak_ptr pullreq = getPullRequest(messageQueues[i]); + if (!pullreq.expired()) { LOG_INFO("lockBatchMQ success of mq:%s", messageQueues[i].toString().c_str()); - pullreq->setLocked(true); 
- pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis()); + pullreq.lock()->setLocked(true); + pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis()); } else { LOG_ERROR("lockBatchMQ fails of mq:%s", messageQueues[i].toString().c_str()); } } @@ -372,6 +394,7 @@ void Rebalance::lockAll() { } brokerMqs.clear(); } + bool Rebalance::lock(MQMessageQueue mq) { unique_ptr<FindBrokerResult> pFindBrokerResult( m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true)); @@ -397,11 +420,11 @@ bool Rebalance::lock(MQMessageQueue mq) { return false; } for (unsigned int i = 0; i != messageQueues.size(); ++i) { - PullRequest* pullreq = getPullRequest(messageQueues[i]); - if (pullreq) { + boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]); + if (!pullreq.expired()) { LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str()); - pullreq->setLocked(true); - pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis()); + pullreq.lock()->setLocked(true); + pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis()); lockResult = true; } else { LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str()); @@ -437,7 +460,23 @@ RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory) : bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) { LOG_DEBUG("updateRequestTableInRebalance Enter"); + + // 1. Clear queues we are no longer in charge of + // 1. set dropped + // 2. clear local messages + // 3. clear offset + // 4. remove from request table + // 5. set flag for route changed + // 2. Check and clear dropped/invalid pull requests (timeout and so on) + // 3. Add new mqs we are now in charge of + // 1. new pull request + // 2. init next pull offset + // 3. init offset + // 4. add to request table + // 5. set flag for route changed + // 4. Start long pull for the new requests if (mqsSelf.empty()) { + // to discuss: shall we continue this round of rebalance?
LOG_WARN("allocated queue is empty for topic:%s", topic.c_str()); } @@ -445,95 +484,64 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vectorfirst; + MQ2PULLREQ::iterator itDel = requestQueueTable.begin(); + for (; itDel != requestQueueTable.end(); ++itDel) { + MQMessageQueue mqtemp = itDel->first; if (mqtemp.getTopic().compare(topic) == 0) { - if (mqsSelf.empty() || (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) { - if (!(it->second->isDroped())) { - it->second->setDroped(true); - // delete the lastest pull request for this mq, which hasn't been response - // m_pClientFactory->removeDropedPullRequestOpaque(it->second); - removeUnnecessaryMessageQueue(mqtemp); - it->second->clearAllMsgs(); // add clear operation to avoid bad state - // when dropped pullRequest returns - // normal - LOG_INFO("drop mq:%s", mqtemp.toString().c_str()); - } + if (mqsSelf.empty() || (std::find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) { + // if not response , set to dropped + itDel->second->setDroped(true); + // remove offset table to avoid offset backup + removeUnnecessaryMessageQueue(mqtemp); + itDel->second->clearAllMsgs(); + LOG_INFO("drop mq:%s", mqtemp.toString().c_str()); + removePullRequest(mqtemp); changed = true; } + if (itDel->second->isLockExpired()) { + // if pull expired , set to dropped + itDel->second->setDroped(true); + removeUnnecessaryMessageQueue(mqtemp); + itDel->second->clearAllMsgs(); + removePullRequest(mqtemp); + LOG_INFO("drop mq:%s", mqtemp.toString().c_str()); + } } } - // pullrequestAdd; - DefaultMQPushConsumer* pConsumer = static_cast(m_pConsumer); - vector::iterator it2 = mqsSelf.begin(); - for (; it2 != mqsSelf.end(); ++it2) { - PullRequest* pPullRequest(getPullRequest(*it2)); - if (pPullRequest && pPullRequest->isDroped()) { - LOG_DEBUG( - "before resume the pull handle of this pullRequest, its mq is:%s, " - "its offset is:%lld", - (it2->toString()).c_str(), pPullRequest->getNextOffset()); - pConsumer->getOffsetStore()->removeOffset(*it2); // remove dirty offset which maybe update to - // OffsetStore::m_offsetTable by consuming After last - // drop - int64 nextOffset = computePullFromWhere(*it2); - if (nextOffset >= 0) { - /* - Fix issue with following scenario: - 1. pullRequest was dropped - 2. the pullMsgEvent was not executed by taskQueue, so the PullMsgEvent - was not stop - 3. 
pullReuest was resumed by next doRebalance, then mulitple - pullMsgEvent were produced for pullRequest - */ - bool bPullMsgEvent = pPullRequest->addPullMsgEvent(); - while (!bPullMsgEvent) { - boost::this_thread::sleep_for(boost::chrono::milliseconds(50)); - LOG_INFO("pullRequest with mq :%s has unfinished pullMsgEvent", (it2->toString()).c_str()); - bPullMsgEvent = pPullRequest->addPullMsgEvent(); - } - pPullRequest->setDroped(false); - pPullRequest->clearAllMsgs(); // avoid consume accumulation and consume - // dumplication issues - pPullRequest->setNextOffset(nextOffset); - pPullRequest->updateQueueMaxOffset(nextOffset); - LOG_INFO( - "after resume the pull handle of this pullRequest, its mq is:%s, " - "its offset is:%lld", - (it2->toString()).c_str(), pPullRequest->getNextOffset()); - changed = true; - pConsumer->producePullMsgTask(pPullRequest); - } else { - LOG_ERROR("get fatel error QueryOffset of mq:%s, do not reconsume this queue", (it2->toString()).c_str()); - } } - // if mqSelf == null, it is better to break; + vector<boost::shared_ptr<PullRequest>> pullRequestsToAdd; + vector<MQMessageQueue>::iterator itAdd = mqsSelf.begin(); + for (; itAdd != mqsSelf.end(); ++itAdd) { + // PullRequest* pPullRequest(getPullRequest(*it2)); + // boost::weak_ptr<PullRequest> pPullRequest = getPullRequest(*itAdd); + // if it exists in the table, go to next + if (isPullRequestExist(*itAdd)) { + continue; } - - if (!pPullRequest) { - LOG_INFO("updateRequestTableInRebalance Doesn't find old mq"); - PullRequest* pullRequest = new PullRequest(m_pConsumer->getGroupName()); - pullRequest->m_messageQueue = *it2; - - int64 nextOffset = computePullFromWhere(*it2); - if (nextOffset >= 0) { - pullRequest->setNextOffset(nextOffset); - pullRequest->clearAllMsgs(); // avoid consume accumulation and consume - // dumplication issues - changed = true; - // pq; - addPullRequest(*it2, pullRequest); - pullrequestAdd.push_back(pullRequest); - LOG_INFO("add mq:%s, request initiall offset:%lld", (*it2).toString().c_str(), nextOffset); - } + boost::shared_ptr<PullRequest> pullRequest = boost::make_shared<PullRequest>(m_pConsumer->getGroupName()); + pullRequest->m_messageQueue = *itAdd; + int64 nextOffset = computePullFromWhere(*itAdd); + if (nextOffset >= 0) { + pullRequest->setNextOffset(nextOffset); + changed = true; + addPullRequest(*itAdd, pullRequest); + pullRequestsToAdd.push_back(pullRequest); + LOG_INFO("add mq:%s, request initial offset:%ld", (*itAdd).toString().c_str(), nextOffset); + } else { + LOG_WARN( + "Failed to add pull request for %s due to failure of querying consume offset, request initial offset:%ld", + (*itAdd).toString().c_str(), nextOffset); } } - vector<PullRequest*>::iterator it3 = pullrequestAdd.begin(); - for (; it3 != pullrequestAdd.end(); ++it3) { - LOG_DEBUG("start pull request"); - pConsumer->producePullMsgTask(*it3); + for (vector<boost::shared_ptr<PullRequest>>::iterator itAdded = pullRequestsToAdd.begin(); + itAdded != pullRequestsToAdd.end(); ++itAdded) { + LOG_DEBUG("start to pull %s", (*itAdded)->m_messageQueue.toString().c_str()); + if (!m_pConsumer->producePullMsgTask(*itAdded)) { + LOG_WARN("Failed to produce pull message task for %s", (*itAdded)->m_messageQueue.toString().c_str()); + } } LOG_DEBUG("updateRequestTableInRebalance exit"); @@ -542,7 +550,11 @@ int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) { int64 result = -1; - DefaultMQPushConsumer* pConsumer = static_cast<DefaultMQPushConsumer*>(m_pConsumer); + DefaultMQPushConsumer* pConsumer = dynamic_cast<DefaultMQPushConsumer*>(m_pConsumer); + if (!pConsumer) { + LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed"); + return result; + } ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere(); OffsetStore* pOffsetStore =
pConsumer->getOffsetStore(); switch (consumeFromWhere) { @@ -623,7 +635,12 @@ void RebalancePush::messageQueueChanged(const string& topic, vector& mqDivided) {} void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) { - DefaultMQPushConsumer* pConsumer = static_cast(m_pConsumer); + // DefaultMQPushConsumer *pConsumer = static_cast(m_pConsumer); + DefaultMQPushConsumer* pConsumer = dynamic_cast(m_pConsumer); + if (!pConsumer) { + LOG_ERROR("Cast MQConsumer* to DefaultMQPushConsumer* failed"); + return; + } OffsetStore* pOffsetStore = pConsumer->getOffsetStore(); pOffsetStore->persist(mq, m_pConsumer->getSessionCredentials()); diff --git a/src/consumer/Rebalance.h b/src/consumer/Rebalance.h index 8d8a9ae5f..fe3956917 100644 --- a/src/consumer/Rebalance.h +++ b/src/consumer/Rebalance.h @@ -24,14 +24,17 @@ #include "PullRequest.h" #include "SubscriptionData.h" +#include #include namespace rocketmq { class MQClientFactory; + //& getSubscriptionInner(); //& mqs); + bool getTopicSubscribeInfo(const string& topic, vector& mqs); - void addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest); - PullRequest* getPullRequest(MQMessageQueue mq); - map getPullRequestTable(); + void addPullRequest(MQMessageQueue mq, boost::shared_ptr pPullRequest); + void removePullRequest(MQMessageQueue mq); + bool isPullRequestExist(MQMessageQueue mq); + boost::weak_ptr getPullRequest(MQMessageQueue mq); + + map> getPullRequestTable(); + void lockAll(); + bool lock(MQMessageQueue mq); - void unlockAll(bool oneway = false); + + void unlockAll(bool oneWay = false); + void unlock(MQMessageQueue mq); protected: @@ -71,7 +86,7 @@ class Rebalance { boost::mutex m_topicSubscribeInfoTableMutex; map> m_topicSubscribeInfoTable; - typedef map MQ2PULLREQ; + typedef map> MQ2PULLREQ; MQ2PULLREQ m_requestQueueTable; boost::mutex m_requestTableMutex; @@ -84,6 +99,7 @@ class Rebalance { class RebalancePull : public Rebalance { public: RebalancePull(MQConsumer*, MQClientFactory*); + virtual ~RebalancePull(){}; virtual void messageQueueChanged(const string& topic, @@ -101,6 +117,7 @@ class RebalancePull : public Rebalance { class RebalancePush : public Rebalance { public: RebalancePush(MQConsumer*, MQClientFactory*); + virtual ~RebalancePush(){}; virtual void messageQueueChanged(const string& topic, @@ -115,6 +132,6 @@ class RebalancePush : public Rebalance { }; // #include #include -#include "UtilAll.h" #include "Logging.h" +#include "UtilAll.h" namespace rocketmq { // m_codeSet; }; // Date: Mon, 9 Dec 2019 18:50:00 +0800 Subject: [PATCH 2/5] refactor(rebalance): use smart_ptr to manage pull request --- include/DefaultMQPushConsumer.h | 2 + src/MQClientFactory.cpp | 4 +- .../ConsumeMessageConcurrentlyService.cpp | 11 +- src/consumer/ConsumeMessageOrderlyService.cpp | 2 +- src/consumer/DefaultMQPushConsumer.cpp | 368 +++++++++++------- src/consumer/PullRequest.cpp | 67 +++- src/consumer/PullRequest.h | 14 +- src/consumer/Rebalance.cpp | 31 +- 8 files changed, 311 insertions(+), 188 deletions(-) diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 5c5f4cad7..aebb7d4ab 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -87,6 +87,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { ConsumeMsgService* getConsumerMsgService() const; virtual bool producePullMsgTask(boost::weak_ptr); + virtual bool producePullMsgTaskLater(boost::weak_ptr, int millis); + static void static_triggerNextPullRequest(void* context, 
boost::asio::deadline_timer* t, boost::weak_ptr); void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr); void runPullMsgQueue(TaskQueue* pTaskQueue); void pullMessage(boost::weak_ptr pullrequest); // sync pullMsg diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index 1c00f8039..b26cb0a7e 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -1044,8 +1044,8 @@ void MQClientFactory::resetOffset(const string& group, boost::shared_ptr pullreq = pullRequest.lock(); // PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq); if (pullreq) { - pullreq->setDroped(true); - LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data()); + pullreq->setDropped(true); + LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data()); pullreq->clearAllMsgs(); pullreq->updateQueueMaxOffset(it->second); } else { diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp index fcca3f264..eb0575f75 100644 --- a/src/consumer/ConsumeMessageConcurrentlyService.cpp +++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -67,7 +67,7 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptrisDroped()) { + if (request->isDropped()) { LOG_INFO("Pull request for %s is dropped, which will be released in next re-balance.", request->m_messageQueue.toString().c_str()); return; @@ -82,14 +82,13 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptrisDroped()) { - LOG_WARN("the pull result is NULL or Had been dropped"); + if (!request || request->isDropped()) { + LOG_WARN("the pull request had been dropped"); request->clearAllMsgs(); // add clear operation to avoid bad state when // dropped pullRequest returns normal return; } - //m_messageQueue).toString().c_str()); return; @@ -140,12 +139,10 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptrremoveMessage(msgs); - // LOG_DEBUG("update offset:%lld of mq: %s", - // offset,(request->m_messageQueue).toString().c_str()); if (offset >= 0) { m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset); } else { - LOG_WARN("Note: accumulation consume occurs on mq:%s", (request->m_messageQueue).toString().c_str()); + LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. 
skip..", (request->m_messageQueue).toString().c_str()); } } diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp index 1755a1396..f68fe44f1 100644 --- a/src/consumer/ConsumeMessageOrderlyService.cpp +++ b/src/consumer/ConsumeMessageOrderlyService.cpp @@ -156,7 +156,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr p // request->m_messageQueue.toString().c_str()); return; } - if (!request || request->isDroped()) { + if (!request || request->isDropped()) { LOG_WARN("the pull result is NULL or Had been dropped"); request->clearAllMsgs(); // add clear operation to avoid bad state when // dropped pullRequest returns normal diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index de2d68b95..2231d833c 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -48,121 +48,118 @@ class AsyncPullCallback : public PullCallback { LOG_WARN("Pull request for[%s] has been released", mq.toString().c_str()); return; } + if (m_bShutdown == true) { LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str()); - pullRequest->removePullMsgEvent(); return; } - + if (pullRequest->isDropped()) { + LOG_INFO("Pull request for queue[%s] has been set as dropped. Will NOT pull this queue any more", + pullRequest->m_messageQueue.toString().c_str()); + return; + } + if (!pullRequest->removePullMsgEvent()) { + LOG_WARN("Mark unflight for:%s failed, May be it is Dropped by #Rebalance.", + pullRequest->m_messageQueue.toString().c_str()); + return; + } switch (result.pullStatus) { case FOUND: { - if (!pullRequest->isDroped()) // if request is setted to dropped, - // don't add msgFoundList to - // m_msgTreeMap and don't call - // producePullMsgTask - { // avoid issue: pullMsg is sent out, rebalance is doing concurrently - // and this request is dropped, and then received pulled msgs. 
- pullRequest->setNextOffset(result.nextBeginOffset); - pullRequest->putMessage(result.msgFoundList); - - m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList); - - if (bProducePullRequest) - m_callbackOwner->producePullMsgTask(pullRequest); - else - pullRequest->removePullMsgEvent(); - - LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld", - (pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(), - result.nextBeginOffset); + if (pullRequest->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); + break; + } + pullRequest->setNextOffset(result.nextBeginOffset); + pullRequest->putMessage(result.msgFoundList); + + m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList); + + if (bProducePullRequest) { + m_callbackOwner->producePullMsgTask(pullRequest); } else { - LOG_INFO("remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); - pullRequest->removePullMsgEvent(); + LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s", + (pullRequest->m_messageQueue).toString().c_str()); } + + LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld", + (pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(), result.nextBeginOffset); + break; } case NO_NEW_MSG: { + if (pullRequest->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); + break; + } pullRequest->setNextOffset(result.nextBeginOffset); vector msgs; pullRequest->getMessage(msgs); if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) { - /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset - is kept, then consumer will enter following situation: - 1>. get pull offset with 0 when do rebalance, and set - m_offsetTable[mq] to 0; - 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin - offset increase by 800 - 3>. request->getMessage(msgs) always NULL - 4>. we need update consumerOffset to nextBeginOffset indicated by - broker - but if really no new msg could be pulled, also go to this CASE - - LOG_INFO("maybe misMatch between broker and client happens, update - consumerOffset to nextBeginOffset indicated by broker");*/ m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset); } - if (bProducePullRequest) + if (bProducePullRequest) { m_callbackOwner->producePullMsgTask(pullRequest); - else - pullRequest->removePullMsgEvent(); - - /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld", - (m_pullRequest->m_messageQueue).toString().c_str(), - result.nextBeginOffset);*/ + } else { + LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s", + (pullRequest->m_messageQueue).toString().c_str()); + } + LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(), + result.nextBeginOffset); break; } case NO_MATCHED_MSG: { + if (pullRequest->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); + break; + } pullRequest->setNextOffset(result.nextBeginOffset); vector msgs; pullRequest->getMessage(msgs); if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) { - /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset - is kept, then consumer will enter following situation: - 1>. get pull offset with 0 when do rebalance, and set - m_offsetTable[mq] to 0; - 2>. 
NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin - offset increase by 800 - 3>. request->getMessage(msgs) always NULL - 4>. we need update consumerOffset to nextBeginOffset indicated by - broker - but if really no new msg could be pulled, also go to this CASE - - LOG_INFO("maybe misMatch between broker and client happens, update - consumerOffset to nextBeginOffset indicated by broker");*/ m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset); } - if (bProducePullRequest) + if (bProducePullRequest) { m_callbackOwner->producePullMsgTask(pullRequest); - else - pullRequest->removePullMsgEvent(); - /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", - (m_pullRequest->m_messageQueue).toString().c_str(), - result.nextBeginOffset);*/ + } else { + LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s", + (pullRequest->m_messageQueue).toString().c_str()); + } + LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(), + result.nextBeginOffset); break; } case OFFSET_ILLEGAL: { + if (pullRequest->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); + break; + } pullRequest->setNextOffset(result.nextBeginOffset); - if (bProducePullRequest) + if (bProducePullRequest) { m_callbackOwner->producePullMsgTask(pullRequest); - else - pullRequest->removePullMsgEvent(); + } else { + LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s", + (pullRequest->m_messageQueue).toString().c_str()); + } - /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", - (m_pullRequest->m_messageQueue).toString().c_str(), - result.nextBeginOffset);*/ + LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(), + result.nextBeginOffset); break; } - case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker - // will not returns this status, so this case - // could not be entered. 
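Patch 2 turns addPullMsgEvent()/removePullMsgEvent() into boolean swaps of an in-flight flag and uses them as guards above; the PullRequest.cpp hunk that implements them is truncated below, so the following is only a sketch of such a guard under that assumption, with illustrative names rather than the patch's code:

#include <boost/atomic.hpp>
#include <iostream>

class PullInFlightFlag {
 public:
  PullInFlightFlag() : m_inFlight(false) {}
  // mark a pull as in flight; fails if one is already outstanding
  bool addPullMsgEvent() { return !m_inFlight.exchange(true); }
  // clear the flag; fails if it was already cleared (e.g. by rebalance)
  bool removePullMsgEvent() { return m_inFlight.exchange(false); }

 private:
  boost::atomic<bool> m_inFlight;
};

int main() {
  PullInFlightFlag flag;
  std::cout << flag.addPullMsgEvent() << "\n";     // 1: pull dispatched
  std::cout << flag.addPullMsgEvent() << "\n";     // 0: duplicate refused
  std::cout << flag.removePullMsgEvent() << "\n";  // 1: response accepted
  std::cout << flag.removePullMsgEvent() << "\n";  // 0: already cleared
  return 0;
}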
+ case BROKER_TIMEOUT: { + if (pullRequest->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str()); + break; + } LOG_ERROR("impossible BROKER_TIMEOUT Occurs"); pullRequest->setNextOffset(result.nextBeginOffset); - if (bProducePullRequest) + if (bProducePullRequest) { m_callbackOwner->producePullMsgTask(pullRequest); - else - pullRequest->removePullMsgEvent(); + } else { + LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s", + (pullRequest->m_messageQueue).toString().c_str()); + } break; } } @@ -171,20 +168,29 @@ class AsyncPullCallback : public PullCallback { virtual void onException(MQException& e) { boost::shared_ptr pullRequest = m_pullRequest.lock(); if (!pullRequest) { - LOG_WARN("Pull request for has been released"); + LOG_WARN("Pull request has been released."); + return; + } + if (!pullRequest->removePullMsgEvent()) { + LOG_WARN("Mark unflight for:%s failed when exception, May be it is Dropped by #Rebalance.", + pullRequest->m_messageQueue.toString().c_str()); return; } std::string queueName = pullRequest->m_messageQueue.toString(); if (m_bShutdown == true) { - LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str()); - pullRequest->removePullMsgEvent(); + LOG_INFO("pullrequest for:%s in shutdown, return", queueName.c_str()); return; } - LOG_WARN("pullrequest for:%s occurs exception, reproduce it", (pullRequest->m_messageQueue).toString().c_str()); - m_callbackOwner->producePullMsgTask(pullRequest); + if (pullRequest->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", queueName.c_str()); + return; + } + LOG_WARN("Pullrequest for:%s occurs exception, reproduce it after 1s.", queueName.c_str()); + m_callbackOwner->producePullMsgTaskLater(pullRequest, 1000); } void setShutdownStatus() { m_bShutdown = true; } + const boost::weak_ptr& getPullRequest() const { return m_pullRequest; } void setPullRequest(boost::weak_ptr& pullRequest) { m_pullRequest = pullRequest; } @@ -535,16 +541,41 @@ void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) { m_pOffsetStore->removeOffset(mq); } +void DefaultMQPushConsumer::static_triggerNextPullRequest(void* context, + boost::asio::deadline_timer* t, + boost::weak_ptr pullRequest) { + DefaultMQPushConsumer* pDefaultMQPushConsumer = (DefaultMQPushConsumer*)context; + if (pDefaultMQPushConsumer) { + pDefaultMQPushConsumer->triggerNextPullRequest(t, pullRequest); + } +} + void DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr pullRequest) { - // LOG_INFO("trigger pullrequest for:%s", - // (request->m_messageQueue).toString().c_str()); + // delete first to avoild memleak + deleteAndZero(t); boost::shared_ptr request = pullRequest.lock(); if (!request) { return; } producePullMsgTask(request); - deleteAndZero(t); +} + +bool DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr pullRequest, int millis) { + boost::shared_ptr request = pullRequest.lock(); + if (!request) { + LOG_INFO("Pull request is invalid. 
Maybe it is dropped before."); + return false; + } + if (request->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", request->m_messageQueue.toString().c_str()); + return false; + } + boost::asio::deadline_timer* t = + new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis)); + t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest), this, t, request)); + LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(), millis); + return true; } bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr pullRequest) { @@ -552,15 +583,25 @@ bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr pull if (!request) { return false; } + if (request->isDropped()) { + LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", request->m_messageQueue.toString().c_str()); + return false; + } if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) { - request->addPullMsgEvent(); - if (m_asyncPull) { - m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync, this, request)); + if (request->addPullMsgEvent()) { + if (m_asyncPull) { + m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync, this, request)); + + } else { + m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request)); + } } else { - m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request)); + LOG_WARN("Failed to mark in-flight pull request for %s", request->m_messageQueue.toString().c_str()); + return false; } } else { - LOG_WARN("produce pullmsg of mq:%s failed", request->m_messageQueue.toString().c_str()); + LOG_WARN("produce PullRequest of mq:%s failed", request->m_messageQueue.toString().c_str()); + return false; } return true; } @@ -572,15 +613,12 @@ void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) { void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest) { boost::shared_ptr request = pullRequest.lock(); if (!request) { + LOG_ERROR("Pull request is released, return"); return; } - if (request == NULL) { - LOG_ERROR("Pull request is NULL, return"); - return; - } - if (request->isDroped()) { + if (request->isDropped()) { LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str()); - request->removePullMsgEvent(); + // request->removePullMsgEvent(); return; } @@ -588,19 +626,20 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { - producePullMsgTask(request); + producePullMsgTaskLater(request, 1000); return; } } } if (request->getCacheMsgCount() > m_maxMsgCacheSize) { - // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger - // than:%d", (request->m_messageQueue).toString().c_str(), - // request->getCacheMsgCount(), m_maxMsgCacheSize); - boost::asio::deadline_timer* t = - new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(1 * 1000)); - t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, this, t, request)); + LOG_INFO("Sync Pull request for [%s] has Cached with %d Messages and The Max size is %d, Sleep 1s.", + (request->m_messageQueue).toString().c_str(), request->getCacheMsgCount(), m_maxMsgCacheSize); + 
request->setLastPullTimestamp(UtilAll::currentTimeMillis()); + // This process is on flight, should be remove event before producer task again. + request->removePullMsgEvent(); + // Retry 1s, + producePullMsgTaskLater(request, 1000); return; } @@ -616,7 +655,10 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest string subExpression; SubscriptionData* pSdata = m_pRebalance->getSubscriptionData(messageQueue.getTopic()); if (pSdata == NULL) { - producePullMsgTask(request); + LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.", + (request->m_messageQueue).toString().c_str()); + request->removePullMsgEvent(); + producePullMsgTaskLater(request, 1000); return; } subExpression = pSdata->getSubString(); @@ -625,7 +667,10 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest false, // suspend !subExpression.empty(), // subscription false); // class filter - + if (request->isDropped()) { + LOG_WARN("Pull request is set as dropped with mq:%s, return", (request->m_messageQueue).toString().c_str()); + return; + } try { request->setLastPullTimestamp(UtilAll::currentTimeMillis()); unique_ptr result(m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1 @@ -641,45 +686,39 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest NULL, getSessionCredentials())); PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata); - + if (!request->removePullMsgEvent()) { + LOG_WARN("Failed to swap pullMsgEvent[%s] flag", messageQueue.toString().c_str()); + return; + } switch (pullResult.pullStatus) { case FOUND: { - if (!request->isDroped()) // if request is setted to dropped, don't add - // msgFoundList to m_msgTreeMap and don't - // call producePullMsgTask - { // avoid issue: pullMsg is sent out, rebalance is doing concurrently - // and this request is dropped, and then received pulled msgs. - request->setNextOffset(pullResult.nextBeginOffset); - request->putMessage(pullResult.msgFoundList); - - m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList); - producePullMsgTask(request); - - LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", messageQueue.toString().c_str(), - pullResult.msgFoundList.size(), pullResult.nextBeginOffset); - } else { - request->removePullMsgEvent(); + if (request->isDropped()) { + LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s", + messageQueue.toString().c_str()); + break; } + // and this request is dropped, and then received pulled msgs. + request->setNextOffset(pullResult.nextBeginOffset); + request->putMessage(pullResult.msgFoundList); + + m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList); + producePullMsgTask(request); + + LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", messageQueue.toString().c_str(), + pullResult.msgFoundList.size(), pullResult.nextBeginOffset); + break; } case NO_NEW_MSG: { + if (request->isDropped()) { + LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s", + messageQueue.toString().c_str()); + break; + } request->setNextOffset(pullResult.nextBeginOffset); vector msgs; request->getMessage(msgs); if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) { - /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset - is kept, then consumer will enter following situation: - 1>. get pull offset with 0 when do rebalance, and set - m_offsetTable[mq] to 0; - 2>. 
NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin - offset increase by 800 - 3>. request->getMessage(msgs) always NULL - 4>. we need update consumerOffset to nextBeginOffset indicated by - broker - but if really no new msg could be pulled, also go to this CASE - */ - // LOG_DEBUG("maybe misMatch between broker and client happens, update - // consumerOffset to nextBeginOffset indicated by broker"); updateConsumeOffset(messageQueue, pullResult.nextBeginOffset); } producePullMsgTask(request); @@ -687,12 +726,15 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest break; } case NO_MATCHED_MSG: { + if (request->isDropped()) { + LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s", + messageQueue.toString().c_str()); + break; + } request->setNextOffset(pullResult.nextBeginOffset); vector msgs; request->getMessage(msgs); if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) { - // LOG_DEBUG("maybe misMatch between broker and client happens, update - // consumerOffset to nextBeginOffset indicated by broker"); updateConsumeOffset(messageQueue, pullResult.nextBeginOffset); } producePullMsgTask(request); @@ -702,6 +744,11 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest break; } case OFFSET_ILLEGAL: { + if (request->isDropped()) { + LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s", + messageQueue.toString().c_str()); + break; + } request->setNextOffset(pullResult.nextBeginOffset); producePullMsgTask(request); @@ -720,7 +767,11 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest } } catch (MQException& e) { LOG_ERROR(e.what()); - producePullMsgTask(request); + if (request->removePullMsgEvent()) { + producePullMsgTaskLater(request, 1000); + } else { + LOG_WARN("Failed to swap pullMsgEvent[%s] flag", messageQueue.toString().c_str()); + } } } @@ -764,15 +815,12 @@ void DefaultMQPushConsumer::shutdownAsyncPullCallBack() { void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRequest) { boost::shared_ptr request = pullRequest.lock(); if (!request) { + LOG_ERROR("Pull request is released, return"); return; } - if (request == NULL) { - LOG_ERROR("Pull request is NULL, return"); - return; - } - if (request->isDroped()) { + if (request->isDropped()) { LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str()); - request->removePullMsgEvent(); + // request->removePullMsgEvent(); return; } @@ -780,19 +828,23 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { - producePullMsgTask(request); + // This process is on flight, should be remove event before producer task again. + request->removePullMsgEvent(); + // Retry later. 
+ producePullMsgTaskLater(request, 1000); return; } } } if (request->getCacheMsgCount() > m_maxMsgCacheSize) { - // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger - // than:%d", (request->m_messageQueue).toString().c_str(), - // request->getCacheMsgCount(), m_maxMsgCacheSize); - boost::asio::deadline_timer* t = - new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(1 * 1000)); - t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, this, t, request)); + LOG_INFO("Pull request for [%s] has Cached with %d Messages and The Max size is %d, Sleep 3s.", + (request->m_messageQueue).toString().c_str(), request->getCacheMsgCount(), m_maxMsgCacheSize); + request->setLastPullTimestamp(UtilAll::currentTimeMillis()); + // This process is on flight, should be remove event before producer task again. + request->removePullMsgEvent(); + // Retry 3s, + producePullMsgTaskLater(request, 3000); return; } @@ -808,7 +860,12 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe string subExpression; SubscriptionData* pSdata = (m_pRebalance->getSubscriptionData(messageQueue.getTopic())); if (pSdata == NULL) { - producePullMsgTask(request); + LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.", + (request->m_messageQueue).toString().c_str()); + // This process is on flight, should be remove event before producer task again. + request->removePullMsgEvent(); + // Subscribe data error, retry later. + producePullMsgTaskLater(request, 1000); return; } subExpression = pSdata->getSubString(); @@ -822,6 +879,10 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe arg.mq = messageQueue; arg.subData = *pSdata; arg.pPullWrapper = m_pPullAPIWrapper; + if (request->isDropped()) { + LOG_WARN("Pull request is set as dropped with mq:%s, return", request->m_messageQueue.toString().c_str()); + return; + } try { request->setLastPullTimestamp(UtilAll::currentTimeMillis()); m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1 @@ -839,7 +900,12 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe &arg); // 13 } catch (MQException& e) { LOG_ERROR(e.what()); - producePullMsgTask(request); + if (request->isDropped()) { + LOG_WARN("Pull request is set as dropped with mq:%s, return", (request->m_messageQueue).toString().c_str()); + return; + } + request->removePullMsgEvent(); + producePullMsgTaskLater(request, 1000); } } @@ -909,7 +975,7 @@ ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() { std::map> requestTable = m_pRebalance->getPullRequestTable(); for (const auto& it : requestTable) { - if (!it.second->isDroped()) { + if (!it.second->isDropped()) { MessageQueue queue((it.first).getTopic(), (it.first).getBrokerName(), (it.first).getQueueId()); ProcessQueueInfo processQueue; processQueue.cachedMsgMinOffset = it.second->getCacheMinOffset(); @@ -917,7 +983,7 @@ ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() { processQueue.cachedMsgCount = it.second->getCacheMsgCount(); processQueue.setCommitOffset( m_pOffsetStore->readOffset(it.first, MEMORY_FIRST_THEN_STORE, getSessionCredentials())); - processQueue.setDroped(it.second->isDroped()); + processQueue.setDroped(it.second->isDropped()); processQueue.setLocked(it.second->isLocked()); processQueue.lastLockTimestamp = it.second->getLastLockTimestamp(); processQueue.lastPullTimestamp = it.second->getLastPullTimestamp(); diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp index 
335f7d4aa..84bd8e0cd 100644 --- a/src/consumer/PullRequest.cpp +++ b/src/consumer/PullRequest.cpp @@ -21,14 +21,23 @@ namespace rocketmq { //& msgs) { void PullRequest::clearAllMsgs() { boost::lock_guard lock(m_pullRequestLock); - if (isDroped()) { + if (isDropped()) { LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped."); m_msgTreeMap.clear(); m_msgTreeMapTemp.clear(); @@ -143,9 +154,9 @@ void PullRequest::updateQueueMaxOffset(int64 queueOffset) { m_queueOffsetMax = queueOffset; } -void PullRequest::setDroped(bool droped) { - int temp = (droped == true ? 1 : 0); - m_bDroped.store(temp); +void PullRequest::setDropped(bool dropped) { + int temp = (dropped == true ? 1 : 0); + m_bDropped.store(temp); /* m_queueOffsetMax = 0; m_nextOffset = 0; @@ -160,8 +171,8 @@ void PullRequest::setDroped(bool droped) { */ } -bool PullRequest::isDroped() const { - return m_bDroped.load() == 1; +bool PullRequest::isDropped() const { + return m_bDropped.load() == 1; } int64 PullRequest::getNextOffset() { @@ -173,6 +184,7 @@ void PullRequest::setLocked(bool Locked) { int temp = (Locked == true ? 1 : 0); m_bLocked.store(temp); } + bool PullRequest::isLocked() const { return m_bLocked.load() == 1; } @@ -197,6 +209,16 @@ uint64 PullRequest::getLastPullTimestamp() const { return m_lastPullTimestamp; } +bool PullRequest::isPullRequestExpired() const { + uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME; + if (interval <= UtilAll::currentTimeMillis()) { + LOG_WARN("PullRequest for [%s] has been expired %lld ms,m_lastPullTimestamp = %lld ms", + m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - interval, m_lastPullTimestamp); + return true; + } + return false; +} + void PullRequest::setLastConsumeTimestamp(uint64 time) { m_lastConsumeTimestamp = time; } @@ -257,18 +279,33 @@ int64 PullRequest::commit() { } } -void PullRequest::removePullMsgEvent() { - m_bPullMsgEventInprogress = false; +bool PullRequest::removePullMsgEvent(bool force) { + // m_bPullMsgEventInprogress = false; + if (force) { + m_bPullMsgEventInprogress.store(false, boost::memory_order_relaxed); + return true; + } + + bool expected = true; + if (m_bPullMsgEventInprogress.compare_exchange_strong(expected, false, boost::memory_order_relaxed)) { + LOG_DEBUG("Un-mark in-flight pull request for %s", m_messageQueue.toString().c_str()); + return true; + } + LOG_WARN("Failed to un-mark in-flight pull request for %s", m_messageQueue.toString().c_str()); + return false; } bool PullRequest::addPullMsgEvent() { - if (m_bPullMsgEventInprogress == false) { - m_bPullMsgEventInprogress = true; - LOG_INFO("pullRequest with mq :%s set pullMsgEvent", m_messageQueue.toString().c_str()); + bool expected = false; + if (m_bPullMsgEventInprogress.compare_exchange_strong(expected, true, boost::memory_order_relaxed)) { + LOG_DEBUG("Mark in-flight pull request for %s", m_messageQueue.toString().c_str()); return true; } + LOG_WARN("Failed to mark in-flight pull request for %s", m_messageQueue.toString().c_str()); return false; } - +bool PullRequest::hasInFlightPullRequest() const { + return m_bPullMsgEventInprogress.load(boost::memory_order_relaxed); +} //& msgs); boost::timed_mutex& getPullRequestCriticalSection(); - void removePullMsgEvent(); + bool removePullMsgEvent(bool force = false); bool addPullMsgEvent(); + /** + * Check if there is an in-flight pull request. 
+ */ + bool hasInFlightPullRequest() const; public: MQMessageQueue m_messageQueue; static const uint64 RebalanceLockInterval; // ms static const uint64 RebalanceLockMaxLiveTime; // ms + static const uint64 MAX_PULL_IDLE_TIME; // ms private: string m_groupname; int64 m_nextOffset; int64 m_queueOffsetMax; - boost::atomic m_bDroped; + boost::atomic m_bDropped; boost::atomic m_bLocked; map m_msgTreeMap; map m_msgTreeMapTemp; diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp index 73f33957f..b7464e462 100644 --- a/src/consumer/Rebalance.cpp +++ b/src/consumer/Rebalance.cpp @@ -157,7 +157,7 @@ void Rebalance::persistConsumerOffset() { boost::lock_guard lock(m_requestTableMutex); MQ2PULLREQ::iterator it = m_requestQueueTable.begin(); for (; it != m_requestQueueTable.end(); ++it) { - if (it->second && (!it->second->isDroped())) { + if (it->second && (!it->second->isDropped())) { mqs.push_back(it->first); } } @@ -269,7 +269,7 @@ void Rebalance::unlockAll(bool oneWay) { map*> brokerMqs; MQ2PULLREQ requestQueueTable = getPullRequestTable(); for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) { - if (!(it->second->isDroped())) { + if (!(it->second->isDropped())) { if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) { vector* mqs = new vector; brokerMqs[it->first.getBrokerName()] = mqs; @@ -347,7 +347,7 @@ void Rebalance::lockAll() { map*> brokerMqs; MQ2PULLREQ requestQueueTable = getPullRequestTable(); for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) { - if (!(it->second->isDroped())) { + if (!(it->second->isDropped())) { string brokerKey = it->first.getBrokerName() + it->first.getTopic(); if (brokerMqs.find(brokerKey) == brokerMqs.end()) { vector* mqs = new vector; @@ -490,21 +490,36 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vectorsecond->setDroped(true); + + LOG_INFO("Drop mq:%s,because not responsive", mqtemp.toString().c_str()); + itDel->second->setDropped(true); // remove offset table to avoid offset backup removeUnnecessaryMessageQueue(mqtemp); itDel->second->clearAllMsgs(); - LOG_INFO("drop mq:%s", mqtemp.toString().c_str()); + + if (itDel->second->hasInFlightPullRequest()) { + LOG_WARN("Unconditionally remove in-flight pull request mark for queue: %s since it is being dropped", + mqtemp.toString().c_str()); + itDel->second->removePullMsgEvent(true); + } removePullRequest(mqtemp); changed = true; } - if (itDel->second->isLockExpired()) { + if (itDel->second->isPullRequestExpired()) { // if pull expired , set to dropped - itDel->second->setDroped(true); + + LOG_INFO("Drop mq:%s according Pull timeout.", mqtemp.toString().c_str()); + itDel->second->setDropped(true); removeUnnecessaryMessageQueue(mqtemp); itDel->second->clearAllMsgs(); + + if (itDel->second->hasInFlightPullRequest()) { + LOG_WARN("Unconditionally remove in-flight pull request mark for queue: %s since it is being dropped", + mqtemp.toString().c_str()); + itDel->second->removePullMsgEvent(true); + } removePullRequest(mqtemp); - LOG_INFO("drop mq:%s", mqtemp.toString().c_str()); + changed = true; } } } From a58b1fe35a1c6734853a950d255d9bea85fc26c4 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 10 Dec 2019 11:26:34 +0800 Subject: [PATCH 3/5] refactor(rebalance): use smart_ptr to manage pull request --- include/DefaultMQPullConsumer.h | 13 +++---------- include/DefaultMQPushConsumer.h | 4 +++- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git 
a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h index 006c42a1c..af0194148 100644 --- a/include/DefaultMQPullConsumer.h +++ b/include/DefaultMQPullConsumer.h @@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer { * @param subExpression * set filter expression for pulled msg, broker will filter msg actively * Now only OR operation is supported, eg: "tag1 || tag2 || tag3" - * if subExpression is setted to "null" or "*"��all msg will be subscribed + * if subExpression is set to "null" or "*", all msg will be subscribed * @param offset * specify the started pull offset * @param maxNums @@ -90,7 +90,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer { * @param subExpression * set filter expression for pulled msg, broker will filter msg actively * Now only OR operation is supported, eg: "tag1 || tag2 || tag3" - * if subExpression is setted to "null" or "*"��all msg will be subscribed + * if subExpression is set to "null" or "*", all msg will be subscribed * @param offset * specify the started pull offset * @param maxNums @@ -107,20 +107,13 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer { virtual ConsumerRunningInfo* getConsumerRunningInfo() { return NULL; } /** - * ��ȡ���ѽ��ȣ�����-1��ʾ���� * * @param mq * @param fromStore * @return */ int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore); - /** - * ����topic��ȡMessageQueue���Ծ��ⷽʽ�����ڶ����Ա֮����� - * - * @param topic - * ��ϢTopic - * @return ���ض��м��� - */ + void fetchMessageQueuesInBalance(const std::string& topic, std::vector mqs); // temp persist consumer offset interface, only valid with diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index aebb7d4ab..b6de08505 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -88,7 +88,9 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { virtual bool producePullMsgTask(boost::weak_ptr); virtual bool producePullMsgTaskLater(boost::weak_ptr, int millis); - static void static_triggerNextPullRequest(void* context, boost::asio::deadline_timer* t, boost::weak_ptr); + static void static_triggerNextPullRequest(void* context, + boost::asio::deadline_timer* t, + boost::weak_ptr); void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr); void runPullMsgQueue(TaskQueue* pTaskQueue); void pullMessage(boost::weak_ptr pullrequest); // sync pullMsg From 8715e0e11b0e179b6688df7dbee1b1d98ccdd288 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 10 Dec 2019 11:31:23 +0800 Subject: [PATCH 4/5] refactor(rebalance): use smart_ptr to manage pull request --- src/MQClientFactory.cpp | 2 +- src/consumer/ConsumeMessageConcurrentlyService.cpp | 3 ++- src/consumer/PullRequest.cpp | 6 +++--- src/consumer/Rebalance.cpp | 4 ++-- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index b26cb0a7e..03d4640b3 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -1044,7 +1044,7 @@ void MQClientFactory::resetOffset(const string& group, boost::shared_ptr pullreq = pullRequest.lock(); // PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq); if (pullreq) { - pullreq->setDropped(true); + pullreq->setDropped(true); LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data()); pullreq->clearAllMsgs(); pullreq->updateQueueMaxOffset(it->second); diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp 
b/src/consumer/ConsumeMessageConcurrentlyService.cpp index eb0575f75..e5df16e77 100644 --- a/src/consumer/ConsumeMessageConcurrentlyService.cpp +++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -142,7 +142,8 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr= 0) { m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset); } else { - LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..", (request->m_messageQueue).toString().c_str()); + LOG_WARN("Note: Get local offset for mq:%s failed, maybe it was updated before, skip..", + (request->m_messageQueue).toString().c_str()); } } diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp index 84bd8e0cd..ba887be55 100644 --- a/src/consumer/PullRequest.cpp +++ b/src/consumer/PullRequest.cpp @@ -34,9 +34,9 @@ PullRequest::PullRequest(const string& groupname) m_bDropped(false), m_bLocked(false), m_bPullMsgEventInprogress(false) { - m_lastLockTimestamp = UtilAll::currentTimeMillis(); - m_lastPullTimestamp = UtilAll::currentTimeMillis(); - m_lastConsumeTimestamp = UtilAll::currentTimeMillis(); + m_lastLockTimestamp = UtilAll::currentTimeMillis(); + m_lastPullTimestamp = UtilAll::currentTimeMillis(); + m_lastConsumeTimestamp = UtilAll::currentTimeMillis(); } PullRequest::~PullRequest() { diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp index b7464e462..4e90cd022 100644 --- a/src/consumer/Rebalance.cpp +++ b/src/consumer/Rebalance.cpp @@ -492,7 +492,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vectorsecond->setDropped(true); + itDel->second->setDropped(true); // remove offset table to avoid offset backup removeUnnecessaryMessageQueue(mqtemp); itDel->second->clearAllMsgs(); @@ -509,7 +509,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vectorsecond->setDropped(true); + itDel->second->setDropped(true); removeUnnecessaryMessageQueue(mqtemp); itDel->second->clearAllMsgs(); From e4479bce4a8b25041564311053a6ed2c0b5af92c Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Wed, 11 Dec 2019 14:26:23 +0800 Subject: [PATCH 5/5] refactor(rebalance): use smart_ptr to manage pull request --- src/consumer/DefaultMQPushConsumer.cpp | 94 ++++++++++---------------- src/consumer/OffsetStore.cpp | 2 +- src/consumer/PullRequest.cpp | 35 +--------- src/consumer/PullRequest.h | 1 - src/consumer/Rebalance.cpp | 48 +++++-------- 5 files changed, 56 insertions(+), 124 deletions(-) diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index 2231d833c..0cd542705 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -23,8 +23,6 @@ #include "Logging.h" #include "MQClientAPIImpl.h" #include "MQClientFactory.h" -#include "MQClientManager.h" -#include "MQProtos.h" #include "OffsetStore.h" #include "PullAPIWrapper.h" #include "PullSysFlag.h" @@ -49,7 +47,7 @@ class AsyncPullCallback : public PullCallback { return; } - if (m_bShutdown == true) { + if (m_bShutdown) { LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str()); return; } @@ -58,11 +56,6 @@ pullRequest->m_messageQueue.toString().c_str()); return; } - if (!pullRequest->removePullMsgEvent()) { - LOG_WARN("Mark unflight for:%s failed, May be it is Dropped by #Rebalance.", - pullRequest->m_messageQueue.toString().c_str()); - return; - } switch (result.pullStatus) { case FOUND: { if 
(pullRequest->isDropped()) { @@ -171,13 +164,8 @@ class AsyncPullCallback : public PullCallback { LOG_WARN("Pull request has been released."); return; } - if (!pullRequest->removePullMsgEvent()) { - LOG_WARN("Mark unflight for:%s failed when exception, May be it is Dropped by #Rebalance.", - pullRequest->m_messageQueue.toString().c_str()); - return; - } std::string queueName = pullRequest->m_messageQueue.toString(); - if (m_bShutdown == true) { + if (m_bShutdown) { LOG_INFO("pullrequest for:%s in shutdown, return", queueName.c_str()); return; } @@ -544,6 +532,10 @@ void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) { void DefaultMQPushConsumer::static_triggerNextPullRequest(void* context, boost::asio::deadline_timer* t, boost::weak_ptr pullRequest) { + if (pullRequest.expired()) { + LOG_WARN("Pull request has been released before."); + return; + } DefaultMQPushConsumer* pDefaultMQPushConsumer = (DefaultMQPushConsumer*)context; if (pDefaultMQPushConsumer) { pDefaultMQPushConsumer->triggerNextPullRequest(t, pullRequest); @@ -556,6 +548,7 @@ void DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* deleteAndZero(t); boost::shared_ptr request = pullRequest.lock(); if (!request) { + LOG_WARN("Pull request has been released before."); return; } producePullMsgTask(request); @@ -581,6 +574,7 @@ bool DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr pullRequest) { boost::shared_ptr request = pullRequest.lock(); if (!request) { + LOG_WARN("Pull request has been released."); return false; } if (request->isDropped()) { @@ -588,16 +582,10 @@ bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr pull return false; } if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) { - if (request->addPullMsgEvent()) { - if (m_asyncPull) { - m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync, this, request)); - - } else { - m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request)); - } + if (m_asyncPull) { + m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync, this, request)); } else { - LOG_WARN("Failed to mark in-flight pull request for %s", request->m_messageQueue.toString().c_str()); - return false; + m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request)); } } else { LOG_WARN("produce PullRequest of mq:%s failed", request->m_messageQueue.toString().c_str()); @@ -626,6 +614,7 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { + request->setLastPullTimestamp(UtilAll::currentTimeMillis()); producePullMsgTaskLater(request, 1000); return; } @@ -633,11 +622,9 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest } if (request->getCacheMsgCount() > m_maxMsgCacheSize) { - LOG_INFO("Sync Pull request for [%s] has Cached with %d Messages and The Max size is %d, Sleep 1s.", + LOG_INFO("Sync Pull request for %s has Cached with %d Messages and The Max size is %d, Sleep 1s.", (request->m_messageQueue).toString().c_str(), request->getCacheMsgCount(), m_maxMsgCacheSize); request->setLastPullTimestamp(UtilAll::currentTimeMillis()); - // This process is on flight, should be remove event before producer task again. 
- request->removePullMsgEvent(); // Retry 1s, producePullMsgTaskLater(request, 1000); return; @@ -657,7 +644,6 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest if (pSdata == NULL) { LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.", (request->m_messageQueue).toString().c_str()); - request->removePullMsgEvent(); producePullMsgTaskLater(request, 1000); return; } @@ -686,10 +672,6 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest NULL, getSessionCredentials())); PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata); - if (!request->removePullMsgEvent()) { - LOG_WARN("Failed to swap pullMsgEvent[%s] flag", messageQueue.toString().c_str()); - return; - } switch (pullResult.pullStatus) { case FOUND: { if (request->isDropped()) { @@ -767,11 +749,8 @@ void DefaultMQPushConsumer::pullMessage(boost::weak_ptr pullRequest } } catch (MQException& e) { LOG_ERROR(e.what()); - if (request->removePullMsgEvent()) { - producePullMsgTaskLater(request, 1000); - } else { - LOG_WARN("Failed to swap pullMsgEvent[%s] flag", messageQueue.toString().c_str()); - } + LOG_WARN("Exception occurred while pulling %s, restart 1s later.", messageQueue.toString().c_str()); + producePullMsgTaskLater(request, 1000); } } @@ -820,16 +799,13 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe } if (request->isDropped()) { LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str()); - // request->removePullMsgEvent(); return; } - MQMessageQueue& messageQueue = request->m_messageQueue; if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { - // This process is on flight, should be remove event before producer task again. - request->removePullMsgEvent(); + request->setLastPullTimestamp(UtilAll::currentTimeMillis()); // Retry later. producePullMsgTaskLater(request, 1000); return; @@ -841,8 +817,6 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe LOG_INFO("Pull request for [%s] has Cached with %d Messages and The Max size is %d, Sleep 3s.", (request->m_messageQueue).toString().c_str(), request->getCacheMsgCount(), m_maxMsgCacheSize); request->setLastPullTimestamp(UtilAll::currentTimeMillis()); - // This process is on flight, should be remove event before producer task again. - request->removePullMsgEvent(); // Retry 3s, producePullMsgTaskLater(request, 3000); return; @@ -862,8 +836,6 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe if (pSdata == NULL) { LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.", (request->m_messageQueue).toString().c_str()); - // This process is on flight, should be remove event before producer task again. - request->removePullMsgEvent(); // Subscribe data error, retry later. 
producePullMsgTaskLater(request, 1000); return; @@ -885,26 +857,32 @@ void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr pullRe } try { request->setLastPullTimestamp(UtilAll::currentTimeMillis()); - m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1 - subExpression, // 2 - pSdata->getSubVersion(), // 3 - request->getNextOffset(), // 4 - 32, // 5 - sysFlag, // 6 - commitOffsetValue, // 7 - 1000 * 15, // 8 - m_asyncPullTimeout, // 9 - ComMode_ASYNC, // 10 - getAsyncPullCallBack(request, messageQueue), // 11 - getSessionCredentials(), // 12 - &arg); // 13 + AsyncPullCallback* pullCallback = getAsyncPullCallBack(request, messageQueue); + if (pullCallback == NULL) { + LOG_WARN("Can not get pull callback for %s, maybe this pull request has been released.", + request->m_messageQueue.toString().c_str()); + return; + } + m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1 + subExpression, // 2 + pSdata->getSubVersion(), // 3 + request->getNextOffset(), // 4 + 32, // 5 + sysFlag, // 6 + commitOffsetValue, // 7 + 1000 * 15, // 8 + m_asyncPullTimeout, // 9 + ComMode_ASYNC, // 10 + pullCallback, // 11 + getSessionCredentials(), // 12 + &arg); // 13 } catch (MQException& e) { LOG_ERROR(e.what()); if (request->isDropped()) { LOG_WARN("Pull request is set as dropped with mq:%s, return", (request->m_messageQueue).toString().c_str()); return; } - request->removePullMsgEvent(); + LOG_INFO("Exception occurred while pulling %s, restart 1s later.", (request->m_messageQueue).toString().c_str()); producePullMsgTaskLater(request, 1000); } } diff --git a/src/consumer/OffsetStore.cpp b/src/consumer/OffsetStore.cpp index 18bce86b3..324653ddb 100644 --- a/src/consumer/OffsetStore.cpp +++ b/src/consumer/OffsetStore.cpp @@ -276,7 +276,7 @@ void RemoteBrokerOffsetStore::persist(const MQMessageQueue& mq, const SessionCre try { updateConsumeOffsetToBroker(mq, it->second, session_credentials); } catch (MQException& e) { - LOG_ERROR("updateConsumeOffsetToBroker error"); + LOG_ERROR("updateConsumeOffsetToBroker %s, offset:[%lld] error", mq.toString().c_str(), it->second); } } } diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp index ba887be55..e57884088 100644 --- a/src/consumer/PullRequest.cpp +++ b/src/consumer/PullRequest.cpp @@ -28,12 +28,7 @@ const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000; const uint64 PullRequest::MAX_PULL_IDLE_TIME = 120 * 1000; PullRequest::PullRequest(const string& groupname) - : m_groupname(groupname), - m_nextOffset(0), - m_queueOffsetMax(0), - m_bDropped(false), - m_bLocked(false), - m_bPullMsgEventInprogress(false) { + : m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0), m_bDropped(false), m_bLocked(false) { m_lastLockTimestamp = UtilAll::currentTimeMillis(); m_lastPullTimestamp = UtilAll::currentTimeMillis(); m_lastConsumeTimestamp = UtilAll::currentTimeMillis(); } @@ -279,33 +274,5 @@ int64 PullRequest::commit() { } } -bool PullRequest::removePullMsgEvent(bool force) { - // m_bPullMsgEventInprogress = false; - if (force) { - m_bPullMsgEventInprogress.store(false, boost::memory_order_relaxed); - return true; - } - - bool expected = true; - if (m_bPullMsgEventInprogress.compare_exchange_strong(expected, false, boost::memory_order_relaxed)) { - LOG_DEBUG("Un-mark in-flight pull request for %s", m_messageQueue.toString().c_str()); - return true; - } - LOG_WARN("Failed to un-mark in-flight pull request for %s", m_messageQueue.toString().c_str()); - return false; } -bool PullRequest::addPullMsgEvent() { - bool expected = false; - if 
(m_bPullMsgEventInprogress.compare_exchange_strong(expected, true, boost::memory_order_relaxed)) { - LOG_DEBUG("Mark in-flight pull request for %s", m_messageQueue.toString().c_str()); - return true; - } - LOG_WARN("Failed to mark in-flight pull request for %s", m_messageQueue.toString().c_str()); - return false; -} -bool PullRequest::hasInFlightPullRequest() const { - return m_bPullMsgEventInprogress.load(boost::memory_order_relaxed); -} // m_bPullMsgEventInprogress; }; //& mqsSelf) { - LOG_DEBUG("updateRequestTableInRebalance Enter"); + LOG_DEBUG("updateRequestTableInRebalance for Topic[%s] Enter", topic.c_str()); // 1. Clear no in charge of // 1. set dropped @@ -476,13 +476,12 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vectorsecond->setDropped(true); // remove offset table to avoid offset backup removeUnnecessaryMessageQueue(mqtemp); itDel->second->clearAllMsgs(); - - if (itDel->second->hasInFlightPullRequest()) { - LOG_WARN("Unconditionally remove in-flight pull request mark for queue: %s since it is being dropped", - mqtemp.toString().c_str()); - itDel->second->removePullMsgEvent(true); - } removePullRequest(mqtemp); changed = true; - } - if (itDel->second->isPullRequestExpired()) { - // if pull expired , set to dropped - + } else if (itDel->second->isPullRequestExpired()) { + // If the pull request has expired (e.g. producing a pull task failed), mark it as dropped. LOG_INFO("Drop mq:%s according Pull timeout.", mqtemp.toString().c_str()); itDel->second->setDropped(true); removeUnnecessaryMessageQueue(mqtemp); itDel->second->clearAllMsgs(); - - if (itDel->second->hasInFlightPullRequest()) { - LOG_WARN("Unconditionally remove in-flight pull request mark for queue: %s since it is being dropped", - mqtemp.toString().c_str()); - itDel->second->removePullMsgEvent(true); - } removePullRequest(mqtemp); changed = true; } } } - // if mqSelf == null, it is better to break; //> pullRequestsToAdd; vector::iterator itAdd = mqsSelf.begin(); for (; itAdd != mqsSelf.end(); ++itAdd) { - // PullRequest* pPullRequest(getPullRequest(*it2)); - // boost::weak_ptr pPullRequest = getPullRequest(*itAdd); - // if exist in table, go to next if (isPullRequestExist(*itAdd)) { + // Expired pull requests were already dropped above, so an existing request here is still active; skip it. continue; } boost::shared_ptr pullRequest = boost::make_shared(m_pConsumer->getGroupName()); @@ -543,7 +524,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector>::iterator itAdded = pullRequestsToAdd.begin(); itAdded != pullRequestsToAdd.end(); ++itAdded) { - LOG_DEBUG("start to pull %s", (*itAdded)->m_messageQueue.toString().c_str()); + LOG_INFO("Start to pull %s, offset:%lld, GroupName %s", (*itAdded)->m_messageQueue.toString().c_str(), + (*itAdded)->getNextOffset(), (*itAdded)->getGroupName().c_str()); if (!m_pConsumer->producePullMsgTask(*itAdded)) { - LOG_WARN("Failed to producer pull message task for %s", (*itAdded)->m_messageQueue.toString().c_str()); + LOG_WARN( + "Failed to produce pull message task for %s, remove it from request table and wait for next #Rebalance.", + (*itAdded)->m_messageQueue.toString().c_str()); + // remove from request table, and wait for next rebalance. 
+ (*itAdded)->setDropped(true); + removePullRequest((*itAdded)->m_messageQueue); } } - LOG_DEBUG("updateRequestTableInRebalance exit"); + LOG_DEBUG("updateRequestTableInRebalance Topic[%s] exit", topic.c_str()); return changed; } @@ -567,7 +554,8 @@ int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) { int64 result = -1; DefaultMQPushConsumer* pConsumer = dynamic_cast(m_pConsumer); if (!pConsumer) { - LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed"); + LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed in computePullFromWhere for %s", + mq.toString().c_str()); return result; } ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere(); @@ -653,7 +641,7 @@ void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) { // DefaultMQPushConsumer *pConsumer = static_cast(m_pConsumer); DefaultMQPushConsumer* pConsumer = dynamic_cast(m_pConsumer); if (!pConsumer) { - LOG_ERROR("Cast MQConsumer* to DefaultMQPushConsumer* failed"); + LOG_ERROR("Cast MQConsumer* to DefaultMQPushConsumer* failed when removing %s", mq.toString().c_str()); return; } OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
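
The ownership model the series converges on can be summarized in a short standalone sketch. This is illustrative only, not code from the patches: PullRequest below is a minimal stand-in for the real class, and g_requestTable and pullTask mimic the roles of Rebalance::m_requestQueueTable and the TaskQueue pull tasks. The rebalance table holds the only boost::shared_ptr to each PullRequest, while every queued pull task holds a boost::weak_ptr, so a task scheduled before a rebalance degrades to a no-op instead of dereferencing a dangling raw PullRequest*.

#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <cstdio>
#include <map>
#include <string>

// Stand-in for the real PullRequest; only the members needed here.
struct PullRequest {
  std::string queue;
  bool dropped;
  PullRequest() : dropped(false) {}
  bool isDropped() const { return dropped; }
};

// The rebalance table is the single strong owner of every PullRequest.
std::map<std::string, boost::shared_ptr<PullRequest> > g_requestTable;

// Queued pull tasks hold only weak references. A task scheduled before a
// rebalance becomes a no-op once lock() fails (the request was released)
// or once the request has been marked dropped.
void pullTask(boost::weak_ptr<PullRequest> wp) {
  boost::shared_ptr<PullRequest> request = wp.lock();
  if (!request) {
    std::printf("pull request has been released, skip\n");
    return;
  }
  if (request->isDropped()) {
    std::printf("pull request for %s is dropped, skip\n", request->queue.c_str());
    return;
  }
  std::printf("pulling %s\n", request->queue.c_str());
}

int main() {
  boost::shared_ptr<PullRequest> req = boost::make_shared<PullRequest>();
  req->queue = "TopicTest-0";
  g_requestTable[req->queue] = req;

  boost::weak_ptr<PullRequest> wp = req;
  req.reset();  // drop the local strong reference; the table still owns it

  pullTask(wp);  // pulls normally

  g_requestTable.erase("TopicTest-0");  // rebalance removes the queue
  pullTask(wp);  // safely no-ops instead of touching freed memory
  return 0;
}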