diff --git a/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSender.java b/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSender.java index 7811bd114..bcb9b5dfd 100644 --- a/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSender.java +++ b/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSender.java @@ -29,7 +29,7 @@ public abstract class ThriftSender extends ThriftSenderBase implements Sender { private Process process; private int processBytesSize; - private int byteBufferSize; + private int spanBytesSize; @ToString.Exclude private List spanBuffer; @@ -49,7 +49,6 @@ public int append(JaegerSpan span) throws SenderException { process = new Process(span.getTracer().getServiceName()); process.setTags(JaegerThriftSpanConverter.buildTags(span.getTracer().tags())); processBytesSize = calculateProcessSize(process); - byteBufferSize += processBytesSize; } io.jaegertracing.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span); @@ -59,10 +58,10 @@ public int append(JaegerSpan span) throws SenderException { spanSize, getMaxSpanBytes()), null, 1); } - byteBufferSize += spanSize; - if (byteBufferSize <= getMaxSpanBytes()) { + spanBytesSize += spanSize; + if (spanBytesSize <= getMaxSpanBytes()) { spanBuffer.add(thriftSpan); - if (byteBufferSize < getMaxSpanBytes()) { + if (spanBytesSize < getMaxSpanBytes()) { return 0; } return flush(); @@ -77,7 +76,7 @@ public int append(JaegerSpan span) throws SenderException { } spanBuffer.add(thriftSpan); - byteBufferSize = processBytesSize + spanSize; + spanBytesSize = spanSize; return n; } @@ -97,6 +96,10 @@ protected int calculateSpanSize(io.jaegertracing.thriftjava.Span span) throws Se } } + protected int getMaxSpanBytes() { + return getMaxBatchBytes() - processBytesSize; + } + public abstract void send(Process process, List spans) throws SenderException; @Override @@ -112,7 +115,7 @@ public int flush() throws SenderException { throw new SenderException("Failed to flush spans.", e, n); } finally { spanBuffer.clear(); - byteBufferSize = processBytesSize; + spanBytesSize = 0; } return n; } diff --git a/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSenderBase.java b/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSenderBase.java index e35ac24a5..0f4eb77a4 100644 --- a/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSenderBase.java +++ b/jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSenderBase.java @@ -37,7 +37,7 @@ public enum ProtocolType { protected final TProtocolFactory protocolFactory; private final TSerializer serializer; - private final int maxSpanBytes; + private final int maxBatchBytes; @ToString.Exclude private AutoExpandingBufferWriteTransport memoryTransport; @@ -63,13 +63,13 @@ public ThriftSenderBase(ProtocolType protocolType, int maxPacketSize) { maxPacketSize = ThriftUdpTransport.MAX_PACKET_SIZE; } - maxSpanBytes = maxPacketSize - EMIT_BATCH_OVERHEAD; + maxBatchBytes = maxPacketSize - EMIT_BATCH_OVERHEAD; memoryTransport = new AutoExpandingBufferWriteTransport(maxPacketSize, 2); serializer = new TSerializer(protocolFactory); } - protected int getMaxSpanBytes() { - return maxSpanBytes; + protected int getMaxBatchBytes() { + return maxBatchBytes; } protected byte[] serialize(TBase thriftBase) throws Exception { diff --git a/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/ThriftSenderTest.java b/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/ThriftSenderTest.java index 4860b61f7..2284e3ead 100644 --- a/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/ThriftSenderTest.java +++ b/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/ThriftSenderTest.java @@ -49,6 +49,21 @@ public void send(Process process, List spans) throws SenderException { sender.calculateSpanSize(null); } + @Test(expected = SenderException.class) + public void appendFail() throws Exception { + int size = 0; + ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) { + @Override + public void send(Process process, List spans) throws SenderException { + throw new SenderException("", null, spans.size()); + } + }; + + JaegerTracer tracer = new JaegerTracer.Builder("failure").build(); + sender.append(tracer.buildSpan("flush-fail").start()); + sender.flush(); + } + @Test(expected = SenderException.class) public void flushFail() throws Exception { ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) { diff --git a/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/UdpSenderTest.java b/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/UdpSenderTest.java index 7bd8a2aca..46cc11709 100644 --- a/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/UdpSenderTest.java +++ b/jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/UdpSenderTest.java @@ -84,21 +84,30 @@ public void tearDown() throws Exception { reporter.close(); } - @Test(expected = SenderException.class) - public void testAppendSpanTooLarge() throws Exception { + /** + * create a mock span with target size + */ + private JaegerSpan buildSpanWithSize(int targetSpanSize) throws Exception { JaegerSpan jaegerSpan = tracer.buildSpan("raza").start(); - String msg = ""; - for (int i = 0; i < 10001; i++) { - msg += "."; - jaegerSpan.log(msg); - } + jaegerSpan.setTag("mock", ""); - try { - sender.append(jaegerSpan); - } catch (SenderException e) { - assertEquals(e.getDroppedSpanCount(), 1); - throw e; + // contains mock span tag with empty string value, which is later used to match target span size + io.jaegertracing.thriftjava.Span emptySpan = JaegerThriftSpanConverter.convertSpan(jaegerSpan); + + int emptySpanSize = sender.getSize(emptySpan); + int freePacketSizeLeft = targetSpanSize - emptySpanSize; + + // each "0" takes 1 byte in UTF-8 + String mockStr = String.join("", Collections.nCopies(freePacketSizeLeft, "0")); + jaegerSpan.setTag("mock", mockStr); + + // subtract some chars since the actual encoding takes a few extra bytes + while (targetSpanSize != sender.getSize(JaegerThriftSpanConverter.convertSpan(jaegerSpan))) { + mockStr = mockStr.substring(1); + jaegerSpan.setTag("mock", mockStr); } + + return jaegerSpan; } @Test @@ -131,6 +140,41 @@ public void testAppend() throws Exception { assertEquals(expectedNumSpans, result); } + @Test + public void testAppendMaxSize() throws Exception { + Process process = new Process(tracer.getServiceName()) + .setTags(JaegerThriftSpanConverter.buildTags(tracer.tags())); + + int processSize = sender.getSize(process); + + JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize); + + int result = sender.append(jaegerSpan); + + assertEquals(1, result); + + // test if the buffer is reinitialized correctly + result = sender.append(jaegerSpan); + + assertEquals(1, result); + } + + @Test(expected = SenderException.class) + public void testAppendSpanTooLarge() throws Exception { + Process process = new Process(tracer.getServiceName()) + .setTags(JaegerThriftSpanConverter.buildTags(tracer.tags())); + int processSize = sender.getSize(process); + + JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize + 1); + + try { + sender.append(jaegerSpan); + } catch (SenderException e) { + assertEquals(e.getDroppedSpanCount(), 1); + throw e; + } + } + @Test public void testFlushSendsSpan() throws Exception { int timeout = 50; // in milliseconds