Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(apis):refactor apis for CPP styles #236

Merged
merged 3 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions include/Arg_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "RocketMQClient.h"

namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API Arg_helper {
public:
Arg_helper(int argc, char* argv[]);
Expand All @@ -36,7 +35,6 @@ class ROCKETMQCLIENT_API Arg_helper {
std::vector<std::string> m_args;
};

//<!***************************************************************************
} // namespace rocketmq

#endif //<!_ARG_HELPER_H_;
4 changes: 0 additions & 4 deletions include/AsyncCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
#include "SendResult.h"

namespace rocketmq {
//<!***************************************************************************
struct AsyncCallback {};
//<!***************************************************************************
typedef enum sendCallbackType { noAutoDeleteSendCallback = 0, autoDeleteSendCallback = 1 } sendCallbackType;

class ROCKETMQCLIENT_API SendCallback : public AsyncCallback {
Expand All @@ -46,13 +44,11 @@ class ROCKETMQCLIENT_API AutoDeleteSendCallBack : public SendCallback {
virtual sendCallbackType getSendCallbackType() { return autoDeleteSendCallback; }
};

//<!************************************************************************
class ROCKETMQCLIENT_API PullCallback : public AsyncCallback {
public:
virtual ~PullCallback() {}
virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool bProducePullRequest) = 0;
virtual void onException(MQException& e) = 0;
};
//<!***************************************************************************
} // namespace rocketmq
#endif
63 changes: 33 additions & 30 deletions include/DefaultMQProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

namespace rocketmq {
class DefaultMQProducerImpl;
//<!***************************************************************************
class ROCKETMQCLIENT_API DefaultMQProducer {
public:
DefaultMQProducer(const std::string& groupname);
Expand All @@ -54,47 +53,57 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
virtual void sendOneway(MQMessage& msg, const MQMessageQueue& mq);
virtual void sendOneway(MQMessage& msg, MessageQueueSelector* selector, void* arg);

const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);

void setSessionCredentials(const std::string& accessKey,
const std::string& secretKey,
const std::string& accessChannel);
const SessionCredentials& getSessionCredentials() const;

const std::string& getNamesrvDomain() const;
void setNamesrvDomain(const std::string& namesrvDomain);

const std::string& getNameSpace() const;
void setNameSpace(const std::string& nameSpace);

const std::string& getGroupName() const;
void setGroupName(const std::string& groupname);

const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);

/**
* Log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
* log file num is 3, each log size is 100M
**/
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
void setLogPath(const std::string& logPath);
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit

int getSendMsgTimeout() const;
void setSendMsgTimeout(int sendMsgTimeout);

/*
* if msgBody size is large than m_compressMsgBodyOverHowmuch
* If msgBody size is large than compressMsgBodyOverHowmuch
* rocketmq cpp will compress msgBody according to compressLevel
*/
int getCompressMsgBodyOverHowmuch() const;
void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch);
int getCompressLevel() const;
void setCompressLevel(int compressLevel);

// if msgbody size larger than maxMsgBodySize, exception will be throwed
int getMaxMessageSize() const;
void setMaxMessageSize(int maxMessageSize);

// set msg max retry times, default retry times is 5
int getRetryTimes() const;
void setRetryTimes(int times);

int getRetryTimes4Async() const;
void setRetryTimes4Async(int times);
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
const std::string& getNamesrvDomain() const;
void setNamesrvDomain(const std::string& namesrvDomain);
const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);
// nameSpace
const std::string& getNameSpace() const;
void setNameSpace(const std::string& nameSpace);
const std::string& getGroupName() const;
void setGroupName(const std::string& groupname);

// log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
// log file num is 3, each log size is 100M
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit

/** set TcpTransport pull thread num, which dermine the num of threads to
/** Set TcpTransport pull thread num, which dermine the num of threads to
* distribute network data,
* 1. its default value is CPU num, it must be setted before producer/consumer
* start, minimum value is CPU num;
Expand All @@ -106,15 +115,15 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
void setTcpTransportPullThreadNum(int num);
const int getTcpTransportPullThreadNum() const;

/** timeout of tcp connect, it is same meaning for both producer and consumer;
/** Timeout of tcp connect, it is same meaning for both producer and consumer;
* 1. default value is 3000ms
* 2. input parameter could only be milliSecond, suggestion value is
* 1000-3000ms;
**/
void setTcpTransportConnectTimeout(uint64_t timeout); // ms
const uint64_t getTcpTransportConnectTimeout() const;

/** timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
/** Timeout of tryLock tcpTransport before sendMsg/pullMsg, if timeout,
* returns NULL
* 1. paremeter unit is ms, default value is 3000ms, the minimun value is 1000ms
* suggestion value is 3000ms;
Expand All @@ -127,14 +136,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
void setUnitName(std::string unitName);
const std::string& getUnitName() const;

void setSessionCredentials(const std::string& accessKey,
const std::string& secretKey,
const std::string& accessChannel);
const SessionCredentials& getSessionCredentials() const;

private:
DefaultMQProducerImpl* impl;
};
//<!***************************************************************************
} // namespace rocketmq
#endif
67 changes: 36 additions & 31 deletions include/DefaultMQPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,55 +29,50 @@
#include "MQueueListener.h"
#include "PullResult.h"
#include "RocketMQClient.h"
#include "SessionCredentials.h"

namespace rocketmq {
class SubscriptionData;
class DefaultMQPullConsumerImpl;
//<!***************************************************************************
class ROCKETMQCLIENT_API DefaultMQPullConsumer {
public:
DefaultMQPullConsumer(const std::string& groupname);
virtual ~DefaultMQPullConsumer();

//<!begin mqadmin;
virtual void start();
virtual void shutdown();
//<!end mqadmin;

const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);

void setSessionCredentials(const std::string& accessKey,
const std::string& secretKey,
const std::string& accessChannel);
const SessionCredentials& getSessionCredentials() const;

const std::string& getNamesrvDomain() const;
void setNamesrvDomain(const std::string& namesrvDomain);

const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);
// nameSpace

const std::string& getNameSpace() const;
void setNameSpace(const std::string& nameSpace);

const std::string& getGroupName() const;
void setGroupName(const std::string& groupname);

// log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
// log file num is 3, each log size is 100M
/**
* Log configuration interface, default LOG_LEVEL is LOG_LEVEL_INFO, default
* log file num is 3, each log size is 100M
**/
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
void setLogPath(const std::string& logPath);
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit

void setSessionCredentials(const std::string& accessKey,
const std::string& secretKey,
const std::string& accessChannel);
const SessionCredentials& getSessionCredentials() const;
//<!begin MQConsumer
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
virtual void persistConsumerOffset();
virtual void persistConsumerOffsetByResetOffset();
virtual void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info);
virtual ConsumeType getConsumeType();
virtual ConsumeFromWhere getConsumeFromWhere();
virtual void getSubscriptions(std::vector<SubscriptionData>&);
virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
virtual void removeConsumeOffset(const MQMessageQueue& mq);
//<!end MQConsumer;

void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener);
/**
* Pull message from specified queue, if no msg in queue, return directly
*
Expand All @@ -102,7 +97,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer {
PullCallback* pPullCallback);

/**
* pull msg from specified queue, if no msg, broker will suspend the pull request 20s
* Pull msg from specified queue, if no msg, broker will suspend the pull request 20s
*
* @param mq
* specify the pulled queue
Expand All @@ -117,24 +112,34 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer {
* @return
* accroding to PullResult
*/
PullResult pullBlockIfNotFound(const MQMessageQueue& mq, const std::string& subExpression, int64 offset, int maxNums);
void pullBlockIfNotFound(const MQMessageQueue& mq,
const std::string& subExpression,
int64 offset,
int maxNums,
PullCallback* pPullCallback);
virtual PullResult pullBlockIfNotFound(const MQMessageQueue& mq,
const std::string& subExpression,
int64 offset,
int maxNums);
virtual void pullBlockIfNotFound(const MQMessageQueue& mq,
const std::string& subExpression,
int64 offset,
int maxNums,
PullCallback* pPullCallback);

void persistConsumerOffset();
void persistConsumerOffsetByResetOffset();
void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info);
ConsumeFromWhere getConsumeFromWhere();
void getSubscriptions(std::vector<SubscriptionData>&);
void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
void removeConsumeOffset(const MQMessageQueue& mq);

void registerMessageQueueListener(const std::string& topic, MQueueListener* pListener);

int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);

void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue> mqs);

// temp persist consumer offset interface, only valid with
// RemoteBrokerOffsetStore, updateConsumeOffset should be called before.
void persistConsumerOffset4PullConsumer(const MQMessageQueue& mq);

private:
DefaultMQPullConsumerImpl* impl;
};
//<!***************************************************************************
} // namespace rocketmq
#endif
Loading