-
Notifications
You must be signed in to change notification settings - Fork 102
/
Copy pathStatsDSender.java
111 lines (96 loc) · 3.94 KB
/
StatsDSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.timgroup.statsd;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class StatsDSender implements Runnable {
private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");
private static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer";
private final ByteBuffer sendBuffer;
private final Callable<SocketAddress> addressLookup;
private final BlockingQueue<String> queue;
private final StatsDClientErrorHandler handler;
private final DatagramChannel clientChannel;
private volatile boolean shutdown;
StatsDSender(final Callable<SocketAddress> addressLookup, final int queueSize,
final StatsDClientErrorHandler handler, final DatagramChannel clientChannel, final int maxPacketSizeBytes) {
this(addressLookup, new LinkedBlockingQueue<String>(queueSize), handler, clientChannel, maxPacketSizeBytes);
}
StatsDSender(final Callable<SocketAddress> addressLookup, final BlockingQueue<String> queue,
final StatsDClientErrorHandler handler, final DatagramChannel clientChannel, final int maxPacketSizeBytes) {
sendBuffer = ByteBuffer.allocate(maxPacketSizeBytes);
this.addressLookup = addressLookup;
this.queue = queue;
this.handler = handler;
this.clientChannel = clientChannel;
}
boolean send(final String message) {
if (!shutdown) {
queue.offer(message);
return true;
}
return false;
}
@Override
public void run() {
while (!(queue.isEmpty() && shutdown)) {
try {
if (Thread.interrupted()) {
return;
}
final String message = queue.poll(1, TimeUnit.SECONDS);
if (null != message) {
final byte[] data = message.getBytes(MESSAGE_CHARSET);
if (sendBuffer.capacity() < data.length) {
throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
}
final SocketAddress address = addressLookup.call();
if (sendBuffer.remaining() < (data.length + 1)) {
blockingSend(address);
}
if (sendBuffer.position() > 0) {
sendBuffer.put((byte) '\n');
}
sendBuffer.put(data);
if (null == queue.peek()) {
blockingSend(address);
}
}
} catch (final InterruptedException e) {
if (shutdown) {
return;
}
} catch (final Exception e) {
handler.handle(e);
}
}
}
private void blockingSend(final SocketAddress address) throws IOException {
final int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();
final int sentBytes = clientChannel.send(sendBuffer, address);
sendBuffer.limit(sendBuffer.capacity());
sendBuffer.rewind();
if (sizeOfBuffer != sentBytes) {
handler.handle(
new IOException(
String.format(
"Could not send entirely stat %s to %s. Only sent %d bytes out of %d bytes",
sendBuffer.toString(),
address.toString(),
sentBytes,
sizeOfBuffer)));
}
}
boolean isShutdown() {
return shutdown;
}
void shutdown() {
shutdown = true;
}
}