Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rebalance): use smart_ptr to manage pull request #206

Merged
merged 5 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions include/DefaultMQPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
virtual void getSubscriptions(std::vector<SubscriptionData>&);
virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
virtual void removeConsumeOffset(const MQMessageQueue& mq);
virtual void producePullMsgTask(PullRequest*);
virtual bool producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest);
virtual Rebalance* getRebalance() const;
//<!end MQConsumer;

Expand All @@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
* @param subExpression
* set filter expression for pulled msg, broker will filter msg actively
* Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
* if subExpression is setted to "null" or "*"��all msg will be subscribed
* if subExpression is setted to "null" or "*", all msg will be subscribed
* @param offset
* specify the started pull offset
* @param maxNums
Expand All @@ -90,7 +90,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
* @param subExpression
* set filter expression for pulled msg, broker will filter msg actively
* Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
* if subExpression is setted to "null" or "*"��all msg will be subscribed
* if subExpression is setted to "null" or "*", all msg will be subscribed
* @param offset
* specify the started pull offset
* @param maxNums
Expand All @@ -107,20 +107,13 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {

virtual ConsumerRunningInfo* getConsumerRunningInfo() { return NULL; }
/**
* ��ȡ���ѽ��ȣ�����-1��ʾ����
*
* @param mq
* @param fromStore
* @return
*/
int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);
/**
* ����topic��ȡMessageQueue���Ծ��ⷽʽ�����ڶ����Ա֮�����
*
* @param topic
* ��ϢTopic
* @return ���ض��м���
*/

void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue> mqs);

// temp persist consumer offset interface, only valid with
Expand Down
14 changes: 9 additions & 5 deletions include/DefaultMQPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
virtual Rebalance* getRebalance() const;
ConsumeMsgService* getConsumerMsgService() const;

virtual void producePullMsgTask(PullRequest*);
void triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest* request);
virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>);
virtual bool producePullMsgTaskLater(boost::weak_ptr<PullRequest>, int millis);
static void static_triggerNextPullRequest(void* context,
boost::asio::deadline_timer* t,
boost::weak_ptr<PullRequest>);
void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr<PullRequest>);
void runPullMsgQueue(TaskQueue* pTaskQueue);
void pullMessage(PullRequest* pullrequest); // sync pullMsg
void pullMessageAsync(PullRequest* pullrequest); // async pullMsg
void pullMessage(boost::weak_ptr<PullRequest> pullrequest); // sync pullMsg
void pullMessageAsync(boost::weak_ptr<PullRequest> pullrequest); // async pullMsg
void setAsyncPull(bool asyncFlag);
AsyncPullCallback* getAsyncPullCallBack(PullRequest* request, MQMessageQueue msgQueue);
AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr<PullRequest>, MQMessageQueue msgQueue);
void shutdownAsyncPullCallBack();

