Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8023] Add expression filtering capability to the pullBlockIfNotFound method of pull consumer #8024

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

/**
* @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link
* DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
* @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation
* {@link DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
Expand Down Expand Up @@ -375,6 +375,20 @@ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offs
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback);
}

@Override
public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector,
long offset, int maxNums,
PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums, pullCallback);
}

@Override
public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector,
long offset,
int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums);
}

@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public interface MQPullConsumer extends MQConsumer {
*
* @param mq from which message queue
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe
* all
* null or * expression,meaning subscribe all
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
Expand Down Expand Up @@ -121,7 +120,7 @@ void pull(final MessageQueue mq, final String subExpression, final long offset,
InterruptedException;

/**
* Pulling the messages in a async. way. Support message selection
* Pulling the messages in a async way. Support message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
Expand Down Expand Up @@ -150,6 +149,23 @@ void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, fina
final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
InterruptedException;

/**
* Pulling the messages through callback function,if no message arrival,blocking. Support message selection
*/
void pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector,
final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
InterruptedException;

/**
* Pulling the messages,if no message arrival,blocking some time. Support message selection
*
* @return The resulting {@code PullRequest}
*/
PullResult pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector,
final long offset, final int maxNums) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;

/**
* Update the offset
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,21 @@ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offs
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}

public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}

public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}


public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
this.isRunning();
Expand Down
Loading