From e31e247f399eb2386b1286f561f4c74666ae1eb0 Mon Sep 17 00:00:00 2001 From: basilisk487 Date: Tue, 17 Mar 2020 12:20:04 -0500 Subject: [PATCH 1/5] PUB-221: Default proxy port (2878) in HTTP mode should function like DDI endpoint --- proxy/pom.xml | 52 +++-- .../java/com/wavefront/agent/PushAgent.java | 12 +- .../wavefront/agent/channel/ChannelUtils.java | 28 +++ .../wavefront/agent/formatter/DataFormat.java | 29 ++- .../agent/handlers/LineDelimitedUtils.java | 58 ++++++ .../AbstractLineDelimitedHandler.java | 30 ++- .../AbstractPortUnificationHandler.java | 9 +- .../agent/listeners/FeatureCheckUtils.java | 87 ++++++++ ...RawLogsIngesterPortUnificationHandler.java | 12 +- .../RelayPortUnificationHandler.java | 72 +++---- .../WavefrontPortUnificationHandler.java | 119 ++++++++++- .../listeners/tracing/JaegerThriftUtils.java | 84 ++++---- .../tracing/TracePortUnificationHandler.java | 38 ++-- .../tracing/ZipkinPortUnificationHandler.java | 48 ++--- .../com/wavefront/agent/PushAgentTest.java | 192 ++++++++++++++++++ .../agent/QueuedAgentServiceTest.java | 0 .../handlers/LineDelimitedUtilsTest.java | 31 +++ .../logsharvesting/LogsIngesterTest.java | 2 +- 18 files changed, 728 insertions(+), 175 deletions(-) create mode 100644 proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java delete mode 100644 proxy/src/test/java/com/wavefront/agent/QueuedAgentServiceTest.java create mode 100644 proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java diff --git a/proxy/pom.xml b/proxy/pom.xml index 5a3aa48bb..8473a6088 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -261,11 +261,6 @@ jafama 2.1.0 - - com.google.re2j - re2j - 1.3 - @@ -312,9 +307,10 @@ jacocoArgLine - - com.wavefront.* - + + com.tdunning.math.* + org.logstash.* + @@ -325,11 +321,7 @@ - - - com.wavefront.* - BUNDLE @@ -339,7 +331,37 @@ + + PACKAGE + + com.wavefront.agent.preprocessor + + + + LINE + COVEREDRATIO + 0.91 + + + + + PACKAGE + + com.wavefront.agent.histogram + + + + LINE + COVEREDRATIO + 0.93 + + + + + com/tdunning/** + org/logstash/** + @@ -348,6 +370,12 @@ report + + + com/tdunning/** + org/logstash/** + + diff --git a/proxy/src/main/java/com/wavefront/agent/PushAgent.java b/proxy/src/main/java/com/wavefront/agent/PushAgent.java index 43cb45fbe..ff4eeea8c 100644 --- a/proxy/src/main/java/com/wavefront/agent/PushAgent.java +++ b/proxy/src/main/java/com/wavefront/agent/PushAgent.java @@ -663,7 +663,10 @@ protected void startGraphiteListener(String strPort, WavefrontPortUnificationHandler wavefrontPortUnificationHandler = new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager, - decoderSupplier.get(), handlerFactory, hostAnnotator, preprocessors.get(strPort)); + decoderSupplier.get(), handlerFactory, hostAnnotator, preprocessors.get(strPort), + () -> entityProps.get(ReportableEntityType.HISTOGRAM).isFeatureDisabled(), + () -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(), + () -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled()); startAsManagedThread(port, new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port, @@ -710,7 +713,7 @@ public void shutdown(@Nonnull String handle) { WavefrontPortUnificationHandler wavefrontPortUnificationHandler = new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager, decoderSupplier.get(), deltaCounterHandlerFactory, hostAnnotator, - preprocessors.get(strPort)); + preprocessors.get(strPort), () -> false, () -> false, () -> false); startAsManagedThread(port, new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port, @@ -968,7 +971,10 @@ public void shutdown(@Nonnull String handle) { WavefrontPortUnificationHandler wavefrontPortUnificationHandler = new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager, decoderSupplier.get(), histogramHandlerFactory, hostAnnotator, - preprocessors.get(strPort)); + preprocessors.get(strPort), + () -> entityProps.get(ReportableEntityType.HISTOGRAM).isFeatureDisabled(), + () -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(), + () -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled()); startAsManagedThread(port, new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port, proxyConfig.getHistogramMaxReceivedLength(), proxyConfig.getHistogramHttpBufferSize(), diff --git a/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java b/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java index 2071ca191..d37d61e58 100644 --- a/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java +++ b/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java @@ -3,13 +3,18 @@ import com.google.common.base.Throwables; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; + import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -193,4 +198,27 @@ public static InetAddress getRemoteAddress(@Nonnull ChannelHandlerContext ctx) { InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); return remoteAddress == null ? null : remoteAddress.getAddress(); } + + /** + * Get path from the URI (i.e. discard everything after "?"). + * + * @param uri URI to get path from + * @return path + */ + public static String getPath(String uri) { + int paramsDelimiter = uri.indexOf('?'); + return paramsDelimiter == -1 ? uri : uri.substring(0, paramsDelimiter); + } + + /** + * Parse query params into name/value pairs. + * + * @param uri URI to get query params from + * @return list of {@link NameValuePair} + */ + public static List getQueryParams(String uri) { + int paramsDelimiter = uri.indexOf('?'); + return paramsDelimiter == -1 ? ImmutableList.of() : + URLEncodedUtils.parse(uri.substring(paramsDelimiter + 1), CharsetUtil.UTF_8); + } } diff --git a/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java b/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java index 35672d60a..b089efb6d 100644 --- a/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java +++ b/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java @@ -1,5 +1,8 @@ package com.wavefront.agent.formatter; +import javax.annotation.Nullable; + +import com.wavefront.api.agent.Constants; import com.wavefront.ingester.AbstractIngesterFormatter; /** @@ -8,10 +11,10 @@ * @author vasily@wavefront.com */ public enum DataFormat { - GENERIC, HISTOGRAM, SOURCE_TAG, EVENT, JSON_STRING; + DEFAULT, WAVEFRONT, HISTOGRAM, SOURCE_TAG, EVENT, SPAN, SPAN_LOG; public static DataFormat autodetect(final String input) { - if (input.length() < 2) return GENERIC; + if (input.length() < 2) return DEFAULT; char firstChar = input.charAt(0); switch (firstChar) { case '@': @@ -22,7 +25,7 @@ public static DataFormat autodetect(final String input) { if (input.startsWith(AbstractIngesterFormatter.EVENT_LITERAL)) return EVENT; break; case '{': - if (input.charAt(input.length() - 1) == '}') return JSON_STRING; + if (input.charAt(input.length() - 1) == '}') return SPAN_LOG; break; case '!': if (input.startsWith("!M ") || input.startsWith("!H ") || input.startsWith("!D ")) { @@ -30,6 +33,24 @@ public static DataFormat autodetect(final String input) { } break; } - return GENERIC; + return DEFAULT; + } + + @Nullable + public static DataFormat parse(String format) { + if (format == null) return null; + switch (format) { + case Constants.PUSH_FORMAT_WAVEFRONT: + case Constants.PUSH_FORMAT_GRAPHITE_V2: + return DataFormat.WAVEFRONT; + case Constants.PUSH_FORMAT_HISTOGRAM: + return DataFormat.HISTOGRAM; + case Constants.PUSH_FORMAT_TRACING: + return DataFormat.SPAN; + case Constants.PUSH_FORMAT_TRACING_SPAN_LOGS: + return DataFormat.SPAN_LOG; + default: + return null; + } } } diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java b/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java index d4006639d..ae8aae29e 100644 --- a/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java +++ b/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java @@ -3,6 +3,8 @@ import org.apache.commons.lang.StringUtils; import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; /** * A collection of helper methods around plaintext newline-delimited payloads. @@ -21,10 +23,66 @@ private LineDelimitedUtils() { * @param pushData payload to split. * @return string array */ + @Deprecated public static String[] splitPushData(String pushData) { return StringUtils.split(pushData, PUSH_DATA_DELIMETER); } + /** + * Iterate over individual strings in a newline-delimited string. Skips empty strings. + * + * @param input payload to split. + * @return string iterator + */ + public static Iterator splitStringIterator(String input, char delimiter) { + return new Iterator() { + int currentPos = 0; + int indexOfDelimiter = input.indexOf(delimiter); + String peek = null; + + @Override + public boolean hasNext() { + if (peek == null) peek = advance(); + return peek != null; + } + + @Override + public String next() { + try { + if (peek == null) peek = advance(); + if (peek == null) throw new NoSuchElementException(); + return peek; + } finally { + peek = null; + } + } + + private String advance() { + String result = ""; + while ("".equals(result)) { + result = internalNext(); + } + return result; + } + + private String internalNext() { + if (indexOfDelimiter >= 0) { + try { + return input.substring(currentPos, indexOfDelimiter); + } finally { + currentPos = indexOfDelimiter + 1; + indexOfDelimiter = input.indexOf(delimiter, currentPos); + } + } else if (indexOfDelimiter == -1) { + indexOfDelimiter = Integer.MIN_VALUE; + return input.substring(currentPos); + } else { + return null; + } + } + }; + } + /** * Join a batch of strings into a payload string. * diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java index e5d429733..6d9ce052c 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java @@ -2,7 +2,9 @@ import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; +import com.wavefront.agent.formatter.DataFormat; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import io.netty.channel.ChannelHandler; @@ -13,7 +15,7 @@ import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause; import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; -import static com.wavefront.agent.handlers.LineDelimitedUtils.splitPushData; +import static com.wavefront.agent.handlers.LineDelimitedUtils.splitStringIterator; /** * Base class for all line-based protocols. Supports TCP line protocol as well as HTTP POST @@ -44,9 +46,9 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, StringBuilder output = new StringBuilder(); HttpResponseStatus status; try { - for (String line : splitPushData(request.content().toString(CharsetUtil.UTF_8))) { - processLine(ctx, line.trim()); - } + DataFormat format = getFormat(request); + splitStringIterator(request.content().toString(CharsetUtil.UTF_8), '\n'). + forEachRemaining(line -> processLine(ctx, line.trim(), format)); status = HttpResponseStatus.ACCEPTED; } catch (Exception e) { status = HttpResponseStatus.BAD_REQUEST; @@ -58,7 +60,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, /** * Handles an incoming plain text (string) message. By default simply passes a string to - * {@link #processLine(ChannelHandlerContext, String)} method. + * {@link #processLine(ChannelHandlerContext, String, DataFormat)} method. */ @Override protected void handlePlainTextMessage(final ChannelHandlerContext ctx, @@ -66,14 +68,28 @@ protected void handlePlainTextMessage(final ChannelHandlerContext ctx, if (message == null) { throw new IllegalArgumentException("Message cannot be null"); } - processLine(ctx, message.trim()); + String trimmedMessage = message.trim(); + if (trimmedMessage.isEmpty()) return; + processLine(ctx, trimmedMessage, null); } + /** + * Detect data format for an incoming HTTP request, if possible. + * + * @param httpRequest http request. + * @return Detected data format or null if unknown. + */ + @Nullable + protected abstract DataFormat getFormat(FullHttpRequest httpRequest); + /** * Process a single line for a line-based stream. * * @param ctx Channel handler context. * @param message Message to process. + * @param format Data format, if known */ - protected abstract void processLine(final ChannelHandlerContext ctx, final String message); + protected abstract void processLine(final ChannelHandlerContext ctx, + @Nonnull final String message, + @Nullable DataFormat format); } diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java index 3957abfb1..da4c61b98 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java @@ -15,7 +15,6 @@ import org.apache.http.client.utils.URLEncodedUtils; import java.io.IOException; -import java.net.URI; import java.net.URISyntaxException; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -146,11 +145,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.log(Level.WARNING, "Unexpected error: ", cause); } - protected String extractToken(final FullHttpRequest request) throws URISyntaxException { - URI requestUri = new URI(request.uri()); + protected String extractToken(final FullHttpRequest request) { String token = firstNonNull(request.headers().getAsString("X-AUTH-TOKEN"), request.headers().getAsString("Authorization"), "").replaceAll("^Bearer ", "").trim(); - Optional tokenParam = URLEncodedUtils.parse(requestUri, CharsetUtil.UTF_8). + Optional tokenParam = URLEncodedUtils.parse(request.uri(), CharsetUtil.UTF_8). stream().filter(x -> x.getName().equals("t") || x.getName().equals("token") || x.getName().equals("api_key")).findFirst(); if (tokenParam.isPresent()) { @@ -159,8 +157,7 @@ protected String extractToken(final FullHttpRequest request) throws URISyntaxExc return token; } - protected boolean authorized(final ChannelHandlerContext ctx, final FullHttpRequest request) - throws URISyntaxException { + protected boolean authorized(final ChannelHandlerContext ctx, final FullHttpRequest request) { if (tokenAuthenticator.authRequired()) { String token = extractToken(request); if (!tokenAuthenticator.authorize(token)) { // 401 if no token or auth fails diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java b/proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java new file mode 100644 index 000000000..fec44f473 --- /dev/null +++ b/proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java @@ -0,0 +1,87 @@ +package com.wavefront.agent.listeners; + +import javax.annotation.Nullable; +import java.util.function.Supplier; +import java.util.logging.Logger; + +import org.apache.commons.lang3.StringUtils; + +import com.wavefront.common.MessageDedupingLogger; +import com.yammer.metrics.core.Counter; + +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.util.CharsetUtil; + +/** + * Constants and utility methods for validating feature subscriptions. + * + * @author vasily@wavefront.com + */ +public abstract class FeatureCheckUtils { + private static final Logger logger = Logger.getLogger(FeatureCheckUtils.class.getCanonicalName()); + + private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 3, 0.2); + public static final String HISTO_DISABLED = "Ingested point discarded because histogram " + + "feature has not been enabled for your account"; + public static final String SPAN_DISABLED = "Ingested span discarded because distributed " + + "tracing feature has not been enabled for your account."; + public static final String SPANLOGS_DISABLED = "Ingested span log discarded because " + + "this feature has not been enabled for your account."; + + /** + * Check whether feature disabled flag is set, log a warning message, increment the counter by 1. + * + * @param featureDisabledFlag Supplier for feature disabled flag. + * @param message Warning message to log if feature is disabled. + * @param discardedCounter Optional counter for discarded items. + * @return true if feature is disabled + */ + public static boolean isFeatureDisabled(Supplier featureDisabledFlag, + String message, @Nullable Counter discardedCounter) { + return isFeatureDisabled(featureDisabledFlag, message, discardedCounter, null, null); + } + + /** + * Check whether feature disabled flag is set, log a warning message, increment the counter by 1. + * + * @param featureDisabledFlag Supplier for feature disabled flag. + * @param message Warning message to log if feature is disabled. + * @param discardedCounter Optional counter for discarded items. + * @param output Optional stringbuilder for messages + * @return true if feature is disabled + */ + public static boolean isFeatureDisabled(Supplier featureDisabledFlag, String message, + @Nullable Counter discardedCounter, + @Nullable StringBuilder output) { + return isFeatureDisabled(featureDisabledFlag, message, discardedCounter, output, null); + } + + /** + * Check whether feature disabled flag is set, log a warning message, increment the counter + * either by 1 or by number of \n characters in request payload, if provided. + * + * @param featureDisabledFlag Supplier for feature disabled flag. + * @param message Warning message to log if feature is disabled. + * @param discardedCounter Optional counter for discarded items. + * @param output Optional stringbuilder for messages + * @param request Optional http request to use payload size + * @return true if feature is disabled + */ + public static boolean isFeatureDisabled(Supplier featureDisabledFlag, String message, + @Nullable Counter discardedCounter, + @Nullable StringBuilder output, + @Nullable FullHttpRequest request) { + if (featureDisabledFlag.get()) { + featureDisabledLogger.warning(message); + if (output != null) { + output.append(message); + } + if (discardedCounter != null) { + discardedCounter.inc(request == null ? 1 : + StringUtils.countMatches(request.content().toString(CharsetUtil.UTF_8), "\n") + 1); + } + return true; + } + return false; + } +} diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java index 75aa157d9..38ca58ded 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java @@ -4,6 +4,7 @@ import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; +import com.wavefront.agent.formatter.DataFormat; import com.wavefront.agent.logsharvesting.LogsIngester; import com.wavefront.agent.logsharvesting.LogsMessage; import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor; @@ -25,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.FullHttpRequest; import static com.wavefront.agent.channel.ChannelUtils.getRemoteAddress; @@ -80,10 +82,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { super.exceptionCaught(ctx, cause); } + @Nullable + @Override + protected DataFormat getFormat(FullHttpRequest httpRequest) { + return null; + } + @VisibleForTesting @Override - public void processLine(final ChannelHandlerContext ctx, String message) { - if (message.isEmpty()) return; + public void processLine(final ChannelHandlerContext ctx, @Nonnull String message, + @Nullable DataFormat format) { received.inc(); ReportableEntityPreprocessor preprocessor = preprocessorSupplier == null ? null : preprocessorSupplier.get(); diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java index 868864be9..7fcf4d875 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java @@ -1,12 +1,9 @@ package com.wavefront.agent.listeners; -import com.google.common.collect.Lists; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.wavefront.common.MessageDedupingLogger; import com.wavefront.common.Utils; import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; @@ -24,12 +21,10 @@ import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.MetricName; +import org.apache.commons.lang3.StringUtils; import org.apache.http.NameValuePair; -import org.apache.http.client.utils.URLEncodedUtils; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,8 +44,14 @@ import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage; import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause; +import static com.wavefront.agent.channel.ChannelUtils.getPath; +import static com.wavefront.agent.channel.ChannelUtils.getQueryParams; import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; -import static com.wavefront.agent.handlers.LineDelimitedUtils.splitPushData; +import static com.wavefront.agent.handlers.LineDelimitedUtils.splitStringIterator; +import static com.wavefront.agent.listeners.FeatureCheckUtils.HISTO_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled; import static com.wavefront.agent.listeners.WavefrontPortUnificationHandler.preprocessAndHandlePoint; /** @@ -67,13 +68,6 @@ public class RelayPortUnificationHandler extends AbstractHttpOnlyHandler { private static final Logger logger = Logger.getLogger( RelayPortUnificationHandler.class.getCanonicalName()); - private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 3, 0.2); - private static final String ERROR_HISTO_DISABLED = "Ingested point discarded because histogram " + - "feature has not been enabled for your account"; - private static final String ERROR_SPAN_DISABLED = "Ingested span discarded because distributed " + - "tracing feature has not been enabled for your account."; - private static final String ERROR_SPANLOGS_DISABLED = "Ingested span log discarded because " + - "this feature has not been enabled for your account."; private static final ObjectMapper JSON_PARSER = new ObjectMapper(); @@ -145,10 +139,9 @@ public RelayPortUnificationHandler( @Override protected void handleHttpMessage(final ChannelHandlerContext ctx, - final FullHttpRequest request) throws URISyntaxException { + final FullHttpRequest request) { StringBuilder output = new StringBuilder(); - URI uri = new URI(request.uri()); - String path = uri.getPath(); + String path = getPath(request.uri()); final boolean isDirectIngestion = path.startsWith("/report"); if (path.endsWith("/checkin") && (path.startsWith("/api/daemon") || path.contains("wfproxy"))) { // simulate checkin response for proxy chaining @@ -158,6 +151,9 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, writeHttpResponse(ctx, HttpResponseStatus.OK, jsonResponse, request); return; } + String format = getQueryParams(request.uri()).stream(). + filter(x -> x.getName().equals("format") || x.getName().equals("f")). + map(NameValuePair::getValue).findFirst().orElse(Constants.PUSH_FORMAT_WAVEFRONT); // Return HTTP 200 (OK) for payloads received on the proxy endpoint // Return HTTP 202 (ACCEPTED) for payloads received on the DDI endpoint @@ -170,20 +166,14 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, } else { okStatus = HttpResponseStatus.NO_CONTENT; } - String format = URLEncodedUtils.parse(uri, CharsetUtil.UTF_8).stream(). - filter(x -> x.getName().equals("format") || x.getName().equals("f")). - map(NameValuePair::getValue).findFirst().orElse(Constants.PUSH_FORMAT_WAVEFRONT); - String[] lines = splitPushData(request.content().toString(CharsetUtil.UTF_8)); + String payload = request.content().toString(CharsetUtil.UTF_8); HttpResponseStatus status; - switch (format) { case Constants.PUSH_FORMAT_HISTOGRAM: - if (histogramDisabled.get()) { - discardedHistograms.get().inc(lines.length); + if (isFeatureDisabled(histogramDisabled, HISTO_DISABLED, discardedHistograms.get(), + output, request)) { status = HttpResponseStatus.FORBIDDEN; - featureDisabledLogger.info(ERROR_HISTO_DISABLED); - output.append(ERROR_HISTO_DISABLED); break; } case Constants.PUSH_FORMAT_WAVEFRONT: @@ -194,7 +184,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, ReportableEntityDecoder histogramDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.HISTOGRAM); - Arrays.stream(lines).forEach(line -> { + splitStringIterator(payload, '\n').forEachRemaining(line -> { String message = line.trim(); if (message.isEmpty()) return; DataFormat dataFormat = DataFormat.autodetect(message); @@ -208,10 +198,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, "sourceTag-formatted data!"); break; case HISTOGRAM: - if (histogramDisabled.get()) { - discardedHistograms.get().inc(lines.length); - featureDisabledLogger.info(ERROR_HISTO_DISABLED); - output.append(ERROR_HISTO_DISABLED); + if (isFeatureDisabled(histogramDisabled, HISTO_DISABLED, + discardedHistograms.get(), output)) { break; } preprocessAndHandlePoint(message, histogramDecoder, histogramHandlerSupplier.get(), @@ -236,20 +224,18 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, } break; case Constants.PUSH_FORMAT_TRACING: - if (traceDisabled.get()) { - discardedSpans.get().inc(lines.length); + if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedSpans.get(), output, + request)) { status = HttpResponseStatus.FORBIDDEN; - featureDisabledLogger.info(ERROR_SPAN_DISABLED); - output.append(ERROR_SPAN_DISABLED); break; } - List spans = Lists.newArrayListWithCapacity(lines.length); + List spans = new ArrayList<>(); //noinspection unchecked ReportableEntityDecoder spanDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.TRACE); ReportableEntityHandler spanHandler = spanHandlerSupplier.get(); - Arrays.stream(lines).forEach(line -> { + splitStringIterator(payload, '\n').forEachRemaining(line -> { try { spanDecoder.decode(line, spans, "dummy"); } catch (Exception e) { @@ -260,20 +246,18 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, status = okStatus; break; case Constants.PUSH_FORMAT_TRACING_SPAN_LOGS: - if (spanLogsDisabled.get()) { - discardedSpanLogs.get().inc(lines.length); + if (isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, discardedSpanLogs.get(), + output, request)) { status = HttpResponseStatus.FORBIDDEN; - featureDisabledLogger.info(ERROR_SPANLOGS_DISABLED); - output.append(ERROR_SPANLOGS_DISABLED); break; } - List spanLogs = Lists.newArrayListWithCapacity(lines.length); + List spanLogs = new ArrayList<>(); //noinspection unchecked ReportableEntityDecoder spanLogDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.TRACE_SPAN_LOGS); ReportableEntityHandler spanLogsHandler = spanLogsHandlerSupplier.get(); - Arrays.stream(lines).forEach(line -> { + splitStringIterator(payload, '\n').forEachRemaining(line -> { try { spanLogDecoder.decode(JSON_PARSER.readTree(line), spanLogs, "dummy"); } catch (Exception e) { diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java index b4a594e8f..764985680 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java @@ -1,5 +1,7 @@ package com.wavefront.agent.listeners; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.wavefront.common.Utils; import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; @@ -12,21 +14,41 @@ import com.wavefront.data.ReportableEntityType; import com.wavefront.dto.SourceTag; import com.wavefront.ingester.ReportableEntityDecoder; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.MetricName; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.Supplier; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.http.NameValuePair; + import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; import wavefront.report.ReportEvent; import wavefront.report.ReportPoint; import wavefront.report.ReportSourceTag; +import wavefront.report.Span; +import wavefront.report.SpanLogs; import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage; +import static com.wavefront.agent.channel.ChannelUtils.getQueryParams; +import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; +import static com.wavefront.agent.formatter.DataFormat.HISTOGRAM; +import static com.wavefront.agent.formatter.DataFormat.SPAN; +import static com.wavefront.agent.formatter.DataFormat.SPAN_LOG; +import static com.wavefront.agent.listeners.FeatureCheckUtils.HISTO_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled; +import static com.wavefront.agent.listeners.tracing.TracePortUnificationHandler.preprocessAndHandleSpan; /** * Process incoming Wavefront-formatted data. Also allows sourceTag formatted data and @@ -39,6 +61,7 @@ */ @ChannelHandler.Sharable public class WavefrontPortUnificationHandler extends AbstractLineDelimitedHandler { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Nullable private final SharedGraphiteHostAnnotator annotator; @@ -48,11 +71,22 @@ public class WavefrontPortUnificationHandler extends AbstractLineDelimitedHandle private final ReportableEntityDecoder sourceTagDecoder; private final ReportableEntityDecoder eventDecoder; private final ReportableEntityDecoder histogramDecoder; + private final ReportableEntityDecoder spanDecoder; + private final ReportableEntityDecoder spanLogsDecoder; private final ReportableEntityHandler wavefrontHandler; private final Supplier> histogramHandlerSupplier; private final Supplier> sourceTagHandlerSupplier; + private final Supplier> spanHandlerSupplier; + private final Supplier> spanLogsHandlerSupplier; private final Supplier> eventHandlerSupplier; + private final Supplier histogramDisabled; + private final Supplier traceDisabled; + private final Supplier spanLogsDisabled; + + private final Supplier discardedHistograms; + private final Supplier discardedSpans; + private final Supplier discardedSpanLogs; /** * Create new instance with lazy initialization for handlers. * @@ -62,7 +96,10 @@ public class WavefrontPortUnificationHandler extends AbstractLineDelimitedHandle * @param decoders decoders. * @param handlerFactory factory for ReportableEntityHandler objects. * @param annotator hostAnnotator that makes sure all points have a source= tag. - * @param preprocessor preprocessor. + * @param preprocessor preprocessor supplier. + * @param histogramDisabled supplier for backend-controlled feature flag for histograms. + * @param traceDisabled supplier for backend-controlled feature flag for spans. + * @param spanLogsDisabled supplier for backend-controlled feature flag for span logs. */ @SuppressWarnings("unchecked") public WavefrontPortUnificationHandler( @@ -71,7 +108,9 @@ public WavefrontPortUnificationHandler( final Map> decoders, final ReportableEntityHandlerFactory handlerFactory, @Nullable final SharedGraphiteHostAnnotator annotator, - @Nullable final Supplier preprocessor) { + @Nullable final Supplier preprocessor, + final Supplier histogramDisabled, final Supplier traceDisabled, + final Supplier spanLogsDisabled) { super(tokenAuthenticator, healthCheckManager, handle); this.wavefrontDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.POINT); @@ -82,14 +121,54 @@ public WavefrontPortUnificationHandler( get(ReportableEntityType.HISTOGRAM); this.sourceTagDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.SOURCE_TAG); + this.spanDecoder = (ReportableEntityDecoder) decoders. + get(ReportableEntityType.TRACE); + this.spanLogsDecoder = (ReportableEntityDecoder) decoders. + get(ReportableEntityType.TRACE_SPAN_LOGS); this.eventDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.EVENT); this.histogramHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler( HandlerKey.of(ReportableEntityType.HISTOGRAM, handle))); this.sourceTagHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler( HandlerKey.of(ReportableEntityType.SOURCE_TAG, handle))); + this.spanHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler( + HandlerKey.of(ReportableEntityType.TRACE, handle))); + this.spanLogsHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler( + HandlerKey.of(ReportableEntityType.TRACE_SPAN_LOGS, handle))); this.eventHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler( HandlerKey.of(ReportableEntityType.EVENT, handle))); + this.histogramDisabled = histogramDisabled; + this.traceDisabled = traceDisabled; + this.spanLogsDisabled = spanLogsDisabled; + this.discardedHistograms = Utils.lazySupplier(() -> Metrics.newCounter(new MetricName( + "histogram", "", "discarded_points"))); + this.discardedSpans = Utils.lazySupplier(() -> Metrics.newCounter(new MetricName( + "spans." + handle, "", "discarded"))); + this.discardedSpanLogs = Utils.lazySupplier(() -> Metrics.newCounter(new MetricName( + "spanLogs." + handle, "", "discarded"))); + } + + @Override + protected DataFormat getFormat(FullHttpRequest httpRequest) { + return DataFormat.parse(getQueryParams(httpRequest.uri()).stream(). + filter(x -> x.getName().equals("format") || x.getName().equals("f")). + map(NameValuePair::getValue).findFirst().orElse(null)); + } + + @Override + protected void handleHttpMessage(ChannelHandlerContext ctx, FullHttpRequest request) { + StringBuilder out = new StringBuilder(); + DataFormat format = getFormat(request); + if ((format == HISTOGRAM && isFeatureDisabled(histogramDisabled, HISTO_DISABLED, + discardedHistograms.get(), out, request)) || + (format == SPAN && isFeatureDisabled(traceDisabled, SPAN_DISABLED, + discardedSpans.get(), out, request)) || + (format == SPAN_LOG && isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, + discardedSpanLogs.get(), out, request))) { + writeHttpResponse(ctx, HttpResponseStatus.FORBIDDEN, out, request); + return; + } + super.handleHttpMessage(ctx, request); } /** @@ -98,9 +177,9 @@ public WavefrontPortUnificationHandler( * @param message line being processed */ @Override - protected void processLine(final ChannelHandlerContext ctx, String message) { - if (message.isEmpty()) return; - DataFormat dataFormat = DataFormat.autodetect(message); + protected void processLine(final ChannelHandlerContext ctx, @Nonnull String message, + @Nullable DataFormat format) { + DataFormat dataFormat = format == null ? DataFormat.autodetect(message) : format; switch (dataFormat) { case SOURCE_TAG: ReportableEntityHandler sourceTagHandler = @@ -138,7 +217,37 @@ protected void processLine(final ChannelHandlerContext ctx, String message) { "\"", e, ctx)); } return; + case SPAN: + ReportableEntityHandler spanHandler = spanHandlerSupplier.get(); + if (spanHandler == null || spanDecoder == null) { + wavefrontHandler.reject(message, "Port is not configured to accept " + + "tracing data (spans)!"); + return; + } + message = annotator == null ? message : annotator.apply(ctx, message); + preprocessAndHandleSpan(message, spanDecoder, spanHandler, spanHandler::report, + preprocessorSupplier, ctx, true, x -> true); + return; + case SPAN_LOG: + if (isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, discardedSpanLogs.get())) return; + ReportableEntityHandler spanLogsHandler = spanLogsHandlerSupplier.get(); + if (spanLogsHandler == null || spanLogsDecoder == null) { + wavefrontHandler.reject(message, "Port is not configured to accept " + + "tracing data (span logs)!"); + return; + } + try { + List spanLogs = new ArrayList<>(1); + spanLogsDecoder.decode(OBJECT_MAPPER.readTree(message), spanLogs, "dummy"); + for (SpanLogs object : spanLogs) { + spanLogsHandler.report(object); + } + } catch (Exception e) { + spanLogsHandler.reject(message, formatErrorMessage(message, e, ctx)); + } + return; case HISTOGRAM: + if (isFeatureDisabled(histogramDisabled, HISTO_DISABLED, discardedHistograms.get())) return; ReportableEntityHandler histogramHandler = histogramHandlerSupplier.get(); if (histogramHandler == null || histogramDecoder == null) { wavefrontHandler.reject(message, "Port is not configured to accept " + diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java index c8d7b8851..3be4b8e97 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java @@ -3,7 +3,6 @@ import com.google.common.collect.ImmutableSet; import com.wavefront.agent.handlers.ReportableEntityHandler; import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor; -import com.wavefront.common.MessageDedupingLogger; import com.wavefront.common.TraceConstants; import com.wavefront.internal.reporter.WavefrontInternalReporter; import com.wavefront.sdk.entities.tracing.sampling.Sampler; @@ -33,6 +32,9 @@ import java.util.logging.Logger; import java.util.stream.Collectors; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.DEBUG_SPAN_TAG_KEY; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_VAL; @@ -53,7 +55,6 @@ public abstract class JaegerThriftUtils { protected static final Logger logger = Logger.getLogger(JaegerThriftUtils.class.getCanonicalName()); - private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 2, 0.2); // TODO: support sampling private final static Set IGNORE_TAGS = ImmutableSet.of("sampler.type", "sampler.param"); @@ -112,15 +113,8 @@ public static void processBatch(Batch batch, } } } - if (traceDisabled.get()) { - featureDisabledLogger.info("Ingested spans discarded because tracing feature is not " + - "enabled on the server"); - discardedBatches.inc(); + if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedBatches, output)) { discardedTraces.inc(batch.getSpansSize()); - if (output != null) { - output.append("Ingested spans discarded because tracing feature is not enabled on the " + - "server."); - } return; } for (io.jaegertracing.thriftjava.Span span : batch.getSpans()) { @@ -274,43 +268,39 @@ private static void processSpan(io.jaegertracing.thriftjava.Span span, if (isForceSampled || isDebugSpanTag || (alwaysSampleErrors && isError) || sample(wavefrontSpan, sampler, discardedSpansBySampler)) { spanHandler.report(wavefrontSpan); - if (span.getLogs() != null && !span.getLogs().isEmpty()) { - if (spanLogsDisabled.get()) { - featureDisabledLogger.info("Span logs discarded because the feature is not " + - "enabled on the server!"); - } else { - SpanLogs spanLogs = SpanLogs.newBuilder(). - setCustomer("default"). - setTraceId(wavefrontSpan.getTraceId()). - setSpanId(wavefrontSpan.getSpanId()). - setLogs(span.getLogs().stream().map(x -> { - Map fields = new HashMap<>(x.fields.size()); - x.fields.forEach(t -> { - switch (t.vType) { - case STRING: - fields.put(t.getKey(), t.getVStr()); - break; - case BOOL: - fields.put(t.getKey(), String.valueOf(t.isVBool())); - break; - case LONG: - fields.put(t.getKey(), String.valueOf(t.getVLong())); - break; - case DOUBLE: - fields.put(t.getKey(), String.valueOf(t.getVDouble())); - break; - case BINARY: - // ignore - default: - } - }); - return SpanLog.newBuilder(). - setTimestamp(x.timestamp). - setFields(fields). - build(); - }).collect(Collectors.toList())).build(); - spanLogsHandler.report(spanLogs); - } + if (span.getLogs() != null && !span.getLogs().isEmpty() && + !isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, null)) { + SpanLogs spanLogs = SpanLogs.newBuilder(). + setCustomer("default"). + setTraceId(wavefrontSpan.getTraceId()). + setSpanId(wavefrontSpan.getSpanId()). + setLogs(span.getLogs().stream().map(x -> { + Map fields = new HashMap<>(x.fields.size()); + x.fields.forEach(t -> { + switch (t.vType) { + case STRING: + fields.put(t.getKey(), t.getVStr()); + break; + case BOOL: + fields.put(t.getKey(), String.valueOf(t.isVBool())); + break; + case LONG: + fields.put(t.getKey(), String.valueOf(t.getVLong())); + break; + case DOUBLE: + fields.put(t.getKey(), String.valueOf(t.getVDouble())); + break; + case BINARY: + // ignore + default: + } + }); + return SpanLog.newBuilder(). + setTimestamp(x.timestamp). + setFields(fields). + build(); + }).collect(Collectors.toList())).build(); + spanLogsHandler.report(spanLogs); } } // report stats irrespective of span sampling. diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java index 1a79cd449..c259719f4 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java @@ -6,12 +6,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; +import com.wavefront.agent.formatter.DataFormat; import com.wavefront.agent.handlers.HandlerKey; import com.wavefront.agent.handlers.ReportableEntityHandler; import com.wavefront.agent.handlers.ReportableEntityHandlerFactory; import com.wavefront.agent.listeners.AbstractLineDelimitedHandler; import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor; -import com.wavefront.common.MessageDedupingLogger; import com.wavefront.data.ReportableEntityType; import com.wavefront.ingester.ReportableEntityDecoder; import com.wavefront.sdk.entities.tracing.sampling.Sampler; @@ -30,13 +30,20 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; + import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import wavefront.report.ReportPoint; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.util.CharsetUtil; import wavefront.report.Span; import wavefront.report.SpanLogs; import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_VAL; @@ -52,7 +59,6 @@ public class TracePortUnificationHandler extends AbstractLineDelimitedHandler { private static final Logger logger = Logger.getLogger( TracePortUnificationHandler.class.getCanonicalName()); - private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 2, 0.2); private static final ObjectMapper JSON_PARSER = new ObjectMapper(); @@ -110,20 +116,20 @@ public TracePortUnificationHandler( "sampler.discarded")); } + @Nullable @Override - protected void processLine(final ChannelHandlerContext ctx, @Nonnull String message) { - if (traceDisabled.get()) { - featureDisabledLogger.warning("Ingested spans discarded because tracing feature is not " + - "enabled on the server"); - discardedSpans.inc(); - return; - } - if (message.startsWith("{") && message.endsWith("}")) { // span logs - if (spanLogsDisabled.get()) { - featureDisabledLogger.warning("Ingested span logs discarded because the feature is not " + - "enabled on the server"); - return; - } + protected DataFormat getFormat(FullHttpRequest httpRequest) { + return DataFormat.parse(URLEncodedUtils.parse(httpRequest.uri(), CharsetUtil.UTF_8).stream(). + filter(x -> x.getName().equals("format") || x.getName().equals("f")). + map(NameValuePair::getValue).findFirst().orElse(null)); + } + + @Override + protected void processLine(final ChannelHandlerContext ctx, @Nonnull String message, + @Nullable DataFormat format) { + if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedSpans)) return; + if (format == DataFormat.SPAN_LOG || (message.startsWith("{") && message.endsWith("}"))) { + if (isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, null)) return; try { List output = new ArrayList<>(1); spanLogsDecoder.decode(JSON_PARSER.readTree(message), output, "dummy"); diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java index fc261d8dd..4cb663e88 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java @@ -5,7 +5,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.wavefront.common.MessageDedupingLogger; import com.wavefront.common.Utils; import com.wavefront.agent.auth.TokenAuthenticatorBuilder; import com.wavefront.agent.channel.HealthCheckManager; @@ -61,6 +60,9 @@ import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause; import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED; +import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.DEBUG_SPAN_TAG_KEY; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.DEBUG_SPAN_TAG_VAL; import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY; @@ -85,7 +87,6 @@ public class ZipkinPortUnificationHandler extends AbstractHttpOnlyHandler implements Runnable, Closeable { private static final Logger logger = Logger.getLogger( ZipkinPortUnificationHandler.class.getCanonicalName()); - private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 2, 0.2); private final ReportableEntityHandler spanHandler; private final ReportableEntityHandler spanLogsHandler; @@ -207,12 +208,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, StringBuilder output = new StringBuilder(); // Handle case when tracing is disabled, ignore reported spans. - if (traceDisabled.get()) { - featureDisabledLogger.info("Ingested spans discarded because tracing feature is not " + - "enabled on the server"); - discardedBatches.inc(); - output.append("Ingested spans discarded because tracing feature is not enabled on the " + - "server."); + if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedBatches, output)) { status = HttpResponseStatus.ACCEPTED; writeHttpResponse(ctx, status, output, request); return; @@ -390,26 +386,22 @@ private void processZipkinSpan(zipkin2.Span zipkinSpan) { if (isDebugSpanTag || isDebug || (alwaysSampleErrors && isError) || sample(wavefrontSpan)) { spanHandler.report(wavefrontSpan); - if (zipkinSpan.annotations() != null && !zipkinSpan.annotations().isEmpty()) { - if (spanLogsDisabled.get()) { - featureDisabledLogger.info("Span logs discarded because the feature is not " + - "enabled on the server!"); - } else { - SpanLogs spanLogs = SpanLogs.newBuilder(). - setCustomer("default"). - setTraceId(wavefrontSpan.getTraceId()). - setSpanId(wavefrontSpan.getSpanId()). - setSpanSecondaryId(zipkinSpan.kind() != null ? - zipkinSpan.kind().toString().toLowerCase() : null). - setLogs(zipkinSpan.annotations().stream().map( - x -> SpanLog.newBuilder(). - setTimestamp(x.timestamp()). - setFields(ImmutableMap.of("annotation", x.value())). - build()). - collect(Collectors.toList())). - build(); - spanLogsHandler.report(spanLogs); - } + if (zipkinSpan.annotations() != null && !zipkinSpan.annotations().isEmpty() && + !isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, null)) { + SpanLogs spanLogs = SpanLogs.newBuilder(). + setCustomer("default"). + setTraceId(wavefrontSpan.getTraceId()). + setSpanId(wavefrontSpan.getSpanId()). + setSpanSecondaryId(zipkinSpan.kind() != null ? + zipkinSpan.kind().toString().toLowerCase() : null). + setLogs(zipkinSpan.annotations().stream().map( + x -> SpanLog.newBuilder(). + setTimestamp(x.timestamp()). + setFields(ImmutableMap.of("annotation", x.value())). + build()). + collect(Collectors.toList())). + build(); + spanLogsHandler.report(spanLogs); } } // report stats irrespective of span sampling. diff --git a/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java b/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java index 38358021a..1738b1f89 100644 --- a/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java +++ b/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java @@ -71,6 +71,7 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.startsWith; import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -360,6 +361,197 @@ public void testWavefrontUnifiedPortHandlerPlaintextUncompressedMixedDataPayload verifyWithTimeout(500, mockPointHandler, mockHistogramHandler, mockEventHandler); } + @Test + public void testWavefrontHandlerAsDDIEndpoint() throws Exception { + port = findAvailablePort(2978); + proxy.proxyConfig.pushListenerPorts = String.valueOf(port); + proxy.proxyConfig.dataBackfillCutoffHours = 8640; + proxy.startGraphiteListener(proxy.proxyConfig.getPushListenerPorts(), mockHandlerFactory, + null); + waitUntilListenerIsOnline(port); + String traceId = UUID.randomUUID().toString(); + long timestamp1 = startTime * 1000000 + 12345; + long timestamp2 = startTime * 1000000 + 23456; + + String payloadStr = "metric4.test 0 " + startTime + " source=test1\n" + + "metric4.test 1 " + (startTime + 1) + " source=test2\n" + + "metric4.test 2 " + (startTime + 2) + " source=test3"; // note the lack of newline at the end! + String histoData = "!M " + startTime + " #5 10.0 #10 100.0 metric.test.histo source=test1\n" + + "!M " + (startTime + 60) + " #5 20.0 #6 30.0 #7 40.0 metric.test.histo source=test2"; + String spanData = "testSpanName parent=parent1 source=testsource spanId=testspanid " + + "traceId=\"" + traceId + "\" parent=parent2 " + startTime + " " + (startTime + 1); + String spanLogData = "{\"spanId\":\"testspanid\",\"traceId\":\"" + traceId + + "\",\"logs\":[{\"timestamp\":" + timestamp1 + + ",\"fields\":{\"key\":\"value\",\"key2\":\"value2\"}},{\"timestamp\":" + + timestamp2 + ",\"fields\":{\"key3\":\"value3\",\"key4\":\"value4\"}}]}\n"; + String mixedData = "@SourceTag action=save source=testSource newtag1 newtag2\n" + + "@Event " + startTime + " \"Event name for testing\" host=host1 host=host2 tag=tag1 " + + "severity=INFO multi=bar multi=baz\n" + + "!M " + (startTime + 60) + " #5 20.0 #6 30.0 #7 40.0 metric.test.histo source=test2\n" + + "metric4.test 0 " + startTime + " source=test1\n" + spanLogData; + + String invalidData = "{\"spanId\"}\n@SourceTag\n@Event\n!M #5\nmetric.name\n" + + "metric5.test 0 1234567890 source=test1\n"; + + + reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy"). + setMetric("metric4.test").setHost("test1").setTimestamp(startTime * 1000). + setValue(0.0d).build()); + expectLastCall().times(2); + mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy"). + setMetric("metric4.test").setHost("test2").setTimestamp((startTime + 1) * 1000). + setValue(1.0d).build()); + expectLastCall().times(2); + mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy"). + setMetric("metric4.test").setHost("test3").setTimestamp((startTime + 2) * 1000). + setValue(2.0d).build()); + expectLastCall().times(2); + replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + assertEquals(202, gzippedHttpPost("http://localhost:" + port + "/report", payloadStr)); + assertEquals(202, gzippedHttpPost("http://localhost:" + port + + "/report?format=wavefront", payloadStr)); + verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + mockHistogramHandler.report(ReportPoint.newBuilder().setTable("dummy"). + setMetric("metric.test.histo").setHost("test1").setTimestamp(startTime * 1000).setValue( + Histogram.newBuilder() + .setType(HistogramType.TDIGEST) + .setDuration(60000) + .setBins(ImmutableList.of(10.0d, 100.0d)) + .setCounts(ImmutableList.of(5, 10)) + .build()) + .build()); + expectLastCall(); + mockHistogramHandler.report(ReportPoint.newBuilder().setTable("dummy"). + setMetric("metric.test.histo").setHost("test2").setTimestamp((startTime + 60) * 1000). + setValue(Histogram.newBuilder() + .setType(HistogramType.TDIGEST) + .setDuration(60000) + .setBins(ImmutableList.of(20.0d, 30.0d, 40.0d)) + .setCounts(ImmutableList.of(5, 6, 7)) + .build()) + .build()); + expectLastCall(); + replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + assertEquals(202, gzippedHttpPost("http://localhost:" + port + + "/report?format=histogram", histoData)); + verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + mockTraceSpanLogsHandler.report(SpanLogs.newBuilder(). + setCustomer("dummy"). + setTraceId(traceId). + setSpanId("testspanid"). + setLogs(ImmutableList.of( + SpanLog.newBuilder(). + setTimestamp(timestamp1). + setFields(ImmutableMap.of("key", "value", "key2", "value2")). + build(), + SpanLog.newBuilder(). + setTimestamp(timestamp2). + setFields(ImmutableMap.of("key3", "value3", "key4", "value4")). + build() + )). + build()); + expectLastCall(); + mockTraceHandler.report(Span.newBuilder().setCustomer("dummy").setStartMillis(startTime * 1000) + .setDuration(1000) + .setName("testSpanName") + .setSource("testsource") + .setSpanId("testspanid") + .setTraceId(traceId) + .setAnnotations(ImmutableList.of(new Annotation("parent", "parent1"), + new Annotation("parent", "parent2"))) + .build()); + expectLastCall(); + replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + assertEquals(202, gzippedHttpPost("http://localhost:" + port + + "/report?format=trace", spanData)); + assertEquals(202, gzippedHttpPost("http://localhost:" + port + + "/report?format=spanLogs", spanLogData)); + verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + mockSourceTagHandler.report(ReportSourceTag.newBuilder(). + setOperation(SourceOperationType.SOURCE_TAG).setAction(SourceTagAction.SAVE). + setSource("testSource").setAnnotations(ImmutableList.of("newtag1", "newtag2")).build()); + expectLastCall(); + mockEventHandler.report(ReportEvent.newBuilder().setStartTime(startTime * 1000). + setEndTime(startTime * 1000 + 1).setName("Event name for testing"). + setHosts(ImmutableList.of("host1", "host2")).setTags(ImmutableList.of("tag1")). + setAnnotations(ImmutableMap.of("severity", "INFO")). + setDimensions(ImmutableMap.of("multi", ImmutableList.of("bar", "baz"))).build()); + expectLastCall(); + mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy"). + setMetric("metric4.test").setHost("test1").setTimestamp(startTime * 1000). + setValue(0.0d).build()); + expectLastCall(); + replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + proxy.entityProps.get(ReportableEntityType.HISTOGRAM).setFeatureDisabled(true); + assertEquals(403, gzippedHttpPost("http://localhost:" + port + + "/report?format=histogram", histoData)); + proxy.entityProps.get(ReportableEntityType.TRACE).setFeatureDisabled(true); + assertEquals(403, gzippedHttpPost("http://localhost:" + port + + "/report?format=trace", spanData)); + proxy.entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).setFeatureDisabled(true); + assertEquals(403, gzippedHttpPost("http://localhost:" + port + + "/report?format=spanLogs", spanLogData)); + assertEquals(202, gzippedHttpPost("http://localhost:" + port + "/report", mixedData)); + verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + mockSourceTagHandler.report(ReportSourceTag.newBuilder(). + setOperation(SourceOperationType.SOURCE_TAG).setAction(SourceTagAction.SAVE). + setSource("testSource").setAnnotations(ImmutableList.of("newtag1", "newtag2")).build()); + expectLastCall(); + mockEventHandler.report(ReportEvent.newBuilder().setStartTime(startTime * 1000). + setEndTime(startTime * 1000 + 1).setName("Event name for testing"). + setHosts(ImmutableList.of("host1", "host2")).setTags(ImmutableList.of("tag1")). + setAnnotations(ImmutableMap.of("severity", "INFO")). + setDimensions(ImmutableMap.of("multi", ImmutableList.of("bar", "baz"))).build()); + expectLastCall(); + mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy").setMetric("metric4.test"). + setHost("test1").setTimestamp(startTime * 1000).setValue(0.0d).build()); + expectLastCall(); + mockSourceTagHandler.reject(eq("@SourceTag"), anyString()); + expectLastCall(); + mockEventHandler.reject(eq("@Event"), anyString()); + expectLastCall(); + mockPointHandler.reject(eq("metric.name"), anyString()); + expectLastCall(); + mockPointHandler.reject(eq(ReportPoint.newBuilder().setTable("dummy").setMetric("metric5.test"). + setHost("test1").setTimestamp(1234567890000L).setValue(0.0d).build()), + startsWith("WF-402: Point outside of reasonable timeframe")); + expectLastCall(); + replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + + assertEquals(202, gzippedHttpPost("http://localhost:" + port + "/report", + mixedData + "\n" + invalidData)); + + verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler, + mockSourceTagHandler, mockEventHandler); + } + @Test public void testTraceUnifiedPortHandlerPlaintext() throws Exception { tracePort = findAvailablePort(3888); diff --git a/proxy/src/test/java/com/wavefront/agent/QueuedAgentServiceTest.java b/proxy/src/test/java/com/wavefront/agent/QueuedAgentServiceTest.java deleted file mode 100644 index e69de29bb..000000000 diff --git a/proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java b/proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java new file mode 100644 index 000000000..846f59c3a --- /dev/null +++ b/proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java @@ -0,0 +1,31 @@ +package com.wavefront.agent.handlers; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * @author vasily@wavefront.com + */ +public class LineDelimitedUtilsTest { + + @Test + public void testSplitStringIterator() { + assertArrayEquals(new String[] {"str1"}, + toArray(LineDelimitedUtils.splitStringIterator("str1", '\n'))); + assertArrayEquals(new String[] {"str1", "str2", "str3"}, + toArray(LineDelimitedUtils.splitStringIterator("str1\nstr2\nstr3", '\n'))); + assertArrayEquals(new String[] {"str1", "str2", "str3"}, + toArray(LineDelimitedUtils.splitStringIterator("\nstr1\nstr2\n\nstr3\n\n", '\n'))); + } + + private String[] toArray(Iterator iterator) { + List list = new ArrayList<>(); + iterator.forEachRemaining(list::add); + return list.toArray(new String[list.size()]); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java b/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java index faa462adb..501360d7b 100644 --- a/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java +++ b/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java @@ -107,7 +107,7 @@ private void receiveRawLog(String log) { InetSocketAddress addr = InetSocketAddress.createUnresolved("testHost", 1234); EasyMock.expect(channel.remoteAddress()).andReturn(addr); EasyMock.replay(ctx, channel); - rawLogsIngesterUnderTest.processLine(ctx, log); + rawLogsIngesterUnderTest.processLine(ctx, log, null); EasyMock.verify(ctx, channel); } From 009763115571e8be63341031bc7e08b327b08849 Mon Sep 17 00:00:00 2001 From: basilisk487 Date: Tue, 17 Mar 2020 12:26:23 -0500 Subject: [PATCH 2/5] Avoid unnecessary double conversion to string --- .../agent/listeners/RelayPortUnificationHandler.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java index 7fcf4d875..f2171eb71 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java @@ -167,7 +167,6 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, okStatus = HttpResponseStatus.NO_CONTENT; } - String payload = request.content().toString(CharsetUtil.UTF_8); HttpResponseStatus status; switch (format) { case Constants.PUSH_FORMAT_HISTOGRAM: @@ -184,6 +183,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, ReportableEntityDecoder histogramDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.HISTOGRAM); + String payload = request.content().toString(CharsetUtil.UTF_8); splitStringIterator(payload, '\n').forEachRemaining(line -> { String message = line.trim(); if (message.isEmpty()) return; @@ -235,6 +235,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, (ReportableEntityDecoder) decoders. get(ReportableEntityType.TRACE); ReportableEntityHandler spanHandler = spanHandlerSupplier.get(); + String payload = request.content().toString(CharsetUtil.UTF_8); splitStringIterator(payload, '\n').forEachRemaining(line -> { try { spanDecoder.decode(line, spans, "dummy"); @@ -257,7 +258,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, (ReportableEntityDecoder) decoders. get(ReportableEntityType.TRACE_SPAN_LOGS); ReportableEntityHandler spanLogsHandler = spanLogsHandlerSupplier.get(); - splitStringIterator(payload, '\n').forEachRemaining(line -> { + String spanLogsPayload = request.content().toString(CharsetUtil.UTF_8); + splitStringIterator(spanLogsPayload, '\n').forEachRemaining(line -> { try { spanLogDecoder.decode(JSON_PARSER.readTree(line), spanLogs, "dummy"); } catch (Exception e) { From cbd6813cc0e45d043ee9e3c206ecdf7f6b693e09 Mon Sep 17 00:00:00 2001 From: basilisk487 Date: Thu, 26 Mar 2020 18:22:01 -0500 Subject: [PATCH 3/5] Updated as per code review --- .../wavefront/agent/channel/ChannelUtils.java | 52 +++++++++-------- .../channel/PlainTextOrHttpFrameDecoder.java | 2 +- .../StatusTrackingHttpObjectAggregator.java | 28 +++++++++ .../agent/handlers/LineDelimitedUtils.java | 57 ------------------- .../listeners/AbstractHttpOnlyHandler.java | 4 +- .../AbstractLineDelimitedHandler.java | 14 ++--- .../AbstractPortUnificationHandler.java | 10 ++-- .../OpenTSDBPortUnificationHandler.java | 8 +-- .../RelayPortUnificationHandler.java | 26 ++++----- .../WavefrontPortUnificationHandler.java | 8 ++- .../tracing/TracePortUnificationHandler.java | 5 +- .../agent/queueing/DataSubmissionQueue.java | 25 ++++++-- .../handlers/LineDelimitedUtilsTest.java | 31 ---------- 13 files changed, 117 insertions(+), 153 deletions(-) create mode 100644 proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java delete mode 100644 proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java diff --git a/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java b/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java index d37d61e58..ac0d2229b 100644 --- a/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java +++ b/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java @@ -1,20 +1,24 @@ package com.wavefront.agent.channel; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Throwables; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableList; +import com.wavefront.agent.SharedMetricsRegistry; +import com.wavefront.common.TaggedMetricName; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.MetricName; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.apache.http.NameValuePair; -import org.apache.http.client.utils.URLEncodedUtils; - import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -36,6 +40,9 @@ */ public abstract class ChannelUtils { + private static final Map> RESPONSE_STATUS_CACHES = + new ConcurrentHashMap<>(); + /** * Create a detailed error message from an exception, including current handle (port). * @@ -113,6 +120,7 @@ public static void writeHttpResponse(final ChannelHandlerContext ctx, public static void writeHttpResponse(final ChannelHandlerContext ctx, final HttpResponse response, boolean keepAlive) { + getHttpStatusCounter(ctx, response.status().code()).inc(); // Decide whether to close the connection or not. if (keepAlive) { // Add keep alive header as per: @@ -200,25 +208,23 @@ public static InetAddress getRemoteAddress(@Nonnull ChannelHandlerContext ctx) { } /** - * Get path from the URI (i.e. discard everything after "?"). - * - * @param uri URI to get path from - * @return path - */ - public static String getPath(String uri) { - int paramsDelimiter = uri.indexOf('?'); - return paramsDelimiter == -1 ? uri : uri.substring(0, paramsDelimiter); - } - - /** - * Parse query params into name/value pairs. + * Get a counter for ~proxy.listeners.http-requests.status.###.count metric for a specific + * status code, with port= point tag for added context. * - * @param uri URI to get query params from - * @return list of {@link NameValuePair} + * @param ctx channel handler context where a response is being sent. + * @param status response status code. */ - public static List getQueryParams(String uri) { - int paramsDelimiter = uri.indexOf('?'); - return paramsDelimiter == -1 ? ImmutableList.of() : - URLEncodedUtils.parse(uri.substring(paramsDelimiter + 1), CharsetUtil.UTF_8); + public static Counter getHttpStatusCounter(ChannelHandlerContext ctx, int status) { + if (ctx != null && ctx.channel() != null) { + InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress(); + if (localAddress != null) { + return RESPONSE_STATUS_CACHES.computeIfAbsent(localAddress.getPort(), + port -> Caffeine.newBuilder().build(statusCode -> Metrics.newCounter( + new TaggedMetricName("listeners", "http-requests.status." + statusCode + ".count", + "port", String.valueOf(port))))).get(status); + } + } + // return a non-reportable counter otherwise + return SharedMetricsRegistry.getInstance().newCounter(new MetricName("", "", "dummy")); } } diff --git a/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java b/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java index 74dbe81a7..8e97b5663 100644 --- a/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java +++ b/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java @@ -95,7 +95,7 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf buffer, Lis addLast("decoder", new HttpRequestDecoder()). addLast("inflater", new HttpContentDecompressor()). addLast("encoder", new HttpResponseEncoder()). - addLast("aggregator", new HttpObjectAggregator(maxLengthHttp)). + addLast("aggregator", new StatusTrackingHttpObjectAggregator(maxLengthHttp)). addLast("handler", this.handler); } else { logger.fine("Switching to plaintext TCP protocol"); diff --git a/proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java b/proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java new file mode 100644 index 000000000..6728c3b84 --- /dev/null +++ b/proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java @@ -0,0 +1,28 @@ +package com.wavefront.agent.channel; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; + +/** + * A {@link HttpObjectAggregator} that correctly tracks HTTP 413 returned + * for incoming payloads that are too large. + * + * @author vasily@wavefront.com + */ +public class StatusTrackingHttpObjectAggregator extends HttpObjectAggregator { + + public StatusTrackingHttpObjectAggregator(int maxContentLength) { + super(maxContentLength); + } + + @Override + protected void handleOversizedMessage(ChannelHandlerContext ctx, HttpMessage oversized) + throws Exception { + if (oversized instanceof HttpRequest) { + ChannelUtils.getHttpStatusCounter(ctx, 413).inc(); + } + super.handleOversizedMessage(ctx, oversized); + } +} diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java b/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java index ae8aae29e..c802fe7b3 100644 --- a/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java +++ b/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java @@ -3,8 +3,6 @@ import org.apache.commons.lang.StringUtils; import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; /** * A collection of helper methods around plaintext newline-delimited payloads. @@ -28,61 +26,6 @@ public static String[] splitPushData(String pushData) { return StringUtils.split(pushData, PUSH_DATA_DELIMETER); } - /** - * Iterate over individual strings in a newline-delimited string. Skips empty strings. - * - * @param input payload to split. - * @return string iterator - */ - public static Iterator splitStringIterator(String input, char delimiter) { - return new Iterator() { - int currentPos = 0; - int indexOfDelimiter = input.indexOf(delimiter); - String peek = null; - - @Override - public boolean hasNext() { - if (peek == null) peek = advance(); - return peek != null; - } - - @Override - public String next() { - try { - if (peek == null) peek = advance(); - if (peek == null) throw new NoSuchElementException(); - return peek; - } finally { - peek = null; - } - } - - private String advance() { - String result = ""; - while ("".equals(result)) { - result = internalNext(); - } - return result; - } - - private String internalNext() { - if (indexOfDelimiter >= 0) { - try { - return input.substring(currentPos, indexOfDelimiter); - } finally { - currentPos = indexOfDelimiter + 1; - indexOfDelimiter = input.indexOf(delimiter, currentPos); - } - } else if (indexOfDelimiter == -1) { - indexOfDelimiter = Integer.MIN_VALUE; - return input.substring(currentPos); - } else { - return null; - } - } - }; - } - /** * Join a batch of strings into a payload string. * diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java index 21ceb7861..1b24e2e54 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java @@ -8,6 +8,8 @@ import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; + import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; @@ -43,7 +45,7 @@ protected abstract void handleHttpMessage( */ @Override protected void handlePlainTextMessage(final ChannelHandlerContext ctx, - final String message) { + @NotNull final String message) { pointsDiscarded.get().inc(); logger.warning("Input discarded: plaintext protocol is not supported on port " + handle); } diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java index 6d9ce052c..d7ea8980e 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java @@ -1,5 +1,6 @@ package com.wavefront.agent.listeners; +import com.google.common.base.Splitter; import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; import com.wavefront.agent.formatter.DataFormat; @@ -7,6 +8,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; + import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; @@ -15,7 +18,6 @@ import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause; import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; -import static com.wavefront.agent.handlers.LineDelimitedUtils.splitStringIterator; /** * Base class for all line-based protocols. Supports TCP line protocol as well as HTTP POST @@ -47,8 +49,9 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, HttpResponseStatus status; try { DataFormat format = getFormat(request); - splitStringIterator(request.content().toString(CharsetUtil.UTF_8), '\n'). - forEachRemaining(line -> processLine(ctx, line.trim(), format)); + Splitter.on('\n').trimResults().omitEmptyStrings(). + split(request.content().toString(CharsetUtil.UTF_8)). + forEach(line -> processLine(ctx, line, format)); status = HttpResponseStatus.ACCEPTED; } catch (Exception e) { status = HttpResponseStatus.BAD_REQUEST; @@ -64,10 +67,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, */ @Override protected void handlePlainTextMessage(final ChannelHandlerContext ctx, - final String message) { - if (message == null) { - throw new IllegalArgumentException("Message cannot be null"); - } + @NotNull final String message) { String trimmedMessage = message.trim(); if (trimmedMessage.isEmpty()) return; processLine(ctx, trimmedMessage, null); diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java index da4c61b98..3779154c7 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java @@ -15,6 +15,7 @@ import org.apache.http.client.utils.URLEncodedUtils; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -22,6 +23,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import io.netty.channel.ChannelHandler; @@ -117,7 +119,7 @@ protected abstract void handleHttpMessage( * @param message Plaintext message to process */ protected abstract void handlePlainTextMessage(final ChannelHandlerContext ctx, - final String message); + @Nonnull final String message); @Override public void channelReadComplete(ChannelHandlerContext ctx) { @@ -148,9 +150,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { protected String extractToken(final FullHttpRequest request) { String token = firstNonNull(request.headers().getAsString("X-AUTH-TOKEN"), request.headers().getAsString("Authorization"), "").replaceAll("^Bearer ", "").trim(); - Optional tokenParam = URLEncodedUtils.parse(request.uri(), CharsetUtil.UTF_8). - stream().filter(x -> x.getName().equals("t") || x.getName().equals("token") || - x.getName().equals("api_key")).findFirst(); + Optional tokenParam = URLEncodedUtils.parse(URI.create(request.uri()), + CharsetUtil.UTF_8).stream().filter(x -> x.getName().equals("t") || + x.getName().equals("token") || x.getName().equals("api_key")).findFirst(); if (tokenParam.isPresent()) { token = tokenParam.get().getValue(); } diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java index 3963f1f7b..b64ac6952 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java @@ -23,8 +23,11 @@ import java.util.function.Function; import java.util.function.Supplier; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; + import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; @@ -122,10 +125,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, * Handles an incoming plain text (string) message. */ protected void handlePlainTextMessage(final ChannelHandlerContext ctx, - String message) { - if (message == null) { - throw new IllegalArgumentException("Message cannot be null"); - } + @NotNull @Nonnull String message) { if (message.startsWith("version")) { ChannelFuture f = ctx.writeAndFlush("Wavefront OpenTSDB Endpoint\n"); if (!f.isSuccess()) { diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java index f2171eb71..7afc9c137 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Splitter; import com.wavefront.common.Utils; import com.wavefront.agent.auth.TokenAuthenticator; import com.wavefront.agent.channel.HealthCheckManager; @@ -21,9 +22,10 @@ import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.MetricName; -import org.apache.commons.lang3.StringUtils; import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -44,10 +46,7 @@ import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage; import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause; -import static com.wavefront.agent.channel.ChannelUtils.getPath; -import static com.wavefront.agent.channel.ChannelUtils.getQueryParams; import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; -import static com.wavefront.agent.handlers.LineDelimitedUtils.splitStringIterator; import static com.wavefront.agent.listeners.FeatureCheckUtils.HISTO_DISABLED; import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED; import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED; @@ -140,8 +139,9 @@ public RelayPortUnificationHandler( @Override protected void handleHttpMessage(final ChannelHandlerContext ctx, final FullHttpRequest request) { + URI uri = URI.create(request.uri()); StringBuilder output = new StringBuilder(); - String path = getPath(request.uri()); + String path = uri.getPath(); final boolean isDirectIngestion = path.startsWith("/report"); if (path.endsWith("/checkin") && (path.startsWith("/api/daemon") || path.contains("wfproxy"))) { // simulate checkin response for proxy chaining @@ -151,7 +151,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, writeHttpResponse(ctx, HttpResponseStatus.OK, jsonResponse, request); return; } - String format = getQueryParams(request.uri()).stream(). + String format = URLEncodedUtils.parse(uri, CharsetUtil.UTF_8).stream(). filter(x -> x.getName().equals("format") || x.getName().equals("f")). map(NameValuePair::getValue).findFirst().orElse(Constants.PUSH_FORMAT_WAVEFRONT); @@ -183,10 +183,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, ReportableEntityDecoder histogramDecoder = (ReportableEntityDecoder) decoders. get(ReportableEntityType.HISTOGRAM); - String payload = request.content().toString(CharsetUtil.UTF_8); - splitStringIterator(payload, '\n').forEachRemaining(line -> { - String message = line.trim(); - if (message.isEmpty()) return; + Splitter.on('\n').trimResults().omitEmptyStrings(). + split(request.content().toString(CharsetUtil.UTF_8)).forEach(message -> { DataFormat dataFormat = DataFormat.autodetect(message); switch (dataFormat) { case EVENT: @@ -235,8 +233,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, (ReportableEntityDecoder) decoders. get(ReportableEntityType.TRACE); ReportableEntityHandler spanHandler = spanHandlerSupplier.get(); - String payload = request.content().toString(CharsetUtil.UTF_8); - splitStringIterator(payload, '\n').forEachRemaining(line -> { + Splitter.on('\n').trimResults().omitEmptyStrings(). + split(request.content().toString(CharsetUtil.UTF_8)).forEach(line -> { try { spanDecoder.decode(line, spans, "dummy"); } catch (Exception e) { @@ -258,8 +256,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, (ReportableEntityDecoder) decoders. get(ReportableEntityType.TRACE_SPAN_LOGS); ReportableEntityHandler spanLogsHandler = spanLogsHandlerSupplier.get(); - String spanLogsPayload = request.content().toString(CharsetUtil.UTF_8); - splitStringIterator(spanLogsPayload, '\n').forEachRemaining(line -> { + Splitter.on('\n').trimResults().omitEmptyStrings(). + split(request.content().toString(CharsetUtil.UTF_8)).forEach(line -> { try { spanLogDecoder.decode(JSON_PARSER.readTree(line), spanLogs, "dummy"); } catch (Exception e) { diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java index 764985680..ff88f15f8 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java @@ -18,6 +18,7 @@ import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.MetricName; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -27,11 +28,13 @@ import javax.annotation.Nullable; import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.CharsetUtil; import wavefront.report.ReportEvent; import wavefront.report.ReportPoint; import wavefront.report.ReportSourceTag; @@ -39,7 +42,6 @@ import wavefront.report.SpanLogs; import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage; -import static com.wavefront.agent.channel.ChannelUtils.getQueryParams; import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse; import static com.wavefront.agent.formatter.DataFormat.HISTOGRAM; import static com.wavefront.agent.formatter.DataFormat.SPAN; @@ -150,8 +152,8 @@ public WavefrontPortUnificationHandler( @Override protected DataFormat getFormat(FullHttpRequest httpRequest) { - return DataFormat.parse(getQueryParams(httpRequest.uri()).stream(). - filter(x -> x.getName().equals("format") || x.getName().equals("f")). + return DataFormat.parse(URLEncodedUtils.parse(URI.create(httpRequest.uri()), CharsetUtil.UTF_8). + stream().filter(x -> x.getName().equals("format") || x.getName().equals("f")). map(NameValuePair::getValue).findFirst().orElse(null)); } diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java index c259719f4..ef6af13e6 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java @@ -19,6 +19,7 @@ import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.MetricName; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -119,8 +120,8 @@ public TracePortUnificationHandler( @Nullable @Override protected DataFormat getFormat(FullHttpRequest httpRequest) { - return DataFormat.parse(URLEncodedUtils.parse(httpRequest.uri(), CharsetUtil.UTF_8).stream(). - filter(x -> x.getName().equals("format") || x.getName().equals("f")). + return DataFormat.parse(URLEncodedUtils.parse(URI.create(httpRequest.uri()), CharsetUtil.UTF_8). + stream().filter(x -> x.getName().equals("format") || x.getName().equals("f")). map(NameValuePair::getValue).findFirst().orElse(null)); } diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java b/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java index bf657efa5..092b295f0 100644 --- a/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java +++ b/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java @@ -7,6 +7,7 @@ import com.wavefront.common.TaggedMetricName; import com.wavefront.data.ReportableEntityType; import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -49,6 +50,12 @@ public class DataSubmissionQueue> extends Object private final String handle; private final String entityName; + private final Counter tasksAddedCounter; + private final Counter itemsAddedCounter; + private final Counter tasksRemovedCounter; + private final Counter itemsRemovedCounter; + + // maintain a fair lock on the queue private final ReentrantLock queueLock = new ReentrantLock(true); @@ -66,6 +73,14 @@ public DataSubmissionQueue(ObjectQueue delegate, if (delegate.isEmpty()) { initializeTracking(); } + this.tasksAddedCounter = Metrics.newCounter(new TaggedMetricName("buffer", "task-added", + "port", handle)); + this.itemsAddedCounter = Metrics.newCounter(new TaggedMetricName("buffer", entityName + + "-added", "port", handle)); + this.tasksRemovedCounter = Metrics.newCounter(new TaggedMetricName("buffer", "task-removed", + "port", handle)); + this.itemsRemovedCounter = Metrics.newCounter(new TaggedMetricName("buffer", entityName + + "-removed", "port", handle)); } @Override @@ -107,11 +122,10 @@ public void add(@Nonnull T t) throws IOException { if (currentWeight != null) { currentWeight.addAndGet(t.weight()); } + tasksAddedCounter.inc(); + itemsAddedCounter.inc(t.weight()); } finally { queueLock.unlock(); - Metrics.newCounter(new TaggedMetricName("buffer", "task-added", "port", handle)).inc(); - Metrics.newCounter(new TaggedMetricName("buffer", entityName + "-added", "port", handle)). - inc(t.weight()); } } @@ -146,14 +160,13 @@ public void remove(int tasksToRemove) { if (delegate.isEmpty()) { initializeTracking(); } + tasksRemovedCounter.inc(); + itemsRemovedCounter.inc(taskSize); } catch (IOException e) { Metrics.newCounter(new TaggedMetricName("buffer", "failures", "port", handle)).inc(); log.severe("I/O error removing task from the queue: " + e.getMessage()); } finally { queueLock.unlock(); - Metrics.newCounter(new TaggedMetricName("buffer", "task-removed", "port", handle)).inc(); - Metrics.newCounter(new TaggedMetricName("buffer", entityName + "-removed", "port", handle)). - inc(taskSize); } } diff --git a/proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java b/proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java deleted file mode 100644 index 846f59c3a..000000000 --- a/proxy/src/test/java/com/wavefront/agent/handlers/LineDelimitedUtilsTest.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.wavefront.agent.handlers; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.junit.Test; - -import static org.junit.Assert.*; - -/** - * @author vasily@wavefront.com - */ -public class LineDelimitedUtilsTest { - - @Test - public void testSplitStringIterator() { - assertArrayEquals(new String[] {"str1"}, - toArray(LineDelimitedUtils.splitStringIterator("str1", '\n'))); - assertArrayEquals(new String[] {"str1", "str2", "str3"}, - toArray(LineDelimitedUtils.splitStringIterator("str1\nstr2\nstr3", '\n'))); - assertArrayEquals(new String[] {"str1", "str2", "str3"}, - toArray(LineDelimitedUtils.splitStringIterator("\nstr1\nstr2\n\nstr3\n\n", '\n'))); - } - - private String[] toArray(Iterator iterator) { - List list = new ArrayList<>(); - iterator.forEachRemaining(list::add); - return list.toArray(new String[list.size()]); - } -} \ No newline at end of file From 48bde3aa52e902dd4f9a4bf7f8a49830ae404471 Mon Sep 17 00:00:00 2001 From: basilisk487 Date: Thu, 26 Mar 2020 18:27:04 -0500 Subject: [PATCH 4/5] Fix annotations --- .../wavefront/agent/listeners/AbstractHttpOnlyHandler.java | 5 ++--- .../agent/listeners/AbstractLineDelimitedHandler.java | 4 +--- .../agent/listeners/OpenTSDBPortUnificationHandler.java | 4 +--- proxy/src/test/java/com/wavefront/agent/PushAgentTest.java | 3 +-- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java index 1b24e2e54..3be9c7262 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java @@ -6,10 +6,9 @@ import java.net.URISyntaxException; import java.util.logging.Logger; +import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.jetbrains.annotations.NotNull; - import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; @@ -45,7 +44,7 @@ protected abstract void handleHttpMessage( */ @Override protected void handlePlainTextMessage(final ChannelHandlerContext ctx, - @NotNull final String message) { + @Nonnull final String message) { pointsDiscarded.get().inc(); logger.warning("Input discarded: plaintext protocol is not supported on port " + handle); } diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java index d7ea8980e..e18158bf4 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java @@ -8,8 +8,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.jetbrains.annotations.NotNull; - import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; @@ -67,7 +65,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, */ @Override protected void handlePlainTextMessage(final ChannelHandlerContext ctx, - @NotNull final String message) { + @Nonnull final String message) { String trimmedMessage = message.trim(); if (trimmedMessage.isEmpty()) return; processLine(ctx, trimmedMessage, null); diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java index b64ac6952..b6c4e8d14 100644 --- a/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java @@ -26,8 +26,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.jetbrains.annotations.NotNull; - import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; @@ -125,7 +123,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx, * Handles an incoming plain text (string) message. */ protected void handlePlainTextMessage(final ChannelHandlerContext ctx, - @NotNull @Nonnull String message) { + @Nonnull String message) { if (message.startsWith("version")) { ChannelFuture f = ctx.writeAndFlush("Wavefront OpenTSDB Endpoint\n"); if (!f.isSuccess()) { diff --git a/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java b/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java index 1738b1f89..d43046292 100644 --- a/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java +++ b/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java @@ -26,7 +26,6 @@ import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; -import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -107,7 +106,7 @@ public class PushAgentTest { private SenderTaskFactory mockSenderTaskFactory = new SenderTaskFactory() { @SuppressWarnings("unchecked") @Override - public Collection> createSenderTasks(@NotNull HandlerKey handlerKey) { + public Collection> createSenderTasks(@Nonnull HandlerKey handlerKey) { return mockSenderTasks; } From 4002753347814007521c774c6ca88af57c0ef60b Mon Sep 17 00:00:00 2001 From: basilisk487 Date: Thu, 26 Mar 2020 18:28:12 -0500 Subject: [PATCH 5/5] Fix formatting --- .../java/com/wavefront/agent/queueing/DataSubmissionQueue.java | 1 - 1 file changed, 1 deletion(-) diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java b/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java index 092b295f0..aa89d06bb 100644 --- a/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java +++ b/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java @@ -55,7 +55,6 @@ public class DataSubmissionQueue> extends Object private final Counter tasksRemovedCounter; private final Counter itemsRemovedCounter; - // maintain a fair lock on the queue private final ReentrantLock queueLock = new ReentrantLock(true);