/*
Expand Down
2 changes: 1 addition & 1 deletion include/MQConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient {
virtual ConsumeType getConsumeType() = 0;
virtual ConsumeFromWhere getConsumeFromWhere() = 0;
virtual void getSubscriptions(std::vector<SubscriptionData>&) = 0;
virtual void producePullMsgTask(PullRequest*) = 0;
virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>) = 0;
virtual Rebalance* getRebalance() const = 0;
virtual PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64 offset, int maxNums) = 0;
virtual void pull(const MQMessageQueue& mq,
Expand Down
8 changes: 5 additions & 3 deletions src/MQClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,10 +1040,12 @@ void MQClientFactory::resetOffset(const string& group,

for (; it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
boost::weak_ptr<PullRequest> pullRequest = pConsumer->getRebalance()->getPullRequest(mq);
boost::shared_ptr<PullRequest> pullreq = pullRequest.lock();
// PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDroped(true);
LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
pullreq->setDropped(true);
LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/AllocateMQStrategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,5 @@ class AllocateMQAveragely : public AllocateMQStrategy {
};

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq
#endif
33 changes: 24 additions & 9 deletions src/consumer/ConsumeMessageConcurrentlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,35 @@ MessageListenerType ConsumeMessageConcurrentlyService::getConsumeMsgSerivceListe
return m_pMessageListener->getMessageListenerType();
}

void ConsumeMessageConcurrentlyService::submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {
void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
if (request->isDropped()) {
LOG_INFO("Pull request for %s is dropped, which will be released in next re-balance.",
request->m_messageQueue.toString().c_str());
return;
}
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs));
}

void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {
if (!request || request->isDroped()) {
LOG_WARN("the pull result is NULL or Had been dropped");
void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
if (!request || request->isDropped()) {
LOG_WARN("the pull request had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
return;
}

//<!��ȡ����;
if (msgs.empty()) {
LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
return;
Expand Down Expand Up @@ -123,12 +139,11 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vec

// update offset
int64 offset = request->removeMessage(msgs);
// LOG_DEBUG("update offset:%lld of mq: %s",
// offset,(request->m_messageQueue).toString().c_str());
if (offset >= 0) {
m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
} else {
LOG_WARN("Note: accumulation consume occurs on mq:%s", (request->m_messageQueue).toString().c_str());
LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..",
(request->m_messageQueue).toString().c_str());
}
}

Expand All @@ -144,4 +159,4 @@ void ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& ms
}

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq
33 changes: 27 additions & 6 deletions src/consumer/ConsumeMessageOrderlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,25 @@ MessageListenerType ConsumeMessageOrderlyService::getConsumeMsgSerivceListenerTy
return m_pMessageListener->getMessageListenerType();
}

void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {
void ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest, this, request));
}

void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* context,
PullRequest* request,
boost::weak_ptr<PullRequest> pullRequest,
bool tryLockMQ,
boost::asio::deadline_timer* t) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
LOG_INFO("submit consumeRequest later for mq:%s", request->m_messageQueue.toString().c_str());
vector<MQMessageExt> msgs;
ConsumeMessageOrderlyService* orderlyService = (ConsumeMessageOrderlyService*)context;
Expand All @@ -122,7 +133,12 @@ void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex
deleteAndZero(t);
}

void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
bool bGetMutex = false;
boost::unique_lock<boost::timed_mutex> lock(request->getPullRequestCriticalSection(), boost::try_to_lock);
if (!lock.owns_lock()) {
Expand All @@ -140,7 +156,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
// request->m_messageQueue.toString().c_str());
return;
}
if (!request || request->isDroped()) {
if (!request || request->isDropped()) {
LOG_WARN("the pull result is NULL or Had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
Expand Down Expand Up @@ -189,11 +205,16 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
}
}
}
void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ) {
void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
int retryTimer = tryLockMQ ? 500 : 100;
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer));
t->async_wait(
boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), this, request, tryLockMQ, t));
}
}
} // namespace rocketmq
16 changes: 8 additions & 8 deletions src/consumer/ConsumeMsgService.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ConsumeMsgService {
virtual void start() {}
virtual void shutdown() {}
virtual void stopThreadPool() {}
virtual void submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {}
virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs) {}
virtual MessageListenerType getConsumeMsgSerivceListenerType() { return messageListenerDefaultly; }
};

Expand All @@ -48,11 +48,11 @@ class ConsumeMessageConcurrentlyService : public ConsumeMsgService {
virtual ~ConsumeMessageConcurrentlyService();
virtual void start();
virtual void shutdown();
virtual void submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);
virtual MessageListenerType getConsumeMsgSerivceListenerType();
virtual void stopThreadPool();

void ConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
void ConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);

private:
void resetRetryTopic(vector<MQMessageExt>& msgs);
Expand All @@ -71,17 +71,17 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService {
virtual ~ConsumeMessageOrderlyService();
virtual void start();
virtual void shutdown();
virtual void submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);
virtual void stopThreadPool();
virtual MessageListenerType getConsumeMsgSerivceListenerType();

void boost_asio_work();
void tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ);
void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool tryLockMQ);
static void static_submitConsumeRequestLater(void* context,
PullRequest* request,
boost::weak_ptr<PullRequest> request,
bool tryLockMQ,
boost::asio::deadline_timer* t);
void ConsumeRequest(PullRequest* request);
void ConsumeRequest(boost::weak_ptr<PullRequest> request);
void lockMQPeriodically(boost::system::error_code& ec, boost::asio::deadline_timer* t);
void unlockAllMQ();
bool lockOneMQ(const MQMessageQueue& mq);
Expand All @@ -99,6 +99,6 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService {
};

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq

#endif //<! _CONSUMEMESSAGESERVICE_H_
6 changes: 4 additions & 2 deletions src/consumer/DefaultMQPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,13 @@ void DefaultMQPullConsumer::getSubscriptions(vector<SubscriptionData>& result) {
}
}

void DefaultMQPullConsumer::producePullMsgTask(PullRequest*) {}
bool DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest) {
return true;
}

Rebalance* DefaultMQPullConsumer::getRebalance() const {
return NULL;
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
Loading