Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

squid:S1213 - The members of an interface declaration or class should… #70

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class CallbackConnection {

private boolean onRefillCalled =false;
public static final Task NOOP = Dispatch.NOOP;
private short nextMessageId = 1;

private static class Request {
private final MQTTFrame frame;
private final short id;
Expand Down Expand Up @@ -411,7 +414,6 @@ public void onFailure(Throwable value) {
}
}

private boolean onRefillCalled =false;
public void onSessionEstablished(Transport transport) {
this.transport = transport;
if( suspendCount.get() > 0 ) {
Expand Down Expand Up @@ -741,7 +743,6 @@ private void send(Request request) {
}
}

private short nextMessageId = 1;
private short getNextMessageId() {
short rc = nextMessageId;
nextMessageId++;
Expand Down Expand Up @@ -858,8 +859,6 @@ private void processFrame(MQTTFrame frame) {
}
}

static public final Task NOOP = Dispatch.NOOP;

private void toReceiver(final PUBLISH publish) {
if( listener !=null ) {
try {
Expand Down
73 changes: 36 additions & 37 deletions mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,44 +45,7 @@ public class MQTT {
private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("mqtt.thread.keep_alive", Integer.toString(1000)));
private static final long STACK_SIZE = Long.parseLong(System.getProperty("mqtt.thread.stack_size", Integer.toString(1024*512)));
private static ThreadPoolExecutor blockingThreadPool;


public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
if( blockingThreadPool == null ) {
blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread rc = new Thread(null, r, "MQTT Task", STACK_SIZE);
rc.setDaemon(true);
return rc;
}
}) {

@Override
public void shutdown() {
// we don't ever shutdown since we are shared..
}

@Override
public List<Runnable> shutdownNow() {
// we don't ever shutdown since we are shared..
return Collections.emptyList();
}
};
}
return blockingThreadPool;
}
public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
blockingThreadPool = pool;
}

private static final URI DEFAULT_HOST = createDefaultHost();
private static URI createDefaultHost() {
try {
return new URI("tcp://127.0.0.1:1883");
} catch (URISyntaxException e) {
return null;
}
}

protected URI host = DEFAULT_HOST;
protected URI localAddress;
Expand Down Expand Up @@ -127,6 +90,42 @@ public MQTT(MQTT other) {
this.tracer = other.tracer;
}

public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
if( blockingThreadPool == null ) {
blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread rc = new Thread(null, r, "MQTT Task", STACK_SIZE);
rc.setDaemon(true);
return rc;
}
}) {

@Override
public void shutdown() {
// we don't ever shutdown since we are shared..
}

@Override
public List<Runnable> shutdownNow() {
// we don't ever shutdown since we are shared..
return Collections.emptyList();
}
};
}
return blockingThreadPool;
}
public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
blockingThreadPool = pool;
}

private static URI createDefaultHost() {
try {
return new URI("tcp://127.0.0.1:1883");
} catch (URISyntaxException e) {
return null;
}
}

public CallbackConnection callbackConnection() {
if( !isCleanSession() && ( getClientId()==null || getClientId().length==0 )) {
throw new IllegalArgumentException("The client id MUST be configured when clean session is set to false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ public class MQTTProtocolCodec extends AbstractProtocolCodec {

private int maxMessageLength = 1024*1024*100;

private final Action readHeader = new Action() {
public MQTTFrame apply() throws IOException {
int length = readLength();
if( length >= 0 ) {
if( length > maxMessageLength) {
throw new IOException("The maximum message length was exceeded");
}
byte header = readBuffer.get(readStart);
readStart = readEnd;
if( length > 0 ) {
nextDecodeAction = readBody(header, length);
} else {
return new MQTTFrame().header(header);
}
}
return null;
}
};

public MQTTProtocolCodec() {
this.bufferPools = BUFFER_POOLS;
}
Expand Down Expand Up @@ -76,25 +95,6 @@ protected Action initialDecodeAction() {
return readHeader;
}

private final Action readHeader = new Action() {
public MQTTFrame apply() throws IOException {
int length = readLength();
if( length >= 0 ) {
if( length > maxMessageLength) {
throw new IOException("The maximum message length was exceeded");
}
byte header = readBuffer.get(readStart);
readStart = readEnd;
if( length > 0 ) {
nextDecodeAction = readBody(header, length);
} else {
return new MQTTFrame().header(header);
}
}
return null;
}
};

private int readLength() throws IOException {
readEnd = readStart+2; // Header is at least 2 bytes..
int limit = readBuffer.position();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public class PUBREL extends MessageSupport.HeaderBase implements Message, Acked

private short messageId;

public byte messageType() {
return TYPE;
}

public PUBREL() {
qos(QoS.AT_LEAST_ONCE);
}

public byte messageType() {
return TYPE;
}

public PUBREL decode(MQTTFrame frame) throws ProtocolException {
assert(frame.buffers.length == 1);
header(frame.header());
Expand Down