Skip to content

Commit

Permalink
Adds Elasticsearch 6.x support using Span2 model
Browse files Browse the repository at this point in the history
This adds Elasticsearch 6.x support via single-type indexes:

* zipkin:span-2017-08-05 - span2 (single endpoint) format
* zipkin:dependency-2017-08-05 - dependency links in existing format

This indexing model will be available in the next minor release of
Zipkin, particularly for Elasticsearch 2.4+. If you aren't running
Elasticsearch 2.4+, yet. Please upgrade.

Those wishing to experiment with this format before the next minor
release can set `ES_EXPERIMENTAL_SPAN2=true` to use this style now.
When set, writes will use the above scheme, but both the former and new
indexes will be read.

Fixes #1676
See #1644 for the new span2 model
See #1679 for the dual-read approach, which this is similar to
  • Loading branch information
Adrian Cole committed Aug 6, 2017
1 parent 4f99f63 commit 644fb96
Show file tree
Hide file tree
Showing 42 changed files with 2,968 additions and 955 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

public class ZipkinElasticsearchHttpStorageAutoConfigurationTest {

Expand Down Expand Up @@ -246,8 +247,8 @@ public void dailyIndexFormat() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();

assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
.isEqualTo("zipkin-1970-01-01");
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
.isEqualTo("zipkin:span-1970-01-01");
}

@Test
Expand All @@ -262,8 +263,8 @@ public void dailyIndexFormat_overridingPrefix() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();

assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
.isEqualTo("zipkin_prod-1970-01-01");
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
.isEqualTo("zipkin_prod:span-1970-01-01");
}

@Test
Expand All @@ -278,8 +279,8 @@ public void dailyIndexFormat_overridingDateSeparator() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();

assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
.isEqualTo("zipkin-1970.01.01");
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
.isEqualTo("zipkin:span-1970.01.01");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CassandraTest {

@ClassRule
public static LazyCassandra3Storage storage =
new LazyCassandra3Storage("cassandra:3.10", "test_zipkin3");
new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.29.1", "test_zipkin3");

public static class DependenciesTest extends CassandraDependenciesTest {
@Override protected Cassandra3Storage storage() {
Expand Down
52 changes: 6 additions & 46 deletions zipkin-storage/elasticsearch-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

This is is a plugin to the Elasticsearch storage component, which uses
HTTP by way of [OkHttp 3](https://github.com/square/okttp) and
[Moshi](https://github.com/square/moshi). This currently supports both
2.x and 5.x version families.
[Moshi](https://github.com/square/moshi). This currently supports 2.x
through 6.x version families.

## Multiple hosts
Most users will supply a DNS name that's mapped to multiple A or AAAA
Expand All @@ -23,7 +23,7 @@ Here are some examples:

## Indexes
Spans are stored into daily indices, for example spans with a timestamp
falling on 2016/03/19 will be stored in the index named 'zipkin-2016-03-19'.
falling on 2016/03/19 will be stored in the index named 'zipkin:span-2016-03-19'.
There is no support for TTL through this SpanStore. It is recommended
instead to use [Elastic Curator](https://www.elastic.co/guide/en/elasticsearch/client/curator/current/about.html)
to remove indices older than the point you are interested in.
Expand All @@ -36,8 +36,8 @@ the date separator from '-' to something else.
control the daily index format.

For example, spans with a timestamp falling on 2016/03/19 end up in the
index 'zipkin-2016-03-19'. When the date separator is '.', the index
would be 'zipkin-2016.03.19'.
index 'zipkin:span-2016-03-19'. When the date separator is '.', the index
would be 'zipkin:span-2016.03.19'.

### String Mapping
The Zipkin api implies aggregation and exact match (keyword) on string
Expand Down Expand Up @@ -82,54 +82,14 @@ your indexes:

```bash
# the output below shows which tokens will match on the trace id supplied.
$ curl -s localhost:9200/test_zipkin_http-2016-10-26/_analyze -d '{
$ curl -s localhost:9200/test_zipkin_http:span-2016-10-26/_analyze -d '{
"text": "48485a3953bb61246b221d5bc9e6496c",
"analyzer": "traceId_analyzer"
}'|jq '.tokens|.[]|.token'
"48485a3953bb61246b221d5bc9e6496c"
"6b221d5bc9e6496c"
```

### Span and service Names
Zipkin defines span and service names as lowercase. At write time, any
mixed case span or service names are downcased. If writing a custom
collector in a different language, make sure you write span and service
names in lowercase. Also, if there are any custom query tools, ensure
inputs are downcased.

Span and service name queries default to look back 24hrs (2 index days).
This can be controlled by `ElasticsearchHttpStorage.Builder.namesLookback`

#### Index format
Starting with Zipkin 1.23, service and span names are written to the
same daily indexes as spans and dependency links as the document type
"servicespan". This was added for performance reasons as formerly search
was using relatively expensive nested queries.

The documents themselves represent service and span name pairs. Only one
document is present per daily index. This is to keep the documents from
repeating at a multiplier of span count, which also simplifies query.
This deduplication is enforced at write time by using an ID convention
of the service and span name. Ex. `id = MyServiceName|MySpanName`

The document is a simple structure, like:
```json
{
"serviceName": "MyServiceName",
"spanName": "MySpanName",
}
```

The document does replicate data in the ID, but it is needed as you
cannot query based on an ID expression.

#### Notes for data written prior to Zipkin 1.23
Before Zipkin 1.23, service and span names were nested queries against
the span type. This was an expensive operation, which resulted in high
latency particularly when the UI loads. When the "servicespan" type is
missing from an index, or there's no results returned, a fallback nested
query is invoked.

## Customizing the ingest pipeline

When using Elasticsearch 5.x, you can setup an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html)
Expand Down
14 changes: 14 additions & 0 deletions zipkin-storage/elasticsearch-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,19 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,21 @@
*/
package zipkin.storage.elasticsearch.http;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.internal.Nullable;
import zipkin.internal.Span2;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing

Expand All @@ -49,76 +45,64 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
return;
}
try {
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
if (!indexToServiceSpans.isEmpty()) {
indexNames(indexer, indexToServiceSpans);
}
BulkSpanIndexer indexer = newBulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
}

/** Indexes spans and returns a mapping of indexes that may need a names update */
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Long timestampMillis;
String index; // which index to store this span into
long indexTimestamp = 0L; // which index to store this span into
Long spanTimestamp;
if (timestamp != null) {
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
} else {
timestampMillis = null;
spanTimestamp = null;
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
// the index bucket, any annotation is better than using current time.
Long indexTimestamp = null;
for (int i = 0, length = span.annotations.size(); i < length; i++) {
indexTimestamp = span.annotations.get(i).timestamp / 1000;
break;
}
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
}
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
byte[] document = Codec.JSON.writeSpan(span);
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
indexer.add(indexTimestamp, span, spanTimestamp);
}
return indexToServiceSpans;
}

void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
for (String serviceName : s.serviceNames()) {
serviceSpans.add(Pair.create(serviceName, s.name));
}

BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
return new BulkSpanIndexer(es);
}

/**
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
* a large order of duplicates ending up in the daily index. This also means queries do not need
* to deduplicate.
*/
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
throws IOException {
Buffer buffer = new Buffer();
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
String index = entry.getKey();
for (Pair<String> serviceSpan : entry.getValue()) {
JsonWriter writer = JsonWriter.of(buffer);
writer.beginObject();
writer.name("serviceName").value(serviceSpan._1);
writer.name("spanName").value(serviceSpan._2);
writer.endObject();
byte[] document = buffer.readByteArray();
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
static class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;

BulkSpanIndexer(ElasticsearchHttpStorage es) {
this.indexer = new HttpBulkIndexer("index-span", es);
this.indexNameFormatter = es.indexNameFormatter();
}

void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp);
for (Span2 span2 : Span2Converter.fromSpan(span)) {
byte[] document = Span2Codec.JSON.writeSpan(span2);
if (timestampMillis != null) {
document = prefixWithTimestampMillis(document, timestampMillis);
}
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}
}

void execute(Callback<Void> callback) throws IOException {
indexer.execute(callback);
}
}

private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8);
Expand Down
Loading

0 comments on commit 644fb96

Please sign in to comment.