From 0c9852c287982aa255a40c7edcfb2943524c5e53 Mon Sep 17 00:00:00 2001 From: William Huba Date: Fri, 22 Jan 2021 08:32:50 -0500 Subject: [PATCH] Add new output plugin for sending OpenTelemetry metrics over gRPC. (#232) --- agent/pom.xml | 8 + .../com/spotify/ffwd/FastForwardAgent.java | 1 + api/pom.xml | 17 +- .../java/com/spotify/ffwd/Initializable.java | 3 +- core/pom.xml | 4 + docs/_layouts/documentation.html | 6 +- docs/content/_docs/opentelemetry.md | 23 +++ modules/opencensus/pom.xml | 121 ++++++------ modules/opentelemetry/pom.xml | 95 ++++++++++ .../OpenTelemetryOutputModule.java | 35 ++++ .../OpenTelemetryOutputPlugin.java | 69 +++++++ .../OpenTelemetryPluginSink.java | 173 ++++++++++++++++++ modules/pubsub/pom.xml | 34 ++++ pom.xml | 126 ++++++++++--- 14 files changed, 631 insertions(+), 84 deletions(-) create mode 100644 docs/content/_docs/opentelemetry.md create mode 100644 modules/opentelemetry/pom.xml create mode 100644 modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputModule.java create mode 100644 modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputPlugin.java create mode 100644 modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryPluginSink.java diff --git a/agent/pom.xml b/agent/pom.xml index 7d9172ff..ab7dac9d 100644 --- a/agent/pom.xml +++ b/agent/pom.xml @@ -26,6 +26,10 @@ com.spotify.ffwd ffwd-core + + com.spotify.ffwd + ffwd-api + com.spotify.ffwd ffwd-module-kafka @@ -66,6 +70,10 @@ com.spotify.ffwd ffwd-module-opencensus + + com.spotify.ffwd + ffwd-module-opentelemetry + diff --git a/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java b/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java index 96a4a467..9a73cf43 100644 --- a/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java +++ b/agent/src/main/java/com/spotify/ffwd/FastForwardAgent.java @@ -96,6 +96,7 @@ static FastForwardAgent setup(final Optional configPath) { modules.add(com.spotify.ffwd.http.HttpModule.class); modules.add(com.spotify.ffwd.pubsub.PubsubOutputModule.class); modules.add(com.spotify.ffwd.opencensus.OpenCensusOutputModule.class); + modules.add(com.spotify.ffwd.opentelemetry.OpenTelemetryOutputModule.class); final AgentCore.Builder builder = AgentCore.builder() .modules(modules) diff --git a/api/pom.xml b/api/pom.xml index 0796b37a..a3cf1548 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -23,6 +23,12 @@ com.google.inject guice + + + com.google.guava + guava + + com.google.inject.extensions @@ -31,7 +37,10 @@ com.google.protobuf protobuf-java - 3.11.1 + + + com.google.guava + guava @@ -68,6 +77,12 @@ com.spotify.metrics semantic-metrics-core + + + com.google.guava + guava + + diff --git a/api/src/main/java/com/spotify/ffwd/Initializable.java b/api/src/main/java/com/spotify/ffwd/Initializable.java index b5b854ad..26cc075d 100644 --- a/api/src/main/java/com/spotify/ffwd/Initializable.java +++ b/api/src/main/java/com/spotify/ffwd/Initializable.java @@ -31,5 +31,6 @@ public interface Initializable { /** * Initialize a component, will be called synchronously after everything has been started. */ - void init(); + default void init() { + } } diff --git a/core/pom.xml b/core/pom.xml index 7a877f33..575e5039 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,10 @@ com.uchuhimo konf + + com.google.code.gson + gson + com.fasterxml.jackson.module diff --git a/docs/_layouts/documentation.html b/docs/_layouts/documentation.html index b629f6fb..0bb636ac 100644 --- a/docs/_layouts/documentation.html +++ b/docs/_layouts/documentation.html @@ -28,9 +28,13 @@ Protobuf -
  • +
  • Pubsub
  • + +
  • + OpenTelemetry +
  • diff --git a/docs/content/_docs/opentelemetry.md b/docs/content/_docs/opentelemetry.md new file mode 100644 index 00000000..6609da09 --- /dev/null +++ b/docs/content/_docs/opentelemetry.md @@ -0,0 +1,23 @@ +--- +title: OpenTelemetry +--- + +# FastForward OpenTelemetry + +This module provies an output plugin for [OpenTelemetry metrics](https://github.com/open-telemetry/opentelemetry-proto/). It does not provide any tracing related functionality. + +## Configuration + +* `endpoint` - The gRPC endpoint to send metrics to. +* `headers` - An optional map of headers to include in the [MetricService export](https://github.com/open-telemetry/opentelemetry-proto/blob/v0.7.0/opentelemetry/proto/collector/metrics/v1/metrics_service.proto#L32) RPC. + +Here is an example configuration using the OpenTelemetry plugin: + +``` +output: + plugins: + - type: opentelemetry + endpoint: ingestion.example.com:443 + headers: + Authentication: Bearer Z29vZCBqb2IgZGVjb2RpbmcgdGhpcyBlYXN0ZXIgZWdn +``` diff --git a/modules/opencensus/pom.xml b/modules/opencensus/pom.xml index 4e350a56..8e79755a 100644 --- a/modules/opencensus/pom.xml +++ b/modules/opencensus/pom.xml @@ -42,60 +42,73 @@
    - - io.opencensus - opencensus-api - 0.23.0 - - - io.opencensus - opencensus-impl - 0.23.0 - - - io.opencensus - opencensus-exporter-stats-stackdriver - 0.23.0 - - - com.google.protobuf - protobuf-java - - - io.grpc - grpc-auth - - - io.grpc - grpc-stub - - - io.grpc - grpc-netty-shaded - - - com.google.cloud - google-cloud-core - - - com.google.api - api-common - - - com.google.api.grpc - proto-google-common-protos - - - com.google.cloud - google-cloud-core-grpc - - - - - com.google.code.gson - gson - 2.8.1 - + + io.opencensus + opencensus-api + 0.23.0 + + + io.opencensus + opencensus-impl + 0.23.0 + + + com.google.guava + guava + + + + + io.opencensus + opencensus-exporter-stats-stackdriver + 0.23.0 + + + com.google.protobuf + protobuf-java + + + io.grpc + grpc-auth + + + io.grpc + grpc-stub + + + io.grpc + grpc-netty-shaded + + + com.google.cloud + google-cloud-core + + + com.google.api + api-common + + + com.google.api.grpc + proto-google-common-protos + + + com.google.cloud + google-cloud-core-grpc + + + com.google.guava + guava + + + + + com.google.code.gson + gson + + + com.google.guava + guava + diff --git a/modules/opentelemetry/pom.xml b/modules/opentelemetry/pom.xml new file mode 100644 index 00000000..f73d1881 --- /dev/null +++ b/modules/opentelemetry/pom.xml @@ -0,0 +1,95 @@ + + 4.0.0 + + + com.spotify.ffwd + ffwd-parent + 0.7.4-SNAPSHOT + ../../pom.xml + + + ffwd-module-opentelemetry + 0.7.4-SNAPSHOT + jar + FastForward OpenTelemetry Module + + + 0.14.1 + + + + + com.spotify.ffwd + ffwd-api + + + + com.fasterxml.jackson.core + jackson-databind + + + + io.grpc + grpc-netty-shaded + + + com.google.guava + guava + + + + + io.grpc + grpc-protobuf + + + com.google.guava + guava + + + com.google.api.grpc + proto-google-common-protos + + + com.google.protobuf + protobuf-java + + + + + io.grpc + grpc-stub + + + com.google.guava + guava + + + + + org.apache.tomcat + annotations-api + provided + + + com.google.guava + guava + + + com.google.protobuf + protobuf-java + + + + io.opentelemetry + opentelemetry-proto + ${opentelemetry.version} + + + com.google.protobuf + protobuf-java + + + + + \ No newline at end of file diff --git a/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputModule.java b/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputModule.java new file mode 100644 index 00000000..ebea7117 --- /dev/null +++ b/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputModule.java @@ -0,0 +1,35 @@ +/* + * -\-\- + * FastForward OpenTelemetry Module + * -- + * Copyright (C) 2021 Spotify AB. + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.ffwd.opentelemetry; + +import com.google.inject.Inject; +import com.spotify.ffwd.module.FastForwardModule; +import com.spotify.ffwd.module.PluginContext; + +public class OpenTelemetryOutputModule implements FastForwardModule { + @Inject + private PluginContext context; + + @Override + public void setup() { + context.registerOutput("opentelemetry", OpenTelemetryOutputPlugin.class); + } +} diff --git a/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputPlugin.java b/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputPlugin.java new file mode 100644 index 00000000..cab89705 --- /dev/null +++ b/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputPlugin.java @@ -0,0 +1,69 @@ +/* + * -\-\- + * FastForward OpenTelemetry Module + * -- + * Copyright (C) 2021 Spotify AB. + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.ffwd.opentelemetry; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.PrivateModule; +import com.spotify.ffwd.filter.Filter; +import com.spotify.ffwd.module.Batching; +import com.spotify.ffwd.output.OutputPlugin; +import com.spotify.ffwd.output.PluginSink; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OpenTelemetryOutputPlugin extends OutputPlugin { + private final Map headers; + private final String endpoint; + + @JsonCreator + public OpenTelemetryOutputPlugin( + @JsonProperty("filter") Optional filter, + @JsonProperty("flushInterval") @Nullable Long flushInterval, + @JsonProperty("batching") Optional batching, + @JsonProperty("headers") Optional> headers, + @JsonProperty("endpoint") @Nullable String endpoint + ) { + super(filter, Batching.from(flushInterval, batching)); + this.headers = headers.orElse(new HashMap<>()); + this.endpoint = Objects.requireNonNull(endpoint, "endpoint must be set"); + } + + @Override + public Module module(Key key, String id) { + return new PrivateModule() { + @Override + protected void configure() { + bind(Logger.class).toInstance(LoggerFactory.getLogger(id)); + bind(key).toInstance( + new OpenTelemetryPluginSink(endpoint, headers)); + expose(key); + } + }; + } +} diff --git a/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryPluginSink.java b/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryPluginSink.java new file mode 100644 index 00000000..4c8c9062 --- /dev/null +++ b/modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryPluginSink.java @@ -0,0 +1,173 @@ +/* + * -\-\- + * FastForward OpenTelemetry Module + * -- + * Copyright (C) 2021 Spotify AB. + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.ffwd.opentelemetry; + +import com.google.inject.Inject; +import com.spotify.ffwd.model.v2.Batch; +import com.spotify.ffwd.model.v2.Metric; +import com.spotify.ffwd.output.PluginSink; +import eu.toolchain.async.AsyncFramework; +import eu.toolchain.async.AsyncFuture; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.common.v1.StringKeyValue; +import io.opentelemetry.proto.metrics.v1.DoubleDataPoint; +import io.opentelemetry.proto.metrics.v1.DoubleGauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OpenTelemetryPluginSink implements PluginSink { + private static final Logger log = LoggerFactory.getLogger(OpenTelemetryPluginSink.class); + + @Inject private AsyncFramework async; + private String endpoint; + private Map headers; + private ManagedChannel channel; + private MetricsServiceGrpc.MetricsServiceBlockingStub stub; + + OpenTelemetryPluginSink( + String endpoint, + Map headers + ) { + this.endpoint = endpoint; + this.headers = headers; + } + + + @Override + public void sendMetric(Metric metric) { + if (metric.hasDistribution()) { + log.warn("Distributions not supported, dropping metric"); + return; + } + + Map tags = metric.getTags(); + + // NB(hexedpackets): Name is supposed to uniquely identify the metric. The combination + // of `key` and `what` may or may not be adequate. It might make sense to have this be + // configurable? + String name = metric.getKey() + "." + tags.get("what"); + DoubleGauge gauge = convertGauge(metric); + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + io.opentelemetry.proto.metrics.v1.Metric.newBuilder() + .setName(name) + .setDoubleGauge(gauge); + + String unit = tags.get("unit"); + if (unit != null) { + metricBuilder.setUnit(unit); + } + + ExportMetricsServiceRequest request = ExportMetricsServiceRequest.newBuilder() + .addResourceMetrics(ResourceMetrics.newBuilder() + .addInstrumentationLibraryMetrics( + InstrumentationLibraryMetrics.newBuilder().addMetrics(metricBuilder.build()) + .build()) + .build()) + .build(); + + try { + //noinspection ResultOfMethodCallIgnored + stub.export(request); + } catch (Exception e) { + log.error(e.getMessage()); + } + } + + @Override + public void sendBatch(Batch batch) { + throw new RuntimeException("Batches are unsupported"); + } + + @Override + public AsyncFuture start() { + channel = ManagedChannelBuilder.forTarget(this.endpoint) + .useTransportSecurity() + .build(); + + MetricsServiceGrpc.MetricsServiceBlockingStub stub = + MetricsServiceGrpc.newBlockingStub(channel); + + Metadata extraHeaders = new Metadata(); + headers.forEach((key, value) -> { + Metadata.Key header = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER); + extraHeaders.put(header, value); + }); + this.stub = MetadataUtils.attachHeaders(stub, extraHeaders); + + return async.resolved(); + } + + @Override + public AsyncFuture stop() { + channel.shutdown(); + try { + channel.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Channel termination interrupted: ", e); + return async.failed(e); + } + return async.resolved(); + } + + @Override + public boolean isReady() { + return (channel != null && !channel.isTerminated()); + } + + private DoubleGauge convertGauge(Metric metric) { + Map tags = new HashMap<>(); + tags.putAll(metric.getTags()); + tags.putAll(metric.getResource()); + tags.put("key", metric.getKey()); + List labels = convertTags(tags); + + return DoubleGauge.newBuilder() + .addDataPoints(DoubleDataPoint.newBuilder() + .setStartTimeUnixNano(0) + .setTimeUnixNano(metric.getTime() * 1000 * 1000) + .setValue((Double) metric.getValue().getValue()) + .addAllLabels(labels) + .build()) + .build(); + } + + private List convertTags(Map tags) { + return tags.entrySet() + .parallelStream() + .map(entry -> StringKeyValue.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build()) + .collect(Collectors.toList()); + } +} diff --git a/modules/pubsub/pom.xml b/modules/pubsub/pom.xml index 1604de27..62198412 100644 --- a/modules/pubsub/pom.xml +++ b/modules/pubsub/pom.xml @@ -47,12 +47,46 @@ com.google.auto.value auto-value + + com.google.protobuf + protobuf-java + + + com.google.api.grpc + proto-google-common-protos +
    + + com.google.protobuf + protobuf-java + + + com.google.api.grpc + proto-google-common-protos + com.google.guava guava 27.0.1-jre + + + com.google.errorprone + error_prone_annotations + + + org.codehaus.mojo + animal-sniffer-annotations + + + + + com.google.errorprone + error_prone_annotations + + + org.codehaus.mojo + animal-sniffer-annotations diff --git a/pom.xml b/pom.xml index beb1f4a6..79c41aef 100644 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,7 @@ modules/http modules/pubsub modules/opencensus + modules/opentelemetry @@ -67,6 +68,12 @@ 1.3.31 1.1.2 3.0.4 + 1.35.0 + 2.8.6 + 2.4.0 + 1.19 + 27.0.1-jre + 3.11.1 @@ -185,31 +192,6 @@ - - com.google.auth - google-auth-library-credentials - 0.17.1 - - - com.google.auth - google-auth-library-oauth2-http - 0.17.1 - - - com.google.http-client - google-http-client - 1.31.0 - - - com.google.j2objc - j2objc-annotations - 1.3 - - - io.grpc - grpc-context - 1.19.0 - com.spotify.ffwd ffwd-api @@ -275,10 +257,94 @@ ffwd-module-opencensus ${project.version} + + com.spotify.ffwd + ffwd-module-opentelemetry + ${project.version} + + + + com.google.auth + google-auth-library-credentials + 0.17.1 + + + com.google.auth + google-auth-library-oauth2-http + 0.17.1 + + + com.google.http-client + google-http-client + 1.31.0 + + + com.google.j2objc + j2objc-annotations + 1.3 + + + com.google.code.gson + gson + ${gson.version} + com.google.protobuf protobuf-java - 3.11.1 + ${protobuf.version} + + + + + io.grpc + grpc-context + ${grpc.version} + + + io.grpc + grpc-core + ${grpc.version} + + + com.google.protobuf + protobuf-java + + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + org.apache.tomcat + annotations-api + 6.0.53 + + + com.google.api.grpc + proto-google-common-protos + 1.12.0 + + + com.google.errorprone + error_prone_annotations + ${errorprone.version} + + + org.codehaus.mojo + animal-sniffer-annotations + ${animal_sniffer.version} @@ -290,6 +356,12 @@ com.uchuhimo konf 0.13.3 + + + com.google.code.gson + gson + + @@ -345,7 +417,7 @@ com.google.guava guava - 27.0.1-jre + ${guava.version}