Skip to content

Commit

Permalink
Adds v2 native in-memory storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Aug 30, 2017
1 parent 316878c commit d9e9492
Show file tree
Hide file tree
Showing 9 changed files with 699 additions and 17 deletions.
4 changes: 2 additions & 2 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ static QueryRequest toQueryRequest(HttpUrl url) {
.limit(maybeInteger(url.queryParameter("limit"))).build();
}

static Long maybeLong(String input) {
static Long maybeLong(@Nullable String input) {
return input != null ? Long.valueOf(input) : null;
}

static Integer maybeInteger(String input) {
static Integer maybeInteger(@Nullable String input) {
return input != null ? Integer.valueOf(input) : null;
}

Expand Down
29 changes: 23 additions & 6 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,6 +14,8 @@
package zipkin.junit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -29,9 +31,13 @@
import org.junit.runners.model.Statement;
import zipkin.Span;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.storage.InMemoryStorage;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.V2InMemoryStorage;
import zipkin.internal.V2SpanConverter;

import static okhttp3.mockwebserver.SocketPolicy.KEEP_OPEN;
import static zipkin.internal.GroupByTraceId.TRACE_DESCENDING;

/**
* Starts up a local Zipkin server, listening for http requests on {@link #httpUrl}.
Expand All @@ -42,8 +48,7 @@
* See http://openzipkin.github.io/zipkin-api/#/
*/
public final class ZipkinRule implements TestRule {

private final InMemoryStorage storage = new InMemoryStorage();
private final V2InMemoryStorage storage = V2InMemoryStorage.newBuilder().build();
private final InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
private final MockWebServer server = new MockWebServer();
private final BlockingQueue<MockResponse> failureQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -109,7 +114,9 @@ public InMemoryCollectorMetrics collectorMetrics() {
* you'd add the parent here.
*/
public ZipkinRule storeSpans(List<Span> spans) {
storage.spanConsumer().accept(spans);
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.asyncSpanConsumer().accept(spans, callback);
callback.get();
return this;
}

Expand All @@ -131,7 +138,17 @@ public ZipkinRule enqueueFailure(HttpFailure failure) {

/** Retrieves all traces this zipkin server has received. */
public List<List<Span>> getTraces() {
return storage.spanStore().getRawTraces();
List<List<zipkin.internal.v2.Span>> traces = storage.v2SpanStore().getTraces();
List<List<Span>> result = new ArrayList<>(traces.size());
for (List<zipkin.internal.v2.Span> trace2 : traces) {
List<Span> sameTraceId = new ArrayList<>();
for (zipkin.internal.v2.Span span2 : trace2) {
sameTraceId.add(V2SpanConverter.toSpan(span2));
}
result.addAll(GroupByTraceId.apply(sameTraceId, false, false));
}
Collections.sort(result, TRACE_DESCENDING);
return result;
}

/**
Expand Down
7 changes: 4 additions & 3 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ public void httpRequestCountIncrements() throws IOException {
*/
@Test
public void collectorMetrics_spans() throws IOException {
postSpans(TRACE);
postSpans(TRACE);
postSpans(asList(LOTS_OF_SPANS[0]));
postSpans(asList(LOTS_OF_SPANS[1], LOTS_OF_SPANS[2]));

assertThat(zipkin.collectorMetrics().spans()).isEqualTo(TRACE.size() * 2);
assertThat(zipkin.collectorMetrics().spans())
.isEqualTo(3);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.springframework.context.annotation.Configuration;
import zipkin.collector.CollectorMetrics;
import zipkin.collector.CollectorSampler;
import zipkin.internal.V2InMemoryStorage;
import zipkin.server.brave.TracedStorageComponent;
import zipkin.storage.InMemoryStorage;
import zipkin.storage.StorageComponent;

@Configuration
Expand Down Expand Up @@ -86,7 +86,10 @@ static class InMemoryConfiguration {
@Bean StorageComponent storage(
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.mem.max-spans:500000}") int maxSpans) {
return InMemoryStorage.builder().strictTraceId(strictTraceId).maxSpanCount(maxSpans).build();
return V2InMemoryStorage.newBuilder()
.strictTraceId(strictTraceId)
.maxSpanCount(maxSpans)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package zipkin.server;

import java.io.IOException;
import java.util.List;
import okio.Buffer;
import okio.GzipSink;
import org.junit.Before;
Expand All @@ -32,6 +34,7 @@
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2InMemoryStorage;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.MessageEncoder;
Expand Down Expand Up @@ -61,7 +64,7 @@ public class ZipkinServerIntegrationTest {
@Autowired
ConfigurableWebApplicationContext context;
@Autowired
InMemoryStorage storage;
V2InMemoryStorage storage;
@Autowired
ActuateCollectorMetrics metrics;

Expand Down Expand Up @@ -103,7 +106,8 @@ public void writeSpans_version2() throws Exception {

@Test
public void writeSpans_updatesMetrics() throws Exception {
byte[] body = Codec.JSON.writeSpans(TRACE);
List<Span> spans = asList(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1], LOTS_OF_SPANS[2]);
byte[] body = Codec.JSON.writeSpans(spans);
mockMvc.perform(post("/api/v1/spans").content(body));
mockMvc.perform(post("/api/v1/spans").content(body));

Expand All @@ -114,9 +118,9 @@ public void writeSpans_updatesMetrics() throws Exception {
.andExpect(jsonPath("$.['counter.zipkin_collector.bytes.http']").value(body.length * 2))
.andExpect(jsonPath("$.['gauge.zipkin_collector.message_bytes.http']")
.value(Double.valueOf(body.length))) // most recent size
.andExpect(jsonPath("$.['counter.zipkin_collector.spans.http']").value(TRACE.size() * 2))
.andExpect(jsonPath("$.['counter.zipkin_collector.spans.http']").value(spans.size() * 2))
.andExpect(jsonPath("$.['gauge.zipkin_collector.message_spans.http']")
.value(Double.valueOf(TRACE.size()))); // most recent count
.value(Double.valueOf(spans.size()))); // most recent count
}

@Test
Expand Down
71 changes: 71 additions & 0 deletions zipkin/src/main/java/zipkin/internal/V2InMemoryStorage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import java.io.IOException;
import zipkin.internal.v2.storage.InMemoryStorage;

public final class V2InMemoryStorage extends V2StorageComponent {

public static Builder newBuilder() {
return new V2InMemoryStorage.Builder();
}

public static final class Builder implements zipkin.storage.StorageComponent.Builder {
final InMemoryStorage.Builder delegate = InMemoryStorage.newBuilder();

@Override public Builder strictTraceId(boolean strictTraceId) {
delegate.strictTraceId(strictTraceId);
return this;
}

/** Eldest traces are removed to ensure spans in memory don't exceed this value */
public Builder maxSpanCount(int maxSpanCount) {
delegate.maxSpanCount(maxSpanCount);
return this;
}

@Override public V2InMemoryStorage build() {
return new V2InMemoryStorage(delegate.build());
}

Builder() {
}
}

final InMemoryStorage delegate;

V2InMemoryStorage(InMemoryStorage delegate) {
this.delegate = delegate;
}

@Override public InMemoryStorage v2SpanStore() {
return delegate;
}

@Override public InMemoryStorage v2SpanConsumer() {
return delegate;
}

@Override public CheckResult check() {
return CheckResult.OK;
}

public void clear() {
delegate.clear();
}

@Override public void close() throws IOException {
}
}
Loading

0 comments on commit d9e9492

Please sign in to comment.