Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default policy and imeout to override the "hard coded" value consumerBuilder #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
Expand Down Expand Up @@ -117,6 +120,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private int drainTimeout = 60000;
private long sessionOutoingWindow = -1; //Use proton default
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private TerminusExpiryPolicy defaultExpiryPolicy;
private UnsignedInteger defaultTimeout;

private final URI remoteURI;
private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -1071,6 +1076,36 @@ public void setSessionOutgoingWindow(long sessionOutoingWindow) {
this.sessionOutoingWindow = sessionOutoingWindow;
}

public TerminusExpiryPolicy getDefaultExpiryPolicy() {
return defaultExpiryPolicy;
}

/**
* Set the default expiry policy. This will override consumer policy.
*
* @param defaultExpiryPolicy
*/
public void setDefaultExpiryPolicy(String defaultExpiryPolicy) {
this.defaultExpiryPolicy = TerminusExpiryPolicy.valueOf(Symbol.valueOf(defaultExpiryPolicy));
}

public UnsignedInteger getDefaultTimeout() {
return defaultTimeout;
}

/**
* Set the default timeout for consumer policy
*
* @param timeout
*/
public void setDefaultTimeout(int timeout) {
if (timeout < 0) {
defaultTimeout = new UnsignedInteger(Integer.MAX_VALUE);
} else {
defaultTimeout = new UnsignedInteger(timeout);
}
}

public long getCloseTimeout() {
return this.closeTimeout;
}
Expand Down Expand Up @@ -1178,7 +1213,7 @@ private void checkClosed() throws ProviderClosedException {
}
}

private final class IdleTimeoutCheck implements Runnable {
private final class IdleTimeoutCheck implements Runnable {
@Override
public void run() {
boolean checkScheduled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ private void configureSource(Source source) {
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
}

if (this.parent.getConnection().getProvider().getDefaultExpiryPolicy() != null) {
source.setExpiryPolicy(this.parent.getConnection().getProvider().getDefaultExpiryPolicy());
}
if (this.parent.getConnection().getProvider().getDefaultTimeout() != null) {
source.setTimeout(this.parent.getConnection().getProvider().getDefaultTimeout());
}

if (resourceInfo.isBrowser()) {
source.setDistributionMode(COPY);
}
Expand Down