Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Fix ThriftSender max span size check #670

Merged
merged 4 commits into from
Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class ThriftSender extends ThriftSenderBase implements Sender {

private Process process;
private int processBytesSize;
private int byteBufferSize;
private int spanBytesSize;

@ToString.Exclude private List<io.jaegertracing.thriftjava.Span> spanBuffer;

Expand All @@ -49,7 +49,6 @@ public int append(JaegerSpan span) throws SenderException {
process = new Process(span.getTracer().getServiceName());
process.setTags(JaegerThriftSpanConverter.buildTags(span.getTracer().tags()));
processBytesSize = calculateProcessSize(process);
byteBufferSize += processBytesSize;
}

io.jaegertracing.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
Expand All @@ -59,10 +58,10 @@ public int append(JaegerSpan span) throws SenderException {
spanSize, getMaxSpanBytes()), null, 1);
}

byteBufferSize += spanSize;
if (byteBufferSize <= getMaxSpanBytes()) {
spanBytesSize += spanSize;
if (spanBytesSize <= getMaxSpanBytes()) {
spanBuffer.add(thriftSpan);
if (byteBufferSize < getMaxSpanBytes()) {
if (spanBytesSize < getMaxSpanBytes()) {
return 0;
}
return flush();
Expand All @@ -77,7 +76,7 @@ public int append(JaegerSpan span) throws SenderException {
}

spanBuffer.add(thriftSpan);
byteBufferSize = processBytesSize + spanSize;
spanBytesSize = spanSize;
return n;
}

Expand All @@ -97,6 +96,10 @@ protected int calculateSpanSize(io.jaegertracing.thriftjava.Span span) throws Se
}
}

protected int getMaxSpanBytes() {
return getMaxBatchBytes() - processBytesSize;
}

public abstract void send(Process process, List<io.jaegertracing.thriftjava.Span> spans) throws SenderException;

@Override
Expand All @@ -112,7 +115,7 @@ public int flush() throws SenderException {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
spanBuffer.clear();
byteBufferSize = processBytesSize;
spanBytesSize = 0;
}
return n;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum ProtocolType {

protected final TProtocolFactory protocolFactory;
private final TSerializer serializer;
private final int maxSpanBytes;
private final int maxBatchBytes;

@ToString.Exclude private AutoExpandingBufferWriteTransport memoryTransport;

Expand All @@ -63,13 +63,13 @@ public ThriftSenderBase(ProtocolType protocolType, int maxPacketSize) {
maxPacketSize = ThriftUdpTransport.MAX_PACKET_SIZE;
}

maxSpanBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
maxBatchBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
memoryTransport = new AutoExpandingBufferWriteTransport(maxPacketSize, 2);
serializer = new TSerializer(protocolFactory);
}

protected int getMaxSpanBytes() {
return maxSpanBytes;
protected int getMaxBatchBytes() {
return maxBatchBytes;
}

protected byte[] serialize(TBase<?,?> thriftBase) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public void send(Process process, List<Span> spans) throws SenderException {
sender.calculateSpanSize(null);
}

@Test(expected = SenderException.class)
public void appendFail() throws Exception {
int size = 0;
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
@Override
public void send(Process process, List<Span> spans) throws SenderException {
throw new SenderException("", null, spans.size());
}
};

JaegerTracer tracer = new JaegerTracer.Builder("failure").build();
sender.append(tracer.buildSpan("flush-fail").start());
sender.flush();
}

@Test(expected = SenderException.class)
public void flushFail() throws Exception {
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,30 @@ public void tearDown() throws Exception {
reporter.close();
}

@Test(expected = SenderException.class)
public void testAppendSpanTooLarge() throws Exception {
/**
* create a mock span with target size
*/
private JaegerSpan buildSpanWithSize(int targetSpanSize) throws Exception {
JaegerSpan jaegerSpan = tracer.buildSpan("raza").start();
String msg = "";
for (int i = 0; i < 10001; i++) {
msg += ".";
jaegerSpan.log(msg);
}
jaegerSpan.setTag("mock", "");

try {
sender.append(jaegerSpan);
} catch (SenderException e) {
assertEquals(e.getDroppedSpanCount(), 1);
throw e;
// contains mock span tag with empty string value, which is later used to match target span size
io.jaegertracing.thriftjava.Span emptySpan = JaegerThriftSpanConverter.convertSpan(jaegerSpan);
yhpark marked this conversation as resolved.
Show resolved Hide resolved

int emptySpanSize = sender.getSize(emptySpan);
int freePacketSizeLeft = targetSpanSize - emptySpanSize;

// each "0" takes 1 byte in UTF-8
String mockStr = String.join("", Collections.nCopies(freePacketSizeLeft, "0"));
jaegerSpan.setTag("mock", mockStr);

// subtract some chars since the actual encoding takes a few extra bytes
while (targetSpanSize != sender.getSize(JaegerThriftSpanConverter.convertSpan(jaegerSpan))) {
mockStr = mockStr.substring(1);
jaegerSpan.setTag("mock", mockStr);
}

return jaegerSpan;
}

@Test
Expand Down Expand Up @@ -131,6 +140,41 @@ public void testAppend() throws Exception {
assertEquals(expectedNumSpans, result);
}

@Test
public void testAppendMaxSize() throws Exception {
Process process = new Process(tracer.getServiceName())
.setTags(JaegerThriftSpanConverter.buildTags(tracer.tags()));

int processSize = sender.getSize(process);

JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize);

int result = sender.append(jaegerSpan);

assertEquals(1, result);

// test if the buffer is reinitialized correctly
result = sender.append(jaegerSpan);

assertEquals(1, result);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!


@Test(expected = SenderException.class)
public void testAppendSpanTooLarge() throws Exception {
Process process = new Process(tracer.getServiceName())
.setTags(JaegerThriftSpanConverter.buildTags(tracer.tags()));
int processSize = sender.getSize(process);

JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize + 1);

try {
sender.append(jaegerSpan);
} catch (SenderException e) {
assertEquals(e.getDroppedSpanCount(), 1);
throw e;
}
}

@Test
public void testFlushSendsSpan() throws Exception {
int timeout = 50; // in milliseconds
Expand Down