Skip to content

Commit

Permalink
Fix ThriftSender max span size check (jaegertracing#670)
Browse files Browse the repository at this point in the history
* Fix ThriftSender max span size check

Signed-off-by: Yeonghoon Park <me@yhpark.io>

* Refactor getMaxSpanSize() and getMaxBatchBytes() on ThriftSender/ThriftSenderBase

Signed-off-by: Yeonghoon Park <me@yhpark.io>

* Keep track of span buffer size in ThriftSender

Signed-off-by: Yeonghoon Park <me@yhpark.io>

* fix reset buffer size on ThriftSender, add test

Signed-off-by: Yeonghoon Park <me@yhpark.io>
  • Loading branch information
yhpark authored and yurishkuro committed Jan 17, 2020
1 parent 4bde959 commit 74adce4
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class ThriftSender extends ThriftSenderBase implements Sender {

private Process process;
private int processBytesSize;
private int byteBufferSize;
private int spanBytesSize;

@ToString.Exclude private List<io.jaegertracing.thriftjava.Span> spanBuffer;

Expand All @@ -49,7 +49,6 @@ public int append(JaegerSpan span) throws SenderException {
process = new Process(span.getTracer().getServiceName());
process.setTags(JaegerThriftSpanConverter.buildTags(span.getTracer().tags()));
processBytesSize = calculateProcessSize(process);
byteBufferSize += processBytesSize;
}

io.jaegertracing.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
Expand All @@ -59,10 +58,10 @@ public int append(JaegerSpan span) throws SenderException {
spanSize, getMaxSpanBytes()), null, 1);
}

byteBufferSize += spanSize;
if (byteBufferSize <= getMaxSpanBytes()) {
spanBytesSize += spanSize;
if (spanBytesSize <= getMaxSpanBytes()) {
spanBuffer.add(thriftSpan);
if (byteBufferSize < getMaxSpanBytes()) {
if (spanBytesSize < getMaxSpanBytes()) {
return 0;
}
return flush();
Expand All @@ -77,7 +76,7 @@ public int append(JaegerSpan span) throws SenderException {
}

spanBuffer.add(thriftSpan);
byteBufferSize = processBytesSize + spanSize;
spanBytesSize = spanSize;
return n;
}

Expand All @@ -97,6 +96,10 @@ protected int calculateSpanSize(io.jaegertracing.thriftjava.Span span) throws Se
}
}

protected int getMaxSpanBytes() {
return getMaxBatchBytes() - processBytesSize;
}

public abstract void send(Process process, List<io.jaegertracing.thriftjava.Span> spans) throws SenderException;

@Override
Expand All @@ -112,7 +115,7 @@ public int flush() throws SenderException {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
spanBuffer.clear();
byteBufferSize = processBytesSize;
spanBytesSize = 0;
}
return n;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum ProtocolType {

protected final TProtocolFactory protocolFactory;
private final TSerializer serializer;
private final int maxSpanBytes;
private final int maxBatchBytes;

@ToString.Exclude private AutoExpandingBufferWriteTransport memoryTransport;

Expand All @@ -63,13 +63,13 @@ public ThriftSenderBase(ProtocolType protocolType, int maxPacketSize) {
maxPacketSize = ThriftUdpTransport.MAX_PACKET_SIZE;
}

maxSpanBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
maxBatchBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
memoryTransport = new AutoExpandingBufferWriteTransport(maxPacketSize, 2);
serializer = new TSerializer(protocolFactory);
}

protected int getMaxSpanBytes() {
return maxSpanBytes;
protected int getMaxBatchBytes() {
return maxBatchBytes;
}

protected byte[] serialize(TBase<?,?> thriftBase) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public void send(Process process, List<Span> spans) throws SenderException {
sender.calculateSpanSize(null);
}

@Test(expected = SenderException.class)
public void appendFail() throws Exception {
int size = 0;
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
@Override
public void send(Process process, List<Span> spans) throws SenderException {
throw new SenderException("", null, spans.size());
}
};

JaegerTracer tracer = new JaegerTracer.Builder("failure").build();
sender.append(tracer.buildSpan("flush-fail").start());
sender.flush();
}

@Test(expected = SenderException.class)
public void flushFail() throws Exception {
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,30 @@ public void tearDown() throws Exception {
reporter.close();
}

@Test(expected = SenderException.class)
public void testAppendSpanTooLarge() throws Exception {
/**
* create a mock span with target size
*/
private JaegerSpan buildSpanWithSize(int targetSpanSize) throws Exception {
JaegerSpan jaegerSpan = tracer.buildSpan("raza").start();
String msg = "";
for (int i = 0; i < 10001; i++) {
msg += ".";
jaegerSpan.log(msg);
}
jaegerSpan.setTag("mock", "");

try {
sender.append(jaegerSpan);
} catch (SenderException e) {
assertEquals(e.getDroppedSpanCount(), 1);
throw e;
// contains mock span tag with empty string value, which is later used to match target span size
io.jaegertracing.thriftjava.Span emptySpan = JaegerThriftSpanConverter.convertSpan(jaegerSpan);

int emptySpanSize = sender.getSize(emptySpan);
int freePacketSizeLeft = targetSpanSize - emptySpanSize;

// each "0" takes 1 byte in UTF-8
String mockStr = String.join("", Collections.nCopies(freePacketSizeLeft, "0"));
jaegerSpan.setTag("mock", mockStr);

// subtract some chars since the actual encoding takes a few extra bytes
while (targetSpanSize != sender.getSize(JaegerThriftSpanConverter.convertSpan(jaegerSpan))) {
mockStr = mockStr.substring(1);
jaegerSpan.setTag("mock", mockStr);
}

return jaegerSpan;
}

@Test
Expand Down Expand Up @@ -131,6 +140,41 @@ public void testAppend() throws Exception {
assertEquals(expectedNumSpans, result);
}

@Test
public void testAppendMaxSize() throws Exception {
Process process = new Process(tracer.getServiceName())
.setTags(JaegerThriftSpanConverter.buildTags(tracer.tags()));

int processSize = sender.getSize(process);

JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize);

int result = sender.append(jaegerSpan);

assertEquals(1, result);

// test if the buffer is reinitialized correctly
result = sender.append(jaegerSpan);

assertEquals(1, result);
}

@Test(expected = SenderException.class)
public void testAppendSpanTooLarge() throws Exception {
Process process = new Process(tracer.getServiceName())
.setTags(JaegerThriftSpanConverter.buildTags(tracer.tags()));
int processSize = sender.getSize(process);

JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize + 1);

try {
sender.append(jaegerSpan);
} catch (SenderException e) {
assertEquals(e.getDroppedSpanCount(), 1);
throw e;
}
}

@Test
public void testFlushSendsSpan() throws Exception {
int timeout = 50; // in milliseconds
Expand Down

0 comments on commit 74adce4

Please sign in to comment.