Skip to content

Commit

Permalink
Low allocation OTLP trace marshaler
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Apr 26, 2024
1 parent da4cd93 commit cba329d
Show file tree
Hide file tree
Showing 28 changed files with 2,423 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,15 @@ public static void longToBase16String(long value, char[] dest, int destOffset) {
/** Returns the {@code byte[]} decoded from the given hex {@link CharSequence}. */
public static byte[] bytesFromBase16(CharSequence value, int length) {
byte[] result = new byte[length / 2];
bytesFromBase16(value, length, result);
return result;
}

/** Fills {@code bytes} with bytes decoded from the given hex {@link CharSequence}. */
public static void bytesFromBase16(CharSequence value, int length, byte[] bytes) {
for (int i = 0; i < length; i += 2) {
result[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
bytes[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
}
return result;
}

/** Fills {@code dest} with the hex encoding of {@code bytes}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}

@Override
public void writeString(
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
Expand Down Expand Up @@ -165,6 +173,44 @@ public void serializeRepeatedMessage(
generator.writeEndArray();
}

@Override
public <T> void serializeRepeatedMessage(
ProtoFieldInfo field,
List<? extends T> messages,
StatelessMarshaler<T> marshaler,
MarshalerContext context)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (int i = 0; i < messages.size(); i++) {
T message = messages.get(i);
generator.writeStartObject();
marshaler.writeTo(this, message, context);
generator.writeEndObject();
}
generator.writeEndArray();
}

@Override
protected void writeStartRepeated(ProtoFieldInfo field) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
}

@Override
protected void writeEndRepeated() throws IOException {
generator.writeEndArray();
}

@Override
protected void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
throws IOException {
generator.writeStartObject();
}

@Override
protected void writeEndRepeatedElement() throws IOException {
generator.writeEndObject();
}

// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Class for keeping marshaling state. The state consists of integers, that we call sizes, and
* objects, that we call data. Both integers and objects can be read from the state in the order
* they were added (first in, first out). Additionally, this class provides various pools and caches
* for objects that can be reused between marshalling attempts.
*/
public final class MarshalerContext {
private final boolean marshalStringNoAllocation;

private int[] sizes = new int[16];
private int sizeReadIndex;
private int sizeWriteIndex;
private Object[] data = new Object[16];
private int dataReadIndex;
private int dataWriteIndex;

@SuppressWarnings("BooleanParameter")
public MarshalerContext() {
this(true);
}

public MarshalerContext(boolean marshalStringNoAllocation) {
this.marshalStringNoAllocation = marshalStringNoAllocation;
}

public boolean marshalStringNoAllocation() {
return marshalStringNoAllocation;
}

public void addSize(int size) {
growSizeIfNeeded();
sizes[sizeWriteIndex++] = size;
}

public int addSize() {
growSizeIfNeeded();
return sizeWriteIndex++;
}

private void growSizeIfNeeded() {
if (sizeWriteIndex == sizes.length) {
int[] newSizes = new int[sizes.length * 2];
System.arraycopy(sizes, 0, newSizes, 0, sizes.length);
sizes = newSizes;
}
}

public void setSize(int index, int size) {
sizes[index] = size;
}

public int getSize() {
return sizes[sizeReadIndex++];
}

public void addData(@Nullable Object o) {
growDataIfNeeded();
data[dataWriteIndex++] = o;
}

private void growDataIfNeeded() {
if (dataWriteIndex == data.length) {
Object[] newData = new Object[data.length * 2];
System.arraycopy(data, 0, newData, 0, data.length);
data = newData;
}
}

public <T> T getData(Class<T> type) {
return type.cast(data[dataReadIndex++]);
}

private final IdPool traceIdPool = new IdPool(TraceId.getLength() / 2);

/** Returns a buffer that can be used to hold a trace id. */
public byte[] getTraceIdBuffer() {
return traceIdPool.get();
}

private final IdPool spanIdPool = new IdPool(SpanId.getLength() / 2);

/** Returns a buffer that can be used to hold a span id. */
public byte[] getSpanIdBuffer() {
return spanIdPool.get();
}

private static class IdPool {
private final List<byte[]> pool = new ArrayList<>();
int index;
final int idSize;

IdPool(int idSize) {
this.idSize = idSize;
}

byte[] get() {
if (index < pool.size()) {
return pool.get(index++);

Check warning on line 116 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L116

Added line #L116 was not covered by tests
}
byte[] result = new byte[idSize];
pool.add(result);
index++;

return result;
}

void reset() {
index = 0;
}

Check warning on line 127 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L126-L127

Added lines #L126 - L127 were not covered by tests
}

private final Pool<Map<?, ?>> mapPool = new Pool<>(IdentityHashMap::new, Map::clear);

/** Returns a pooled identity map. */
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> getIdentityMap() {
return (Map<K, V>) mapPool.get();
}

private final Pool<List<?>> listPool = new Pool<>(ArrayList::new, List::clear);

/** Returns a pooled list. */
@SuppressWarnings("unchecked")
public <T> List<T> getList() {
return (List<T>) listPool.get();
}

private static class Pool<T> {
private final List<T> pool = new ArrayList<>();
private int index;
private final Supplier<T> factory;
private final Consumer<T> clean;

Pool(Supplier<T> factory, Consumer<T> clean) {
this.factory = factory;
this.clean = clean;
}

T get() {
if (index < pool.size()) {
return pool.get(index++);

Check warning on line 159 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L159

Added line #L159 was not covered by tests
}
T result = factory.get();
pool.add(result);
index++;

return result;
}

void reset() {
for (int i = 0; i < index; i++) {
clean.accept(pool.get(i));

Check warning on line 170 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L170

Added line #L170 was not covered by tests
}
index = 0;
}

Check warning on line 173 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L172-L173

Added lines #L172 - L173 were not covered by tests
}

/** Reset context so that serialization could be re-run. */
public void resetReadIndex() {
sizeReadIndex = 0;
dataReadIndex = 0;
}

/** Reset context so that it could be reused. */
public void reset() {
sizeReadIndex = 0;
sizeWriteIndex = 0;

Check warning on line 185 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L184-L185

Added lines #L184 - L185 were not covered by tests
for (int i = 0; i < dataWriteIndex; i++) {
data[i] = null;

Check warning on line 187 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L187

Added line #L187 was not covered by tests
}
dataReadIndex = 0;
dataWriteIndex = 0;

Check warning on line 190 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L189-L190

Added lines #L189 - L190 were not covered by tests

traceIdPool.reset();
spanIdPool.reset();

Check warning on line 193 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L192-L193

Added lines #L192 - L193 were not covered by tests

mapPool.reset();
listPool.reset();
}

Check warning on line 197 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L195-L197

Added lines #L195 - L197 were not covered by tests

private static final AtomicInteger KEY_INDEX = new AtomicInteger();

public static class Key {
final int index = KEY_INDEX.getAndIncrement();
}

public static Key key() {
return new Key();
}

private Object[] instances = new Object[16];

@SuppressWarnings("unchecked")
public <T> T getInstance(Key key, Supplier<T> supplier) {
if (key.index >= instances.length) {
Object[] newData = new Object[instances.length * 2];
System.arraycopy(instances, 0, newData, 0, instances.length);
instances = newData;

Check warning on line 216 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L214-L216

Added lines #L214 - L216 were not covered by tests
}

T result = (T) instances[key.index];
if (result == null) {
result = supplier.get();
instances[key.index] = result;
}
return result;
}
}
Loading

0 comments on commit cba329d

Please sign in to comment.