From d9e9492a62bf6929617804fe55e98efe46449934 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 30 Aug 2017 15:57:28 +0800 Subject: [PATCH] Adds v2 native in-memory storage --- .../java/zipkin/junit/ZipkinDispatcher.java | 4 +- .../main/java/zipkin/junit/ZipkinRule.java | 29 +- .../java/zipkin/junit/ZipkinRuleTest.java | 7 +- .../server/ZipkinServerConfiguration.java | 7 +- .../server/ZipkinServerIntegrationTest.java | 12 +- .../zipkin/internal/V2InMemoryStorage.java | 71 +++ .../internal/v2/storage/InMemoryStorage.java | 414 ++++++++++++++++++ .../zipkin/internal/ITV2InMemoryStorage.java | 74 ++++ .../v2/storage/InMemoryStorageTest.java | 98 +++++ 9 files changed, 699 insertions(+), 17 deletions(-) create mode 100644 zipkin/src/main/java/zipkin/internal/V2InMemoryStorage.java create mode 100644 zipkin/src/main/java/zipkin/internal/v2/storage/InMemoryStorage.java create mode 100644 zipkin/src/test/java/zipkin/internal/ITV2InMemoryStorage.java create mode 100644 zipkin/src/test/java/zipkin/internal/v2/storage/InMemoryStorageTest.java diff --git a/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java b/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java index 56b17ed9050..a69f165251e 100644 --- a/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java +++ b/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java @@ -137,11 +137,11 @@ static QueryRequest toQueryRequest(HttpUrl url) { .limit(maybeInteger(url.queryParameter("limit"))).build(); } - static Long maybeLong(String input) { + static Long maybeLong(@Nullable String input) { return input != null ? Long.valueOf(input) : null; } - static Integer maybeInteger(String input) { + static Integer maybeInteger(@Nullable String input) { return input != null ? Integer.valueOf(input) : null; } diff --git a/zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java b/zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java index d86014964ba..0646c9c0c0c 100644 --- a/zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java +++ b/zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,6 +14,8 @@ package zipkin.junit; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -29,9 +31,13 @@ import org.junit.runners.model.Statement; import zipkin.Span; import zipkin.collector.InMemoryCollectorMetrics; -import zipkin.storage.InMemoryStorage; +import zipkin.internal.CallbackCaptor; +import zipkin.internal.GroupByTraceId; +import zipkin.internal.V2InMemoryStorage; +import zipkin.internal.V2SpanConverter; import static okhttp3.mockwebserver.SocketPolicy.KEEP_OPEN; +import static zipkin.internal.GroupByTraceId.TRACE_DESCENDING; /** * Starts up a local Zipkin server, listening for http requests on {@link #httpUrl}. @@ -42,8 +48,7 @@ * See http://openzipkin.github.io/zipkin-api/#/ */ public final class ZipkinRule implements TestRule { - - private final InMemoryStorage storage = new InMemoryStorage(); + private final V2InMemoryStorage storage = V2InMemoryStorage.newBuilder().build(); private final InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); private final MockWebServer server = new MockWebServer(); private final BlockingQueue failureQueue = new LinkedBlockingQueue<>(); @@ -109,7 +114,9 @@ public InMemoryCollectorMetrics collectorMetrics() { * you'd add the parent here. */ public ZipkinRule storeSpans(List spans) { - storage.spanConsumer().accept(spans); + CallbackCaptor callback = new CallbackCaptor<>(); + storage.asyncSpanConsumer().accept(spans, callback); + callback.get(); return this; } @@ -131,7 +138,17 @@ public ZipkinRule enqueueFailure(HttpFailure failure) { /** Retrieves all traces this zipkin server has received. */ public List> getTraces() { - return storage.spanStore().getRawTraces(); + List> traces = storage.v2SpanStore().getTraces(); + List> result = new ArrayList<>(traces.size()); + for (List trace2 : traces) { + List sameTraceId = new ArrayList<>(); + for (zipkin.internal.v2.Span span2 : trace2) { + sameTraceId.add(V2SpanConverter.toSpan(span2)); + } + result.addAll(GroupByTraceId.apply(sameTraceId, false, false)); + } + Collections.sort(result, TRACE_DESCENDING); + return result; } /** diff --git a/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java b/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java index 2b36be3008f..0483037e66a 100644 --- a/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java +++ b/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java @@ -174,10 +174,11 @@ public void httpRequestCountIncrements() throws IOException { */ @Test public void collectorMetrics_spans() throws IOException { - postSpans(TRACE); - postSpans(TRACE); + postSpans(asList(LOTS_OF_SPANS[0])); + postSpans(asList(LOTS_OF_SPANS[1], LOTS_OF_SPANS[2])); - assertThat(zipkin.collectorMetrics().spans()).isEqualTo(TRACE.size() * 2); + assertThat(zipkin.collectorMetrics().spans()) + .isEqualTo(3); } @Test diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java index 16770d36144..b73d1b69d01 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java @@ -27,8 +27,8 @@ import org.springframework.context.annotation.Configuration; import zipkin.collector.CollectorMetrics; import zipkin.collector.CollectorSampler; +import zipkin.internal.V2InMemoryStorage; import zipkin.server.brave.TracedStorageComponent; -import zipkin.storage.InMemoryStorage; import zipkin.storage.StorageComponent; @Configuration @@ -86,7 +86,10 @@ static class InMemoryConfiguration { @Bean StorageComponent storage( @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId, @Value("${zipkin.storage.mem.max-spans:500000}") int maxSpans) { - return InMemoryStorage.builder().strictTraceId(strictTraceId).maxSpanCount(maxSpans).build(); + return V2InMemoryStorage.newBuilder() + .strictTraceId(strictTraceId) + .maxSpanCount(maxSpans) + .build(); } } } diff --git a/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java b/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java index f5ce83080fd..5bf342ac5d7 100644 --- a/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java +++ b/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java @@ -13,6 +13,8 @@ */ package zipkin.server; +import java.io.IOException; +import java.util.List; import okio.Buffer; import okio.GzipSink; import org.junit.Before; @@ -32,6 +34,7 @@ import zipkin.Codec; import zipkin.Span; import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.V2InMemoryStorage; import zipkin.internal.V2SpanConverter; import zipkin.internal.v2.codec.Encoder; import zipkin.internal.v2.codec.MessageEncoder; @@ -61,7 +64,7 @@ public class ZipkinServerIntegrationTest { @Autowired ConfigurableWebApplicationContext context; @Autowired - InMemoryStorage storage; + V2InMemoryStorage storage; @Autowired ActuateCollectorMetrics metrics; @@ -103,7 +106,8 @@ public void writeSpans_version2() throws Exception { @Test public void writeSpans_updatesMetrics() throws Exception { - byte[] body = Codec.JSON.writeSpans(TRACE); + List spans = asList(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1], LOTS_OF_SPANS[2]); + byte[] body = Codec.JSON.writeSpans(spans); mockMvc.perform(post("/api/v1/spans").content(body)); mockMvc.perform(post("/api/v1/spans").content(body)); @@ -114,9 +118,9 @@ public void writeSpans_updatesMetrics() throws Exception { .andExpect(jsonPath("$.['counter.zipkin_collector.bytes.http']").value(body.length * 2)) .andExpect(jsonPath("$.['gauge.zipkin_collector.message_bytes.http']") .value(Double.valueOf(body.length))) // most recent size - .andExpect(jsonPath("$.['counter.zipkin_collector.spans.http']").value(TRACE.size() * 2)) + .andExpect(jsonPath("$.['counter.zipkin_collector.spans.http']").value(spans.size() * 2)) .andExpect(jsonPath("$.['gauge.zipkin_collector.message_spans.http']") - .value(Double.valueOf(TRACE.size()))); // most recent count + .value(Double.valueOf(spans.size()))); // most recent count } @Test diff --git a/zipkin/src/main/java/zipkin/internal/V2InMemoryStorage.java b/zipkin/src/main/java/zipkin/internal/V2InMemoryStorage.java new file mode 100644 index 00000000000..04d0609acfd --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/V2InMemoryStorage.java @@ -0,0 +1,71 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.io.IOException; +import zipkin.internal.v2.storage.InMemoryStorage; + +public final class V2InMemoryStorage extends V2StorageComponent { + + public static Builder newBuilder() { + return new V2InMemoryStorage.Builder(); + } + + public static final class Builder implements zipkin.storage.StorageComponent.Builder { + final InMemoryStorage.Builder delegate = InMemoryStorage.newBuilder(); + + @Override public Builder strictTraceId(boolean strictTraceId) { + delegate.strictTraceId(strictTraceId); + return this; + } + + /** Eldest traces are removed to ensure spans in memory don't exceed this value */ + public Builder maxSpanCount(int maxSpanCount) { + delegate.maxSpanCount(maxSpanCount); + return this; + } + + @Override public V2InMemoryStorage build() { + return new V2InMemoryStorage(delegate.build()); + } + + Builder() { + } + } + + final InMemoryStorage delegate; + + V2InMemoryStorage(InMemoryStorage delegate) { + this.delegate = delegate; + } + + @Override public InMemoryStorage v2SpanStore() { + return delegate; + } + + @Override public InMemoryStorage v2SpanConsumer() { + return delegate; + } + + @Override public CheckResult check() { + return CheckResult.OK; + } + + public void clear() { + delegate.clear(); + } + + @Override public void close() throws IOException { + } +} diff --git a/zipkin/src/main/java/zipkin/internal/v2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin/internal/v2/storage/InMemoryStorage.java new file mode 100644 index 00000000000..5c9b343326e --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/v2/storage/InMemoryStorage.java @@ -0,0 +1,414 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal.v2.storage; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import zipkin.DependencyLink; +import zipkin.internal.DependencyLinker; +import zipkin.internal.Pair; +import zipkin.internal.v2.Call; +import zipkin.internal.v2.Span; + +import static zipkin.internal.GroupByTraceId.TRACE_DESCENDING; +import static zipkin.internal.Util.sortedList; + +/** + * Test storage component that keeps all spans in memory, accepting them on the calling thread. + * + *

Internally, spans are indexed on 64-bit trace ID + * + *

Here's an example of some traces in memory: + * + *

{@code
+ * spansByTraceIdTimeStamp:
+ *     --> ( spanA(time:July 4, traceId:aaaa, service:foo, name:GET),
+ *                        spanB(time:July 4, traceId:aaaa, service:bar, name:GET) )
+ *     --> ( spanC(time:July 4, traceId:aaaa, service:foo, name:GET) )
+ *     --> ( spanD(time:July 5, traceId:bbbb, service:biz, name:GET) )
+ *     --> ( spanE(time:July 6, traceId:bbbb) service:foo, name:POST )
+ *
+ * traceIdToTraceIdTimeStamps:
+ *    aaaa --> [  ]
+ *    bbbb --> [ ,  ]
+ *    cccc --> [  ]
+ *
+ * serviceToTraceIds:
+ *    foo --> [ , ,  ]
+ *    bar --> [  ]
+ *    biz --> [  ]
+ *
+ * serviceToSpanNames:
+ *    bar --> ( GET )
+ *    biz --> ( GET )
+ *    foo --> ( GET, POST )
+ * }
+ */ +public final class InMemoryStorage implements SpanConsumer, SpanStore { + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + boolean strictTraceId = true; + int maxSpanCount = 500000; + + /** {@inheritDoc} */ + public Builder strictTraceId(boolean strictTraceId) { + this.strictTraceId = strictTraceId; + return this; + } + + /** Eldest traces are removed to ensure spans in memory don't exceed this value */ + public Builder maxSpanCount(int maxSpanCount) { + if (maxSpanCount <= 0) throw new IllegalArgumentException("maxSpanCount <= 0"); + this.maxSpanCount = maxSpanCount; + return this; + } + + public InMemoryStorage build() { + return new InMemoryStorage(this); + } + } + + /** + * Primary source of data is this map, which includes spans ordered descending by timestamp. All + * other maps are derived from the span values here. This uses a list for the spans, so that it is + * visible (via /api/v2/trace/id?raw) when instrumentation report the same spans multiple times. + */ + private final SortedMultimap, Span> spansByTraceIdTimeStamp = + new SortedMultimap(VALUE_2_DESCENDING) { + @Override Collection valueContainer() { + return new LinkedList<>(); + } + }; + + /** This supports span lookup by {@link Span#traceId lower 64-bits of the trace ID} */ + private final SortedMultimap> traceIdToTraceIdTimeStamps = + new SortedMultimap>(Long::compareTo) { + @Override Collection> valueContainer() { + return new LinkedHashSet<>(); + } + }; + /** This is an index of {@link Span#traceId} by {@link zipkin.Endpoint#serviceName service name} */ + private final ServiceNameToTraceIds serviceToTraceIds = new ServiceNameToTraceIds(); + /** This is an index of {@link Span#name} by {@link zipkin.Endpoint#serviceName service name} */ + private final SortedMultimap serviceToSpanNames = + new SortedMultimap(String::compareTo) { + @Override Collection valueContainer() { + return new LinkedHashSet<>(); + } + }; + + final boolean strictTraceId; + final int maxSpanCount; + volatile int acceptedSpanCount; + + InMemoryStorage(Builder builder) { + this.strictTraceId = builder.strictTraceId; + this.maxSpanCount = builder.maxSpanCount; + } + + public synchronized void clear() { + acceptedSpanCount = 0; + traceIdToTraceIdTimeStamps.clear(); + spansByTraceIdTimeStamp.clear(); + serviceToTraceIds.clear(); + serviceToSpanNames.clear(); + } + + @Override synchronized public Call accept(List spans) { + int delta = spans.size(); + int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount; + evictToRecoverSpans(spansToRecover); + for (Span span : spans) { + Long timestamp = span.timestamp() != null ? span.timestamp() : Long.MIN_VALUE; + Pair traceIdTimeStamp = Pair.create(span.traceId(), timestamp); + spansByTraceIdTimeStamp.put(traceIdTimeStamp, span); + traceIdToTraceIdTimeStamps.put(span.traceId(), traceIdTimeStamp); + acceptedSpanCount++; + + String spanName = span.name(); + if (span.localServiceName() != null) { + serviceToTraceIds.put(span.localServiceName(), span.traceId()); + serviceToSpanNames.put(span.localServiceName(), spanName); + } + if (span.remoteServiceName() != null) { + serviceToTraceIds.put(span.remoteServiceName(), span.traceId()); + serviceToSpanNames.put(span.remoteServiceName(), spanName); + } + } + return Call.create(null /* Void == null */); + } + + /** Returns the count of spans evicted. */ + int evictToRecoverSpans(int spansToRecover) { + int spansEvicted = 0; + while (spansToRecover > 0) { + int spansInOldestTrace = deleteOldestTrace(); + spansToRecover -= spansInOldestTrace; + spansEvicted += spansInOldestTrace; + } + return spansEvicted; + } + + /** Returns the count of spans evicted. */ + private int deleteOldestTrace() { + int spansEvicted = 0; + long traceId = spansByTraceIdTimeStamp.delegate.lastKey()._1; + Collection> traceIdTimeStamps = traceIdToTraceIdTimeStamps.remove(traceId); + for (Iterator> traceIdTimeStampIter = traceIdTimeStamps.iterator(); + traceIdTimeStampIter.hasNext(); ) { + Pair traceIdTimeStamp = traceIdTimeStampIter.next(); + Collection spans = spansByTraceIdTimeStamp.remove(traceIdTimeStamp); + spansEvicted += spans.size(); + } + for (String orphanedService : serviceToTraceIds.removeServiceIfTraceId(traceId)) { + serviceToSpanNames.remove(orphanedService); + } + return spansEvicted; + } + + @Override + public synchronized Call>> getTraces(QueryRequest request) { + return getTraces(request, strictTraceId); + } + + synchronized Call>> getTraces(QueryRequest request, boolean strictTraceId) { + Set traceIdsInTimerange = traceIdsDescendingByTimestamp(request); + if (traceIdsInTimerange.isEmpty()) return Call.emptyList(); + + List> result = new ArrayList<>(); + for (Iterator traceId = traceIdsInTimerange.iterator(); + traceId.hasNext() && result.size() < request.limit(); ) { + List next = spansByTraceId(traceId.next()); + if (!request.test(next)) continue; + if (!strictTraceId) { + result.add(next); + continue; + } + + // re-run the query as now spans are strictly grouped + for (List strictTrace : strictByTraceId(next)) { + if (request.test(strictTrace)) result.add(strictTrace); + } + } + + return Call.create(result); + } + + static Collection> strictByTraceId(List next) { + Map, List> groupedByTraceId = new LinkedHashMap<>(); + for (Span span : next) { + Pair traceIdPair = Pair.create(span.traceIdHigh(), span.traceId()); + if (!groupedByTraceId.containsKey(traceIdPair)) { + groupedByTraceId.put(traceIdPair, new LinkedList<>()); + } + groupedByTraceId.get(traceIdPair).add(span); + } + return groupedByTraceId.values(); + } + + /** Used for testing. Returns all traces unconditionally. */ + public synchronized List> getTraces() { + List> result = new ArrayList<>(); + for (long traceId : traceIdToTraceIdTimeStamps.keySet()) { + List sameTraceId = spansByTraceId(traceId); + if (strictTraceId) { + result.addAll(strictByTraceId(sameTraceId)); + } else { + result.add(sameTraceId); + } + } + return result; + } + + Set traceIdsDescendingByTimestamp(QueryRequest request) { + Collection> traceIdTimestamps = request.serviceName() != null + ? traceIdTimestampsByServiceName(request.serviceName()) + : spansByTraceIdTimeStamp.keySet(); + + long endTs = request.endTs() * 1000; + long startTs = endTs - request.lookback() * 1000; + + if (traceIdTimestamps == null || traceIdTimestamps.isEmpty()) return Collections.emptySet(); + Set result = new LinkedHashSet<>(); + for (Pair traceIdTimestamp : traceIdTimestamps) { + if (traceIdTimestamp._2 >= startTs || traceIdTimestamp._2 <= endTs) { + result.add(traceIdTimestamp._1); + } + } + return result; + } + + @Override public synchronized Call> getTrace(long traceIdHigh, long traceId) { + List spans = spansByTraceId(traceId); + if (spans == null || spans.isEmpty()) return Call.emptyList(); + if (!strictTraceId) return Call.create(spans); + + List filtered = new ArrayList<>(spans); + Iterator iterator = filtered.iterator(); + while (iterator.hasNext()) { + if (iterator.next().traceIdHigh() != traceIdHigh) { + iterator.remove(); + } + } + return Call.create(filtered); + } + + @Override public synchronized Call> getServiceNames() { + List result = sortedList(serviceToTraceIds.keySet()); + return Call.create(result); + } + + @Override public synchronized Call> getSpanNames(String service) { + if (service.isEmpty()) return Call.emptyList(); + service = service.toLowerCase(Locale.ROOT); // service names are always lowercase! + List result = sortedList(serviceToSpanNames.get(service)); + return Call.create(result); + } + + @Override + public synchronized Call> getDependencies(long endTs, long lookback) { + QueryRequest request = QueryRequest.newBuilder() + .endTs(endTs) + .lookback(lookback) + .limit(Integer.MAX_VALUE).build(); + + // We don't have a query parameter for strictTraceId when fetching dependency links, so we + // ignore traceIdHigh. Otherwise, a single trace can appear as two, doubling callCount. + Call>> getTracesCall = getTraces(request, false); + return getTracesCall.map(traces -> { + DependencyLinker linksBuilder = new DependencyLinker(); + for (Collection trace : traces) { + // use a hash set to dedupe any redundantly accepted spans + linksBuilder.putTrace(new LinkedHashSet<>(trace).iterator()); + } + return linksBuilder.link(); + }); + } + + static final Comparator> VALUE_2_DESCENDING = (left, right) -> { + int result = right._2.compareTo(left._2); + if (result != 0) return result; + return right._1.compareTo(left._1); + }; + + static final class ServiceNameToTraceIds extends SortedMultimap { + ServiceNameToTraceIds() { + super(String::compareTo); + } + + @Override Set valueContainer() { + return new LinkedHashSet<>(); + } + + /** Returns service names orphaned by removing the trace ID */ + Set removeServiceIfTraceId(long traceId) { + Set result = new LinkedHashSet<>(); + for (Map.Entry> entry : delegate.entrySet()) { + Collection traceIds = entry.getValue(); + if (traceIds.remove(traceId) && traceIds.isEmpty()) { + result.add(entry.getKey()); + } + } + delegate.keySet().removeAll(result); + return result; + } + } + + // Not synchronized as every exposed method on the enclosing type is + static abstract class SortedMultimap { + final SortedMap> delegate; + int size = 0; + + SortedMultimap(Comparator comparator) { + delegate = new TreeMap<>(comparator); + } + + abstract Collection valueContainer(); + + Set keySet() { + return delegate.keySet(); + } + + int size() { + return size; + } + + void put(K key, V value) { + Collection valueContainer = delegate.get(key); + if (valueContainer == null) { + delegate.put(key, valueContainer = valueContainer()); + } + if (valueContainer.add(value)) size++; + } + + Collection remove(K key) { + Collection value = delegate.remove(key); + if (value != null) size -= value.size(); + return value; + } + + void clear() { + delegate.clear(); + size = 0; + } + + Collection get(K key) { + Collection result = delegate.get(key); + return result != null ? result : Collections.emptySet(); + } + } + + private List spansByTraceId(long traceId) { + List sameTraceId = new ArrayList<>(); + for (Pair traceIdTimestamp : traceIdToTraceIdTimeStamps.get(traceId)) { + sameTraceId.addAll(spansByTraceIdTimeStamp.get(traceIdTimestamp)); + } + return sameTraceId; + } + + private Collection> traceIdTimestampsByServiceName(String serviceName) { + List> traceIdTimestamps = new ArrayList<>(); + for (long traceId : serviceToTraceIds.get(serviceName)) { + traceIdTimestamps.addAll(traceIdToTraceIdTimeStamps.get(traceId)); + } + Collections.sort(traceIdTimestamps, VALUE_2_DESCENDING); + return traceIdTimestamps; + } + + /** Compares by {@link Span#timestamp()} if present. */ + final static Comparator SPAN_COMPARATOR = new Comparator() { + @Override public int compare(Span left, Span right) { + if (left == right) return 0; + long x = left.timestamp() == null ? Long.MIN_VALUE : left.timestamp(); + long y = right.timestamp() == null ? Long.MIN_VALUE : right.timestamp(); + return x < y ? -1 : x == y ? 0 : 1; + } + }; +} diff --git a/zipkin/src/test/java/zipkin/internal/ITV2InMemoryStorage.java b/zipkin/src/test/java/zipkin/internal/ITV2InMemoryStorage.java new file mode 100644 index 00000000000..ce77c2f26c9 --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/ITV2InMemoryStorage.java @@ -0,0 +1,74 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.io.IOException; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import zipkin.Span; +import zipkin.TestObjects; +import zipkin.storage.StorageComponent; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(Enclosed.class) +public class ITV2InMemoryStorage { + + public static class DependenciesTest extends zipkin.storage.DependenciesTest { + final V2InMemoryStorage storage = V2InMemoryStorage.newBuilder().build(); + + @Override protected V2InMemoryStorage storage() { + return storage; + } + + @Override public void clear() { + storage.clear(); + } + } + + public static class SpanStoreTest extends zipkin.storage.SpanStoreTest { + final V2InMemoryStorage storage = V2InMemoryStorage.newBuilder().build(); + + @Override protected V2InMemoryStorage storage() { + return storage; + } + + @Override public void clear() throws IOException { + storage.clear(); + } + + /** This shows when spans are sent multiple times. Doing so can reveal instrumentation bugs. */ + @Test public void getRawTrace_sameSpanTwice(){ + Span span = TestObjects.LOTS_OF_SPANS[0]; + accept(span); + accept(span); + + assertThat(store().getRawTrace(span.traceIdHigh, span.traceId)) + .containsExactly(span, span); + } + } + + public static class StrictTraceIdFalseTest extends zipkin.storage.StrictTraceIdFalseTest { + final V2InMemoryStorage storage = V2InMemoryStorage.newBuilder().strictTraceId(false).build(); + + @Override protected StorageComponent storage() { + return storage; + } + + @Override public void clear() throws IOException { + storage.clear(); + } + } +} diff --git a/zipkin/src/test/java/zipkin/internal/v2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin/internal/v2/storage/InMemoryStorageTest.java new file mode 100644 index 00000000000..03a65d9fb83 --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/v2/storage/InMemoryStorageTest.java @@ -0,0 +1,98 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal.v2.storage; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Test; +import zipkin.DependencyLink; +import zipkin.Endpoint; +import zipkin.internal.v2.Span; + +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.TestObjects.APP_ENDPOINT; +import static zipkin.TestObjects.DAY; +import static zipkin.TestObjects.TODAY; + +public class InMemoryStorageTest { + InMemoryStorage storage = InMemoryStorage.newBuilder().build(); + + @Test public void getTraces_filteringMatchesMostRecentTraces() throws IOException { + List endpoints = IntStream.rangeClosed(1, 10).mapToObj(i -> + Endpoint.create("service" + i, 127 << 24 | i)) + .collect(Collectors.toList()); + + long gapBetweenSpans = 100; + List earlySpans = IntStream.rangeClosed(1, 10).mapToObj(i -> Span.builder().name("early") + .traceId(i).id(i).timestamp((TODAY - i) * 1000).duration(1L) + .localEndpoint(endpoints.get(i - 1)).build()).collect(toList()); + + List lateSpans = IntStream.rangeClosed(1, 10).mapToObj(i -> Span.builder().name("late") + .traceId(i + 10).id(i + 10).timestamp((TODAY + gapBetweenSpans - i) * 1000).duration(1L) + .localEndpoint(endpoints.get(i - 1)).build()).collect(toList()); + + storage.accept(earlySpans).execute(); + storage.accept(lateSpans).execute(); + + List[] earlyTraces = + earlySpans.stream().map(Collections::singletonList).toArray(List[]::new); + List[] lateTraces = + lateSpans.stream().map(Collections::singletonList).toArray(List[]::new); + + //sanity checks + assertThat(storage.getTraces(requestBuilder().serviceName("service1").build()).execute()) + .containsExactly(lateTraces[0], earlyTraces[0]); + + assertThat(storage.getTraces(requestBuilder().build()).execute()) + .hasSize(20); + + assertThat(storage.getTraces(requestBuilder() + .limit(10).build()).execute()) + .containsExactly(lateTraces); + + assertThat(storage.getTraces(requestBuilder() + .endTs(TODAY + gapBetweenSpans).lookback(gapBetweenSpans).build()).execute()) + .containsExactly(lateTraces); + + assertThat(storage.getTraces(requestBuilder() + .endTs(TODAY).build()).execute()) + .containsExactly(earlyTraces); + } + + /** It should be safe to run dependency link jobs twice */ + @Test public void replayOverwrites() throws IOException { + Span span = Span.builder().traceId(10L).id(10L).name("receive") + .kind(Span.Kind.CONSUMER) + .localEndpoint(APP_ENDPOINT) + .remoteEndpoint(Endpoint.builder().serviceName("kafka").build()) + .timestamp(TODAY * 1000) + .build(); + + storage.accept(asList(span)); + storage.accept(asList(span)); + + assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly( + DependencyLink.builder().parent("kafka").child("app").callCount(1L).build() + ); + } + + static QueryRequest.Builder requestBuilder() { + return QueryRequest.newBuilder().endTs(TODAY + DAY).lookback(DAY * 2).limit(100); + } +}