diff --git a/proxy/pom.xml b/proxy/pom.xml
index 5a3aa48bb..8473a6088 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -261,11 +261,6 @@
jafama
2.1.0
-
- com.google.re2j
- re2j
- 1.3
-
@@ -312,9 +307,10 @@
jacocoArgLine
-
- com.wavefront.*
-
+
+ com.tdunning.math.*
+ org.logstash.*
+
@@ -325,11 +321,7 @@
-
-
- com.wavefront.*
-
BUNDLE
@@ -339,7 +331,37 @@
+
+ PACKAGE
+
+ com.wavefront.agent.preprocessor
+
+
+
+ LINE
+ COVEREDRATIO
+ 0.91
+
+
+
+
+ PACKAGE
+
+ com.wavefront.agent.histogram
+
+
+
+ LINE
+ COVEREDRATIO
+ 0.93
+
+
+
+
+ com/tdunning/**
+ org/logstash/**
+
@@ -348,6 +370,12 @@
report
+
+
+ com/tdunning/**
+ org/logstash/**
+
+
diff --git a/proxy/src/main/java/com/wavefront/agent/PushAgent.java b/proxy/src/main/java/com/wavefront/agent/PushAgent.java
index 43cb45fbe..ff4eeea8c 100644
--- a/proxy/src/main/java/com/wavefront/agent/PushAgent.java
+++ b/proxy/src/main/java/com/wavefront/agent/PushAgent.java
@@ -663,7 +663,10 @@ protected void startGraphiteListener(String strPort,
WavefrontPortUnificationHandler wavefrontPortUnificationHandler =
new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager,
- decoderSupplier.get(), handlerFactory, hostAnnotator, preprocessors.get(strPort));
+ decoderSupplier.get(), handlerFactory, hostAnnotator, preprocessors.get(strPort),
+ () -> entityProps.get(ReportableEntityType.HISTOGRAM).isFeatureDisabled(),
+ () -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(),
+ () -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled());
startAsManagedThread(port,
new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port,
@@ -710,7 +713,7 @@ public void shutdown(@Nonnull String handle) {
WavefrontPortUnificationHandler wavefrontPortUnificationHandler =
new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager,
decoderSupplier.get(), deltaCounterHandlerFactory, hostAnnotator,
- preprocessors.get(strPort));
+ preprocessors.get(strPort), () -> false, () -> false, () -> false);
startAsManagedThread(port,
new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port,
@@ -968,7 +971,10 @@ public void shutdown(@Nonnull String handle) {
WavefrontPortUnificationHandler wavefrontPortUnificationHandler =
new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager,
decoderSupplier.get(), histogramHandlerFactory, hostAnnotator,
- preprocessors.get(strPort));
+ preprocessors.get(strPort),
+ () -> entityProps.get(ReportableEntityType.HISTOGRAM).isFeatureDisabled(),
+ () -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(),
+ () -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled());
startAsManagedThread(port,
new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port,
proxyConfig.getHistogramMaxReceivedLength(), proxyConfig.getHistogramHttpBufferSize(),
diff --git a/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java b/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java
index 2071ca191..ac0d2229b 100644
--- a/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java
+++ b/proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java
@@ -1,11 +1,20 @@
package com.wavefront.agent.channel;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Throwables;
import com.fasterxml.jackson.databind.JsonNode;
+import com.wavefront.agent.SharedMetricsRegistry;
+import com.wavefront.common.TaggedMetricName;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -31,6 +40,9 @@
*/
public abstract class ChannelUtils {
+ private static final Map> RESPONSE_STATUS_CACHES =
+ new ConcurrentHashMap<>();
+
/**
* Create a detailed error message from an exception, including current handle (port).
*
@@ -108,6 +120,7 @@ public static void writeHttpResponse(final ChannelHandlerContext ctx,
public static void writeHttpResponse(final ChannelHandlerContext ctx,
final HttpResponse response,
boolean keepAlive) {
+ getHttpStatusCounter(ctx, response.status().code()).inc();
// Decide whether to close the connection or not.
if (keepAlive) {
// Add keep alive header as per:
@@ -193,4 +206,25 @@ public static InetAddress getRemoteAddress(@Nonnull ChannelHandlerContext ctx) {
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
return remoteAddress == null ? null : remoteAddress.getAddress();
}
+
+ /**
+ * Get a counter for ~proxy.listeners.http-requests.status.###.count metric for a specific
+ * status code, with port= point tag for added context.
+ *
+ * @param ctx channel handler context where a response is being sent.
+ * @param status response status code.
+ */
+ public static Counter getHttpStatusCounter(ChannelHandlerContext ctx, int status) {
+ if (ctx != null && ctx.channel() != null) {
+ InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress();
+ if (localAddress != null) {
+ return RESPONSE_STATUS_CACHES.computeIfAbsent(localAddress.getPort(),
+ port -> Caffeine.newBuilder().build(statusCode -> Metrics.newCounter(
+ new TaggedMetricName("listeners", "http-requests.status." + statusCode + ".count",
+ "port", String.valueOf(port))))).get(status);
+ }
+ }
+ // return a non-reportable counter otherwise
+ return SharedMetricsRegistry.getInstance().newCounter(new MetricName("", "", "dummy"));
+ }
}
diff --git a/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java b/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java
index 74dbe81a7..8e97b5663 100644
--- a/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java
+++ b/proxy/src/main/java/com/wavefront/agent/channel/PlainTextOrHttpFrameDecoder.java
@@ -95,7 +95,7 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf buffer, Lis
addLast("decoder", new HttpRequestDecoder()).
addLast("inflater", new HttpContentDecompressor()).
addLast("encoder", new HttpResponseEncoder()).
- addLast("aggregator", new HttpObjectAggregator(maxLengthHttp)).
+ addLast("aggregator", new StatusTrackingHttpObjectAggregator(maxLengthHttp)).
addLast("handler", this.handler);
} else {
logger.fine("Switching to plaintext TCP protocol");
diff --git a/proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java b/proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java
new file mode 100644
index 000000000..6728c3b84
--- /dev/null
+++ b/proxy/src/main/java/com/wavefront/agent/channel/StatusTrackingHttpObjectAggregator.java
@@ -0,0 +1,28 @@
+package com.wavefront.agent.channel;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+
+/**
+ * A {@link HttpObjectAggregator} that correctly tracks HTTP 413 returned
+ * for incoming payloads that are too large.
+ *
+ * @author vasily@wavefront.com
+ */
+public class StatusTrackingHttpObjectAggregator extends HttpObjectAggregator {
+
+ public StatusTrackingHttpObjectAggregator(int maxContentLength) {
+ super(maxContentLength);
+ }
+
+ @Override
+ protected void handleOversizedMessage(ChannelHandlerContext ctx, HttpMessage oversized)
+ throws Exception {
+ if (oversized instanceof HttpRequest) {
+ ChannelUtils.getHttpStatusCounter(ctx, 413).inc();
+ }
+ super.handleOversizedMessage(ctx, oversized);
+ }
+}
diff --git a/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java b/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java
index 35672d60a..b089efb6d 100644
--- a/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java
+++ b/proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java
@@ -1,5 +1,8 @@
package com.wavefront.agent.formatter;
+import javax.annotation.Nullable;
+
+import com.wavefront.api.agent.Constants;
import com.wavefront.ingester.AbstractIngesterFormatter;
/**
@@ -8,10 +11,10 @@
* @author vasily@wavefront.com
*/
public enum DataFormat {
- GENERIC, HISTOGRAM, SOURCE_TAG, EVENT, JSON_STRING;
+ DEFAULT, WAVEFRONT, HISTOGRAM, SOURCE_TAG, EVENT, SPAN, SPAN_LOG;
public static DataFormat autodetect(final String input) {
- if (input.length() < 2) return GENERIC;
+ if (input.length() < 2) return DEFAULT;
char firstChar = input.charAt(0);
switch (firstChar) {
case '@':
@@ -22,7 +25,7 @@ public static DataFormat autodetect(final String input) {
if (input.startsWith(AbstractIngesterFormatter.EVENT_LITERAL)) return EVENT;
break;
case '{':
- if (input.charAt(input.length() - 1) == '}') return JSON_STRING;
+ if (input.charAt(input.length() - 1) == '}') return SPAN_LOG;
break;
case '!':
if (input.startsWith("!M ") || input.startsWith("!H ") || input.startsWith("!D ")) {
@@ -30,6 +33,24 @@ public static DataFormat autodetect(final String input) {
}
break;
}
- return GENERIC;
+ return DEFAULT;
+ }
+
+ @Nullable
+ public static DataFormat parse(String format) {
+ if (format == null) return null;
+ switch (format) {
+ case Constants.PUSH_FORMAT_WAVEFRONT:
+ case Constants.PUSH_FORMAT_GRAPHITE_V2:
+ return DataFormat.WAVEFRONT;
+ case Constants.PUSH_FORMAT_HISTOGRAM:
+ return DataFormat.HISTOGRAM;
+ case Constants.PUSH_FORMAT_TRACING:
+ return DataFormat.SPAN;
+ case Constants.PUSH_FORMAT_TRACING_SPAN_LOGS:
+ return DataFormat.SPAN_LOG;
+ default:
+ return null;
+ }
}
}
diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java b/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java
index d4006639d..c802fe7b3 100644
--- a/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java
+++ b/proxy/src/main/java/com/wavefront/agent/handlers/LineDelimitedUtils.java
@@ -21,6 +21,7 @@ private LineDelimitedUtils() {
* @param pushData payload to split.
* @return string array
*/
+ @Deprecated
public static String[] splitPushData(String pushData) {
return StringUtils.split(pushData, PUSH_DATA_DELIMETER);
}
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java
index 21ceb7861..3be9c7262 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractHttpOnlyHandler.java
@@ -6,6 +6,7 @@
import java.net.URISyntaxException;
import java.util.logging.Logger;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import io.netty.channel.ChannelHandler;
@@ -43,7 +44,7 @@ protected abstract void handleHttpMessage(
*/
@Override
protected void handlePlainTextMessage(final ChannelHandlerContext ctx,
- final String message) {
+ @Nonnull final String message) {
pointsDiscarded.get().inc();
logger.warning("Input discarded: plaintext protocol is not supported on port " + handle);
}
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java
index e5d429733..e18158bf4 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractLineDelimitedHandler.java
@@ -1,8 +1,11 @@
package com.wavefront.agent.listeners;
+import com.google.common.base.Splitter;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
+import com.wavefront.agent.formatter.DataFormat;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import io.netty.channel.ChannelHandler;
@@ -13,7 +16,6 @@
import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause;
import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse;
-import static com.wavefront.agent.handlers.LineDelimitedUtils.splitPushData;
/**
* Base class for all line-based protocols. Supports TCP line protocol as well as HTTP POST
@@ -44,9 +46,10 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
StringBuilder output = new StringBuilder();
HttpResponseStatus status;
try {
- for (String line : splitPushData(request.content().toString(CharsetUtil.UTF_8))) {
- processLine(ctx, line.trim());
- }
+ DataFormat format = getFormat(request);
+ Splitter.on('\n').trimResults().omitEmptyStrings().
+ split(request.content().toString(CharsetUtil.UTF_8)).
+ forEach(line -> processLine(ctx, line, format));
status = HttpResponseStatus.ACCEPTED;
} catch (Exception e) {
status = HttpResponseStatus.BAD_REQUEST;
@@ -58,22 +61,33 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
/**
* Handles an incoming plain text (string) message. By default simply passes a string to
- * {@link #processLine(ChannelHandlerContext, String)} method.
+ * {@link #processLine(ChannelHandlerContext, String, DataFormat)} method.
*/
@Override
protected void handlePlainTextMessage(final ChannelHandlerContext ctx,
- final String message) {
- if (message == null) {
- throw new IllegalArgumentException("Message cannot be null");
- }
- processLine(ctx, message.trim());
+ @Nonnull final String message) {
+ String trimmedMessage = message.trim();
+ if (trimmedMessage.isEmpty()) return;
+ processLine(ctx, trimmedMessage, null);
}
+ /**
+ * Detect data format for an incoming HTTP request, if possible.
+ *
+ * @param httpRequest http request.
+ * @return Detected data format or null if unknown.
+ */
+ @Nullable
+ protected abstract DataFormat getFormat(FullHttpRequest httpRequest);
+
/**
* Process a single line for a line-based stream.
*
* @param ctx Channel handler context.
* @param message Message to process.
+ * @param format Data format, if known
*/
- protected abstract void processLine(final ChannelHandlerContext ctx, final String message);
+ protected abstract void processLine(final ChannelHandlerContext ctx,
+ @Nonnull final String message,
+ @Nullable DataFormat format);
}
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java
index 3957abfb1..3779154c7 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/AbstractPortUnificationHandler.java
@@ -23,6 +23,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import io.netty.channel.ChannelHandler;
@@ -118,7 +119,7 @@ protected abstract void handleHttpMessage(
* @param message Plaintext message to process
*/
protected abstract void handlePlainTextMessage(final ChannelHandlerContext ctx,
- final String message);
+ @Nonnull final String message);
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
@@ -146,21 +147,19 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.log(Level.WARNING, "Unexpected error: ", cause);
}
- protected String extractToken(final FullHttpRequest request) throws URISyntaxException {
- URI requestUri = new URI(request.uri());
+ protected String extractToken(final FullHttpRequest request) {
String token = firstNonNull(request.headers().getAsString("X-AUTH-TOKEN"),
request.headers().getAsString("Authorization"), "").replaceAll("^Bearer ", "").trim();
- Optional tokenParam = URLEncodedUtils.parse(requestUri, CharsetUtil.UTF_8).
- stream().filter(x -> x.getName().equals("t") || x.getName().equals("token") ||
- x.getName().equals("api_key")).findFirst();
+ Optional tokenParam = URLEncodedUtils.parse(URI.create(request.uri()),
+ CharsetUtil.UTF_8).stream().filter(x -> x.getName().equals("t") ||
+ x.getName().equals("token") || x.getName().equals("api_key")).findFirst();
if (tokenParam.isPresent()) {
token = tokenParam.get().getValue();
}
return token;
}
- protected boolean authorized(final ChannelHandlerContext ctx, final FullHttpRequest request)
- throws URISyntaxException {
+ protected boolean authorized(final ChannelHandlerContext ctx, final FullHttpRequest request) {
if (tokenAuthenticator.authRequired()) {
String token = extractToken(request);
if (!tokenAuthenticator.authorize(token)) { // 401 if no token or auth fails
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java b/proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java
new file mode 100644
index 000000000..fec44f473
--- /dev/null
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/FeatureCheckUtils.java
@@ -0,0 +1,87 @@
+package com.wavefront.agent.listeners;
+
+import javax.annotation.Nullable;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.wavefront.common.MessageDedupingLogger;
+import com.yammer.metrics.core.Counter;
+
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.util.CharsetUtil;
+
+/**
+ * Constants and utility methods for validating feature subscriptions.
+ *
+ * @author vasily@wavefront.com
+ */
+public abstract class FeatureCheckUtils {
+ private static final Logger logger = Logger.getLogger(FeatureCheckUtils.class.getCanonicalName());
+
+ private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 3, 0.2);
+ public static final String HISTO_DISABLED = "Ingested point discarded because histogram " +
+ "feature has not been enabled for your account";
+ public static final String SPAN_DISABLED = "Ingested span discarded because distributed " +
+ "tracing feature has not been enabled for your account.";
+ public static final String SPANLOGS_DISABLED = "Ingested span log discarded because " +
+ "this feature has not been enabled for your account.";
+
+ /**
+ * Check whether feature disabled flag is set, log a warning message, increment the counter by 1.
+ *
+ * @param featureDisabledFlag Supplier for feature disabled flag.
+ * @param message Warning message to log if feature is disabled.
+ * @param discardedCounter Optional counter for discarded items.
+ * @return true if feature is disabled
+ */
+ public static boolean isFeatureDisabled(Supplier featureDisabledFlag,
+ String message, @Nullable Counter discardedCounter) {
+ return isFeatureDisabled(featureDisabledFlag, message, discardedCounter, null, null);
+ }
+
+ /**
+ * Check whether feature disabled flag is set, log a warning message, increment the counter by 1.
+ *
+ * @param featureDisabledFlag Supplier for feature disabled flag.
+ * @param message Warning message to log if feature is disabled.
+ * @param discardedCounter Optional counter for discarded items.
+ * @param output Optional stringbuilder for messages
+ * @return true if feature is disabled
+ */
+ public static boolean isFeatureDisabled(Supplier featureDisabledFlag, String message,
+ @Nullable Counter discardedCounter,
+ @Nullable StringBuilder output) {
+ return isFeatureDisabled(featureDisabledFlag, message, discardedCounter, output, null);
+ }
+
+ /**
+ * Check whether feature disabled flag is set, log a warning message, increment the counter
+ * either by 1 or by number of \n characters in request payload, if provided.
+ *
+ * @param featureDisabledFlag Supplier for feature disabled flag.
+ * @param message Warning message to log if feature is disabled.
+ * @param discardedCounter Optional counter for discarded items.
+ * @param output Optional stringbuilder for messages
+ * @param request Optional http request to use payload size
+ * @return true if feature is disabled
+ */
+ public static boolean isFeatureDisabled(Supplier featureDisabledFlag, String message,
+ @Nullable Counter discardedCounter,
+ @Nullable StringBuilder output,
+ @Nullable FullHttpRequest request) {
+ if (featureDisabledFlag.get()) {
+ featureDisabledLogger.warning(message);
+ if (output != null) {
+ output.append(message);
+ }
+ if (discardedCounter != null) {
+ discardedCounter.inc(request == null ? 1 :
+ StringUtils.countMatches(request.content().toString(CharsetUtil.UTF_8), "\n") + 1);
+ }
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java
index 3963f1f7b..b6c4e8d14 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/OpenTSDBPortUnificationHandler.java
@@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.function.Supplier;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import io.netty.channel.ChannelFuture;
@@ -122,10 +123,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
* Handles an incoming plain text (string) message.
*/
protected void handlePlainTextMessage(final ChannelHandlerContext ctx,
- String message) {
- if (message == null) {
- throw new IllegalArgumentException("Message cannot be null");
- }
+ @Nonnull String message) {
if (message.startsWith("version")) {
ChannelFuture f = ctx.writeAndFlush("Wavefront OpenTSDB Endpoint\n");
if (!f.isSuccess()) {
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java
index 75aa157d9..38ca58ded 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/RawLogsIngesterPortUnificationHandler.java
@@ -4,6 +4,7 @@
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
+import com.wavefront.agent.formatter.DataFormat;
import com.wavefront.agent.logsharvesting.LogsIngester;
import com.wavefront.agent.logsharvesting.LogsMessage;
import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor;
@@ -25,6 +26,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.FullHttpRequest;
import static com.wavefront.agent.channel.ChannelUtils.getRemoteAddress;
@@ -80,10 +82,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
super.exceptionCaught(ctx, cause);
}
+ @Nullable
+ @Override
+ protected DataFormat getFormat(FullHttpRequest httpRequest) {
+ return null;
+ }
+
@VisibleForTesting
@Override
- public void processLine(final ChannelHandlerContext ctx, String message) {
- if (message.isEmpty()) return;
+ public void processLine(final ChannelHandlerContext ctx, @Nonnull String message,
+ @Nullable DataFormat format) {
received.inc();
ReportableEntityPreprocessor preprocessor = preprocessorSupplier == null ?
null : preprocessorSupplier.get();
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java
index 868864be9..7afc9c137 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/RelayPortUnificationHandler.java
@@ -1,12 +1,10 @@
package com.wavefront.agent.listeners;
-import com.google.common.collect.Lists;
-
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.wavefront.common.MessageDedupingLogger;
+import com.google.common.base.Splitter;
import com.wavefront.common.Utils;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
@@ -28,8 +26,7 @@
import org.apache.http.client.utils.URLEncodedUtils;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,7 +47,10 @@
import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage;
import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause;
import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse;
-import static com.wavefront.agent.handlers.LineDelimitedUtils.splitPushData;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.HISTO_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled;
import static com.wavefront.agent.listeners.WavefrontPortUnificationHandler.preprocessAndHandlePoint;
/**
@@ -67,13 +67,6 @@
public class RelayPortUnificationHandler extends AbstractHttpOnlyHandler {
private static final Logger logger = Logger.getLogger(
RelayPortUnificationHandler.class.getCanonicalName());
- private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 3, 0.2);
- private static final String ERROR_HISTO_DISABLED = "Ingested point discarded because histogram " +
- "feature has not been enabled for your account";
- private static final String ERROR_SPAN_DISABLED = "Ingested span discarded because distributed " +
- "tracing feature has not been enabled for your account.";
- private static final String ERROR_SPANLOGS_DISABLED = "Ingested span log discarded because " +
- "this feature has not been enabled for your account.";
private static final ObjectMapper JSON_PARSER = new ObjectMapper();
@@ -145,9 +138,9 @@ public RelayPortUnificationHandler(
@Override
protected void handleHttpMessage(final ChannelHandlerContext ctx,
- final FullHttpRequest request) throws URISyntaxException {
+ final FullHttpRequest request) {
+ URI uri = URI.create(request.uri());
StringBuilder output = new StringBuilder();
- URI uri = new URI(request.uri());
String path = uri.getPath();
final boolean isDirectIngestion = path.startsWith("/report");
if (path.endsWith("/checkin") && (path.startsWith("/api/daemon") || path.contains("wfproxy"))) {
@@ -158,6 +151,9 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
writeHttpResponse(ctx, HttpResponseStatus.OK, jsonResponse, request);
return;
}
+ String format = URLEncodedUtils.parse(uri, CharsetUtil.UTF_8).stream().
+ filter(x -> x.getName().equals("format") || x.getName().equals("f")).
+ map(NameValuePair::getValue).findFirst().orElse(Constants.PUSH_FORMAT_WAVEFRONT);
// Return HTTP 200 (OK) for payloads received on the proxy endpoint
// Return HTTP 202 (ACCEPTED) for payloads received on the DDI endpoint
@@ -170,20 +166,13 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
} else {
okStatus = HttpResponseStatus.NO_CONTENT;
}
- String format = URLEncodedUtils.parse(uri, CharsetUtil.UTF_8).stream().
- filter(x -> x.getName().equals("format") || x.getName().equals("f")).
- map(NameValuePair::getValue).findFirst().orElse(Constants.PUSH_FORMAT_WAVEFRONT);
- String[] lines = splitPushData(request.content().toString(CharsetUtil.UTF_8));
HttpResponseStatus status;
-
switch (format) {
case Constants.PUSH_FORMAT_HISTOGRAM:
- if (histogramDisabled.get()) {
- discardedHistograms.get().inc(lines.length);
+ if (isFeatureDisabled(histogramDisabled, HISTO_DISABLED, discardedHistograms.get(),
+ output, request)) {
status = HttpResponseStatus.FORBIDDEN;
- featureDisabledLogger.info(ERROR_HISTO_DISABLED);
- output.append(ERROR_HISTO_DISABLED);
break;
}
case Constants.PUSH_FORMAT_WAVEFRONT:
@@ -194,9 +183,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
ReportableEntityDecoder histogramDecoder =
(ReportableEntityDecoder) decoders.
get(ReportableEntityType.HISTOGRAM);
- Arrays.stream(lines).forEach(line -> {
- String message = line.trim();
- if (message.isEmpty()) return;
+ Splitter.on('\n').trimResults().omitEmptyStrings().
+ split(request.content().toString(CharsetUtil.UTF_8)).forEach(message -> {
DataFormat dataFormat = DataFormat.autodetect(message);
switch (dataFormat) {
case EVENT:
@@ -208,10 +196,8 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
"sourceTag-formatted data!");
break;
case HISTOGRAM:
- if (histogramDisabled.get()) {
- discardedHistograms.get().inc(lines.length);
- featureDisabledLogger.info(ERROR_HISTO_DISABLED);
- output.append(ERROR_HISTO_DISABLED);
+ if (isFeatureDisabled(histogramDisabled, HISTO_DISABLED,
+ discardedHistograms.get(), output)) {
break;
}
preprocessAndHandlePoint(message, histogramDecoder, histogramHandlerSupplier.get(),
@@ -236,20 +222,19 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
}
break;
case Constants.PUSH_FORMAT_TRACING:
- if (traceDisabled.get()) {
- discardedSpans.get().inc(lines.length);
+ if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedSpans.get(), output,
+ request)) {
status = HttpResponseStatus.FORBIDDEN;
- featureDisabledLogger.info(ERROR_SPAN_DISABLED);
- output.append(ERROR_SPAN_DISABLED);
break;
}
- List spans = Lists.newArrayListWithCapacity(lines.length);
+ List spans = new ArrayList<>();
//noinspection unchecked
ReportableEntityDecoder spanDecoder =
(ReportableEntityDecoder) decoders.
get(ReportableEntityType.TRACE);
ReportableEntityHandler spanHandler = spanHandlerSupplier.get();
- Arrays.stream(lines).forEach(line -> {
+ Splitter.on('\n').trimResults().omitEmptyStrings().
+ split(request.content().toString(CharsetUtil.UTF_8)).forEach(line -> {
try {
spanDecoder.decode(line, spans, "dummy");
} catch (Exception e) {
@@ -260,20 +245,19 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
status = okStatus;
break;
case Constants.PUSH_FORMAT_TRACING_SPAN_LOGS:
- if (spanLogsDisabled.get()) {
- discardedSpanLogs.get().inc(lines.length);
+ if (isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, discardedSpanLogs.get(),
+ output, request)) {
status = HttpResponseStatus.FORBIDDEN;
- featureDisabledLogger.info(ERROR_SPANLOGS_DISABLED);
- output.append(ERROR_SPANLOGS_DISABLED);
break;
}
- List spanLogs = Lists.newArrayListWithCapacity(lines.length);
+ List spanLogs = new ArrayList<>();
//noinspection unchecked
ReportableEntityDecoder spanLogDecoder =
(ReportableEntityDecoder) decoders.
get(ReportableEntityType.TRACE_SPAN_LOGS);
ReportableEntityHandler spanLogsHandler = spanLogsHandlerSupplier.get();
- Arrays.stream(lines).forEach(line -> {
+ Splitter.on('\n').trimResults().omitEmptyStrings().
+ split(request.content().toString(CharsetUtil.UTF_8)).forEach(line -> {
try {
spanLogDecoder.decode(JSON_PARSER.readTree(line), spanLogs, "dummy");
} catch (Exception e) {
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java
index b4a594e8f..ff88f15f8 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/WavefrontPortUnificationHandler.java
@@ -1,5 +1,7 @@
package com.wavefront.agent.listeners;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.wavefront.common.Utils;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
@@ -12,21 +14,43 @@
import com.wavefront.data.ReportableEntityType;
import com.wavefront.dto.SourceTag;
import com.wavefront.ingester.ReportableEntityDecoder;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.CharsetUtil;
import wavefront.report.ReportEvent;
import wavefront.report.ReportPoint;
import wavefront.report.ReportSourceTag;
+import wavefront.report.Span;
+import wavefront.report.SpanLogs;
import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage;
+import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse;
+import static com.wavefront.agent.formatter.DataFormat.HISTOGRAM;
+import static com.wavefront.agent.formatter.DataFormat.SPAN;
+import static com.wavefront.agent.formatter.DataFormat.SPAN_LOG;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.HISTO_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled;
+import static com.wavefront.agent.listeners.tracing.TracePortUnificationHandler.preprocessAndHandleSpan;
/**
* Process incoming Wavefront-formatted data. Also allows sourceTag formatted data and
@@ -39,6 +63,7 @@
*/
@ChannelHandler.Sharable
public class WavefrontPortUnificationHandler extends AbstractLineDelimitedHandler {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Nullable
private final SharedGraphiteHostAnnotator annotator;
@@ -48,11 +73,22 @@ public class WavefrontPortUnificationHandler extends AbstractLineDelimitedHandle
private final ReportableEntityDecoder sourceTagDecoder;
private final ReportableEntityDecoder eventDecoder;
private final ReportableEntityDecoder histogramDecoder;
+ private final ReportableEntityDecoder spanDecoder;
+ private final ReportableEntityDecoder spanLogsDecoder;
private final ReportableEntityHandler wavefrontHandler;
private final Supplier> histogramHandlerSupplier;
private final Supplier> sourceTagHandlerSupplier;
+ private final Supplier> spanHandlerSupplier;
+ private final Supplier> spanLogsHandlerSupplier;
private final Supplier> eventHandlerSupplier;
+ private final Supplier histogramDisabled;
+ private final Supplier traceDisabled;
+ private final Supplier spanLogsDisabled;
+
+ private final Supplier discardedHistograms;
+ private final Supplier discardedSpans;
+ private final Supplier discardedSpanLogs;
/**
* Create new instance with lazy initialization for handlers.
*
@@ -62,7 +98,10 @@ public class WavefrontPortUnificationHandler extends AbstractLineDelimitedHandle
* @param decoders decoders.
* @param handlerFactory factory for ReportableEntityHandler objects.
* @param annotator hostAnnotator that makes sure all points have a source= tag.
- * @param preprocessor preprocessor.
+ * @param preprocessor preprocessor supplier.
+ * @param histogramDisabled supplier for backend-controlled feature flag for histograms.
+ * @param traceDisabled supplier for backend-controlled feature flag for spans.
+ * @param spanLogsDisabled supplier for backend-controlled feature flag for span logs.
*/
@SuppressWarnings("unchecked")
public WavefrontPortUnificationHandler(
@@ -71,7 +110,9 @@ public WavefrontPortUnificationHandler(
final Map> decoders,
final ReportableEntityHandlerFactory handlerFactory,
@Nullable final SharedGraphiteHostAnnotator annotator,
- @Nullable final Supplier preprocessor) {
+ @Nullable final Supplier preprocessor,
+ final Supplier histogramDisabled, final Supplier traceDisabled,
+ final Supplier spanLogsDisabled) {
super(tokenAuthenticator, healthCheckManager, handle);
this.wavefrontDecoder = (ReportableEntityDecoder) decoders.
get(ReportableEntityType.POINT);
@@ -82,14 +123,54 @@ public WavefrontPortUnificationHandler(
get(ReportableEntityType.HISTOGRAM);
this.sourceTagDecoder = (ReportableEntityDecoder) decoders.
get(ReportableEntityType.SOURCE_TAG);
+ this.spanDecoder = (ReportableEntityDecoder) decoders.
+ get(ReportableEntityType.TRACE);
+ this.spanLogsDecoder = (ReportableEntityDecoder) decoders.
+ get(ReportableEntityType.TRACE_SPAN_LOGS);
this.eventDecoder = (ReportableEntityDecoder) decoders.
get(ReportableEntityType.EVENT);
this.histogramHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(
HandlerKey.of(ReportableEntityType.HISTOGRAM, handle)));
this.sourceTagHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(
HandlerKey.of(ReportableEntityType.SOURCE_TAG, handle)));
+ this.spanHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(
+ HandlerKey.of(ReportableEntityType.TRACE, handle)));
+ this.spanLogsHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(
+ HandlerKey.of(ReportableEntityType.TRACE_SPAN_LOGS, handle)));
this.eventHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(
HandlerKey.of(ReportableEntityType.EVENT, handle)));
+ this.histogramDisabled = histogramDisabled;
+ this.traceDisabled = traceDisabled;
+ this.spanLogsDisabled = spanLogsDisabled;
+ this.discardedHistograms = Utils.lazySupplier(() -> Metrics.newCounter(new MetricName(
+ "histogram", "", "discarded_points")));
+ this.discardedSpans = Utils.lazySupplier(() -> Metrics.newCounter(new MetricName(
+ "spans." + handle, "", "discarded")));
+ this.discardedSpanLogs = Utils.lazySupplier(() -> Metrics.newCounter(new MetricName(
+ "spanLogs." + handle, "", "discarded")));
+ }
+
+ @Override
+ protected DataFormat getFormat(FullHttpRequest httpRequest) {
+ return DataFormat.parse(URLEncodedUtils.parse(URI.create(httpRequest.uri()), CharsetUtil.UTF_8).
+ stream().filter(x -> x.getName().equals("format") || x.getName().equals("f")).
+ map(NameValuePair::getValue).findFirst().orElse(null));
+ }
+
+ @Override
+ protected void handleHttpMessage(ChannelHandlerContext ctx, FullHttpRequest request) {
+ StringBuilder out = new StringBuilder();
+ DataFormat format = getFormat(request);
+ if ((format == HISTOGRAM && isFeatureDisabled(histogramDisabled, HISTO_DISABLED,
+ discardedHistograms.get(), out, request)) ||
+ (format == SPAN && isFeatureDisabled(traceDisabled, SPAN_DISABLED,
+ discardedSpans.get(), out, request)) ||
+ (format == SPAN_LOG && isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED,
+ discardedSpanLogs.get(), out, request))) {
+ writeHttpResponse(ctx, HttpResponseStatus.FORBIDDEN, out, request);
+ return;
+ }
+ super.handleHttpMessage(ctx, request);
}
/**
@@ -98,9 +179,9 @@ public WavefrontPortUnificationHandler(
* @param message line being processed
*/
@Override
- protected void processLine(final ChannelHandlerContext ctx, String message) {
- if (message.isEmpty()) return;
- DataFormat dataFormat = DataFormat.autodetect(message);
+ protected void processLine(final ChannelHandlerContext ctx, @Nonnull String message,
+ @Nullable DataFormat format) {
+ DataFormat dataFormat = format == null ? DataFormat.autodetect(message) : format;
switch (dataFormat) {
case SOURCE_TAG:
ReportableEntityHandler sourceTagHandler =
@@ -138,7 +219,37 @@ protected void processLine(final ChannelHandlerContext ctx, String message) {
"\"", e, ctx));
}
return;
+ case SPAN:
+ ReportableEntityHandler spanHandler = spanHandlerSupplier.get();
+ if (spanHandler == null || spanDecoder == null) {
+ wavefrontHandler.reject(message, "Port is not configured to accept " +
+ "tracing data (spans)!");
+ return;
+ }
+ message = annotator == null ? message : annotator.apply(ctx, message);
+ preprocessAndHandleSpan(message, spanDecoder, spanHandler, spanHandler::report,
+ preprocessorSupplier, ctx, true, x -> true);
+ return;
+ case SPAN_LOG:
+ if (isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, discardedSpanLogs.get())) return;
+ ReportableEntityHandler spanLogsHandler = spanLogsHandlerSupplier.get();
+ if (spanLogsHandler == null || spanLogsDecoder == null) {
+ wavefrontHandler.reject(message, "Port is not configured to accept " +
+ "tracing data (span logs)!");
+ return;
+ }
+ try {
+ List spanLogs = new ArrayList<>(1);
+ spanLogsDecoder.decode(OBJECT_MAPPER.readTree(message), spanLogs, "dummy");
+ for (SpanLogs object : spanLogs) {
+ spanLogsHandler.report(object);
+ }
+ } catch (Exception e) {
+ spanLogsHandler.reject(message, formatErrorMessage(message, e, ctx));
+ }
+ return;
case HISTOGRAM:
+ if (isFeatureDisabled(histogramDisabled, HISTO_DISABLED, discardedHistograms.get())) return;
ReportableEntityHandler histogramHandler = histogramHandlerSupplier.get();
if (histogramHandler == null || histogramDecoder == null) {
wavefrontHandler.reject(message, "Port is not configured to accept " +
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java
index c8d7b8851..3be4b8e97 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/JaegerThriftUtils.java
@@ -3,7 +3,6 @@
import com.google.common.collect.ImmutableSet;
import com.wavefront.agent.handlers.ReportableEntityHandler;
import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor;
-import com.wavefront.common.MessageDedupingLogger;
import com.wavefront.common.TraceConstants;
import com.wavefront.internal.reporter.WavefrontInternalReporter;
import com.wavefront.sdk.entities.tracing.sampling.Sampler;
@@ -33,6 +32,9 @@
import java.util.logging.Logger;
import java.util.stream.Collectors;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.DEBUG_SPAN_TAG_KEY;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_VAL;
@@ -53,7 +55,6 @@
public abstract class JaegerThriftUtils {
protected static final Logger logger =
Logger.getLogger(JaegerThriftUtils.class.getCanonicalName());
- private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 2, 0.2);
// TODO: support sampling
private final static Set IGNORE_TAGS = ImmutableSet.of("sampler.type", "sampler.param");
@@ -112,15 +113,8 @@ public static void processBatch(Batch batch,
}
}
}
- if (traceDisabled.get()) {
- featureDisabledLogger.info("Ingested spans discarded because tracing feature is not " +
- "enabled on the server");
- discardedBatches.inc();
+ if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedBatches, output)) {
discardedTraces.inc(batch.getSpansSize());
- if (output != null) {
- output.append("Ingested spans discarded because tracing feature is not enabled on the " +
- "server.");
- }
return;
}
for (io.jaegertracing.thriftjava.Span span : batch.getSpans()) {
@@ -274,43 +268,39 @@ private static void processSpan(io.jaegertracing.thriftjava.Span span,
if (isForceSampled || isDebugSpanTag || (alwaysSampleErrors && isError) ||
sample(wavefrontSpan, sampler, discardedSpansBySampler)) {
spanHandler.report(wavefrontSpan);
- if (span.getLogs() != null && !span.getLogs().isEmpty()) {
- if (spanLogsDisabled.get()) {
- featureDisabledLogger.info("Span logs discarded because the feature is not " +
- "enabled on the server!");
- } else {
- SpanLogs spanLogs = SpanLogs.newBuilder().
- setCustomer("default").
- setTraceId(wavefrontSpan.getTraceId()).
- setSpanId(wavefrontSpan.getSpanId()).
- setLogs(span.getLogs().stream().map(x -> {
- Map fields = new HashMap<>(x.fields.size());
- x.fields.forEach(t -> {
- switch (t.vType) {
- case STRING:
- fields.put(t.getKey(), t.getVStr());
- break;
- case BOOL:
- fields.put(t.getKey(), String.valueOf(t.isVBool()));
- break;
- case LONG:
- fields.put(t.getKey(), String.valueOf(t.getVLong()));
- break;
- case DOUBLE:
- fields.put(t.getKey(), String.valueOf(t.getVDouble()));
- break;
- case BINARY:
- // ignore
- default:
- }
- });
- return SpanLog.newBuilder().
- setTimestamp(x.timestamp).
- setFields(fields).
- build();
- }).collect(Collectors.toList())).build();
- spanLogsHandler.report(spanLogs);
- }
+ if (span.getLogs() != null && !span.getLogs().isEmpty() &&
+ !isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, null)) {
+ SpanLogs spanLogs = SpanLogs.newBuilder().
+ setCustomer("default").
+ setTraceId(wavefrontSpan.getTraceId()).
+ setSpanId(wavefrontSpan.getSpanId()).
+ setLogs(span.getLogs().stream().map(x -> {
+ Map fields = new HashMap<>(x.fields.size());
+ x.fields.forEach(t -> {
+ switch (t.vType) {
+ case STRING:
+ fields.put(t.getKey(), t.getVStr());
+ break;
+ case BOOL:
+ fields.put(t.getKey(), String.valueOf(t.isVBool()));
+ break;
+ case LONG:
+ fields.put(t.getKey(), String.valueOf(t.getVLong()));
+ break;
+ case DOUBLE:
+ fields.put(t.getKey(), String.valueOf(t.getVDouble()));
+ break;
+ case BINARY:
+ // ignore
+ default:
+ }
+ });
+ return SpanLog.newBuilder().
+ setTimestamp(x.timestamp).
+ setFields(fields).
+ build();
+ }).collect(Collectors.toList())).build();
+ spanLogsHandler.report(spanLogs);
}
}
// report stats irrespective of span sampling.
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java
index 1a79cd449..ef6af13e6 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/TracePortUnificationHandler.java
@@ -6,12 +6,12 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
+import com.wavefront.agent.formatter.DataFormat;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.handlers.ReportableEntityHandler;
import com.wavefront.agent.handlers.ReportableEntityHandlerFactory;
import com.wavefront.agent.listeners.AbstractLineDelimitedHandler;
import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor;
-import com.wavefront.common.MessageDedupingLogger;
import com.wavefront.data.ReportableEntityType;
import com.wavefront.ingester.ReportableEntityDecoder;
import com.wavefront.sdk.entities.tracing.sampling.Sampler;
@@ -19,6 +19,7 @@
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -30,13 +31,20 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import wavefront.report.ReportPoint;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.util.CharsetUtil;
import wavefront.report.Span;
import wavefront.report.SpanLogs;
import static com.wavefront.agent.channel.ChannelUtils.formatErrorMessage;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_VAL;
@@ -52,7 +60,6 @@
public class TracePortUnificationHandler extends AbstractLineDelimitedHandler {
private static final Logger logger = Logger.getLogger(
TracePortUnificationHandler.class.getCanonicalName());
- private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 2, 0.2);
private static final ObjectMapper JSON_PARSER = new ObjectMapper();
@@ -110,20 +117,20 @@ public TracePortUnificationHandler(
"sampler.discarded"));
}
+ @Nullable
@Override
- protected void processLine(final ChannelHandlerContext ctx, @Nonnull String message) {
- if (traceDisabled.get()) {
- featureDisabledLogger.warning("Ingested spans discarded because tracing feature is not " +
- "enabled on the server");
- discardedSpans.inc();
- return;
- }
- if (message.startsWith("{") && message.endsWith("}")) { // span logs
- if (spanLogsDisabled.get()) {
- featureDisabledLogger.warning("Ingested span logs discarded because the feature is not " +
- "enabled on the server");
- return;
- }
+ protected DataFormat getFormat(FullHttpRequest httpRequest) {
+ return DataFormat.parse(URLEncodedUtils.parse(URI.create(httpRequest.uri()), CharsetUtil.UTF_8).
+ stream().filter(x -> x.getName().equals("format") || x.getName().equals("f")).
+ map(NameValuePair::getValue).findFirst().orElse(null));
+ }
+
+ @Override
+ protected void processLine(final ChannelHandlerContext ctx, @Nonnull String message,
+ @Nullable DataFormat format) {
+ if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedSpans)) return;
+ if (format == DataFormat.SPAN_LOG || (message.startsWith("{") && message.endsWith("}"))) {
+ if (isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, null)) return;
try {
List output = new ArrayList<>(1);
spanLogsDecoder.decode(JSON_PARSER.readTree(message), output, "dummy");
diff --git a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java
index fc261d8dd..4cb663e88 100644
--- a/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java
+++ b/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java
@@ -5,7 +5,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.wavefront.common.MessageDedupingLogger;
import com.wavefront.common.Utils;
import com.wavefront.agent.auth.TokenAuthenticatorBuilder;
import com.wavefront.agent.channel.HealthCheckManager;
@@ -61,6 +60,9 @@
import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause;
import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPANLOGS_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.SPAN_DISABLED;
+import static com.wavefront.agent.listeners.FeatureCheckUtils.isFeatureDisabled;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.DEBUG_SPAN_TAG_KEY;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.DEBUG_SPAN_TAG_VAL;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY;
@@ -85,7 +87,6 @@ public class ZipkinPortUnificationHandler extends AbstractHttpOnlyHandler
implements Runnable, Closeable {
private static final Logger logger = Logger.getLogger(
ZipkinPortUnificationHandler.class.getCanonicalName());
- private static final Logger featureDisabledLogger = new MessageDedupingLogger(logger, 2, 0.2);
private final ReportableEntityHandler spanHandler;
private final ReportableEntityHandler spanLogsHandler;
@@ -207,12 +208,7 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
StringBuilder output = new StringBuilder();
// Handle case when tracing is disabled, ignore reported spans.
- if (traceDisabled.get()) {
- featureDisabledLogger.info("Ingested spans discarded because tracing feature is not " +
- "enabled on the server");
- discardedBatches.inc();
- output.append("Ingested spans discarded because tracing feature is not enabled on the " +
- "server.");
+ if (isFeatureDisabled(traceDisabled, SPAN_DISABLED, discardedBatches, output)) {
status = HttpResponseStatus.ACCEPTED;
writeHttpResponse(ctx, status, output, request);
return;
@@ -390,26 +386,22 @@ private void processZipkinSpan(zipkin2.Span zipkinSpan) {
if (isDebugSpanTag || isDebug || (alwaysSampleErrors && isError) || sample(wavefrontSpan)) {
spanHandler.report(wavefrontSpan);
- if (zipkinSpan.annotations() != null && !zipkinSpan.annotations().isEmpty()) {
- if (spanLogsDisabled.get()) {
- featureDisabledLogger.info("Span logs discarded because the feature is not " +
- "enabled on the server!");
- } else {
- SpanLogs spanLogs = SpanLogs.newBuilder().
- setCustomer("default").
- setTraceId(wavefrontSpan.getTraceId()).
- setSpanId(wavefrontSpan.getSpanId()).
- setSpanSecondaryId(zipkinSpan.kind() != null ?
- zipkinSpan.kind().toString().toLowerCase() : null).
- setLogs(zipkinSpan.annotations().stream().map(
- x -> SpanLog.newBuilder().
- setTimestamp(x.timestamp()).
- setFields(ImmutableMap.of("annotation", x.value())).
- build()).
- collect(Collectors.toList())).
- build();
- spanLogsHandler.report(spanLogs);
- }
+ if (zipkinSpan.annotations() != null && !zipkinSpan.annotations().isEmpty() &&
+ !isFeatureDisabled(spanLogsDisabled, SPANLOGS_DISABLED, null)) {
+ SpanLogs spanLogs = SpanLogs.newBuilder().
+ setCustomer("default").
+ setTraceId(wavefrontSpan.getTraceId()).
+ setSpanId(wavefrontSpan.getSpanId()).
+ setSpanSecondaryId(zipkinSpan.kind() != null ?
+ zipkinSpan.kind().toString().toLowerCase() : null).
+ setLogs(zipkinSpan.annotations().stream().map(
+ x -> SpanLog.newBuilder().
+ setTimestamp(x.timestamp()).
+ setFields(ImmutableMap.of("annotation", x.value())).
+ build()).
+ collect(Collectors.toList())).
+ build();
+ spanLogsHandler.report(spanLogs);
}
}
// report stats irrespective of span sampling.
diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java b/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java
index bf657efa5..aa89d06bb 100644
--- a/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java
+++ b/proxy/src/main/java/com/wavefront/agent/queueing/DataSubmissionQueue.java
@@ -7,6 +7,7 @@
import com.wavefront.common.TaggedMetricName;
import com.wavefront.data.ReportableEntityType;
import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -49,6 +50,11 @@ public class DataSubmissionQueue> extends Object
private final String handle;
private final String entityName;
+ private final Counter tasksAddedCounter;
+ private final Counter itemsAddedCounter;
+ private final Counter tasksRemovedCounter;
+ private final Counter itemsRemovedCounter;
+
// maintain a fair lock on the queue
private final ReentrantLock queueLock = new ReentrantLock(true);
@@ -66,6 +72,14 @@ public DataSubmissionQueue(ObjectQueue delegate,
if (delegate.isEmpty()) {
initializeTracking();
}
+ this.tasksAddedCounter = Metrics.newCounter(new TaggedMetricName("buffer", "task-added",
+ "port", handle));
+ this.itemsAddedCounter = Metrics.newCounter(new TaggedMetricName("buffer", entityName +
+ "-added", "port", handle));
+ this.tasksRemovedCounter = Metrics.newCounter(new TaggedMetricName("buffer", "task-removed",
+ "port", handle));
+ this.itemsRemovedCounter = Metrics.newCounter(new TaggedMetricName("buffer", entityName +
+ "-removed", "port", handle));
}
@Override
@@ -107,11 +121,10 @@ public void add(@Nonnull T t) throws IOException {
if (currentWeight != null) {
currentWeight.addAndGet(t.weight());
}
+ tasksAddedCounter.inc();
+ itemsAddedCounter.inc(t.weight());
} finally {
queueLock.unlock();
- Metrics.newCounter(new TaggedMetricName("buffer", "task-added", "port", handle)).inc();
- Metrics.newCounter(new TaggedMetricName("buffer", entityName + "-added", "port", handle)).
- inc(t.weight());
}
}
@@ -146,14 +159,13 @@ public void remove(int tasksToRemove) {
if (delegate.isEmpty()) {
initializeTracking();
}
+ tasksRemovedCounter.inc();
+ itemsRemovedCounter.inc(taskSize);
} catch (IOException e) {
Metrics.newCounter(new TaggedMetricName("buffer", "failures", "port", handle)).inc();
log.severe("I/O error removing task from the queue: " + e.getMessage());
} finally {
queueLock.unlock();
- Metrics.newCounter(new TaggedMetricName("buffer", "task-removed", "port", handle)).inc();
- Metrics.newCounter(new TaggedMetricName("buffer", entityName + "-removed", "port", handle)).
- inc(taskSize);
}
}
diff --git a/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java b/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java
index 38358021a..d43046292 100644
--- a/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java
+++ b/proxy/src/test/java/com/wavefront/agent/PushAgentTest.java
@@ -26,7 +26,6 @@
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
-import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -71,6 +70,7 @@
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.startsWith;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -106,7 +106,7 @@ public class PushAgentTest {
private SenderTaskFactory mockSenderTaskFactory = new SenderTaskFactory() {
@SuppressWarnings("unchecked")
@Override
- public Collection> createSenderTasks(@NotNull HandlerKey handlerKey) {
+ public Collection> createSenderTasks(@Nonnull HandlerKey handlerKey) {
return mockSenderTasks;
}
@@ -360,6 +360,197 @@ public void testWavefrontUnifiedPortHandlerPlaintextUncompressedMixedDataPayload
verifyWithTimeout(500, mockPointHandler, mockHistogramHandler, mockEventHandler);
}
+ @Test
+ public void testWavefrontHandlerAsDDIEndpoint() throws Exception {
+ port = findAvailablePort(2978);
+ proxy.proxyConfig.pushListenerPorts = String.valueOf(port);
+ proxy.proxyConfig.dataBackfillCutoffHours = 8640;
+ proxy.startGraphiteListener(proxy.proxyConfig.getPushListenerPorts(), mockHandlerFactory,
+ null);
+ waitUntilListenerIsOnline(port);
+ String traceId = UUID.randomUUID().toString();
+ long timestamp1 = startTime * 1000000 + 12345;
+ long timestamp2 = startTime * 1000000 + 23456;
+
+ String payloadStr = "metric4.test 0 " + startTime + " source=test1\n" +
+ "metric4.test 1 " + (startTime + 1) + " source=test2\n" +
+ "metric4.test 2 " + (startTime + 2) + " source=test3"; // note the lack of newline at the end!
+ String histoData = "!M " + startTime + " #5 10.0 #10 100.0 metric.test.histo source=test1\n" +
+ "!M " + (startTime + 60) + " #5 20.0 #6 30.0 #7 40.0 metric.test.histo source=test2";
+ String spanData = "testSpanName parent=parent1 source=testsource spanId=testspanid " +
+ "traceId=\"" + traceId + "\" parent=parent2 " + startTime + " " + (startTime + 1);
+ String spanLogData = "{\"spanId\":\"testspanid\",\"traceId\":\"" + traceId +
+ "\",\"logs\":[{\"timestamp\":" + timestamp1 +
+ ",\"fields\":{\"key\":\"value\",\"key2\":\"value2\"}},{\"timestamp\":" +
+ timestamp2 + ",\"fields\":{\"key3\":\"value3\",\"key4\":\"value4\"}}]}\n";
+ String mixedData = "@SourceTag action=save source=testSource newtag1 newtag2\n" +
+ "@Event " + startTime + " \"Event name for testing\" host=host1 host=host2 tag=tag1 " +
+ "severity=INFO multi=bar multi=baz\n" +
+ "!M " + (startTime + 60) + " #5 20.0 #6 30.0 #7 40.0 metric.test.histo source=test2\n" +
+ "metric4.test 0 " + startTime + " source=test1\n" + spanLogData;
+
+ String invalidData = "{\"spanId\"}\n@SourceTag\n@Event\n!M #5\nmetric.name\n" +
+ "metric5.test 0 1234567890 source=test1\n";
+
+
+ reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+ mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy").
+ setMetric("metric4.test").setHost("test1").setTimestamp(startTime * 1000).
+ setValue(0.0d).build());
+ expectLastCall().times(2);
+ mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy").
+ setMetric("metric4.test").setHost("test2").setTimestamp((startTime + 1) * 1000).
+ setValue(1.0d).build());
+ expectLastCall().times(2);
+ mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy").
+ setMetric("metric4.test").setHost("test3").setTimestamp((startTime + 2) * 1000).
+ setValue(2.0d).build());
+ expectLastCall().times(2);
+ replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port + "/report", payloadStr));
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=wavefront", payloadStr));
+ verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+ mockHistogramHandler.report(ReportPoint.newBuilder().setTable("dummy").
+ setMetric("metric.test.histo").setHost("test1").setTimestamp(startTime * 1000).setValue(
+ Histogram.newBuilder()
+ .setType(HistogramType.TDIGEST)
+ .setDuration(60000)
+ .setBins(ImmutableList.of(10.0d, 100.0d))
+ .setCounts(ImmutableList.of(5, 10))
+ .build())
+ .build());
+ expectLastCall();
+ mockHistogramHandler.report(ReportPoint.newBuilder().setTable("dummy").
+ setMetric("metric.test.histo").setHost("test2").setTimestamp((startTime + 60) * 1000).
+ setValue(Histogram.newBuilder()
+ .setType(HistogramType.TDIGEST)
+ .setDuration(60000)
+ .setBins(ImmutableList.of(20.0d, 30.0d, 40.0d))
+ .setCounts(ImmutableList.of(5, 6, 7))
+ .build())
+ .build());
+ expectLastCall();
+ replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=histogram", histoData));
+ verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+ mockTraceSpanLogsHandler.report(SpanLogs.newBuilder().
+ setCustomer("dummy").
+ setTraceId(traceId).
+ setSpanId("testspanid").
+ setLogs(ImmutableList.of(
+ SpanLog.newBuilder().
+ setTimestamp(timestamp1).
+ setFields(ImmutableMap.of("key", "value", "key2", "value2")).
+ build(),
+ SpanLog.newBuilder().
+ setTimestamp(timestamp2).
+ setFields(ImmutableMap.of("key3", "value3", "key4", "value4")).
+ build()
+ )).
+ build());
+ expectLastCall();
+ mockTraceHandler.report(Span.newBuilder().setCustomer("dummy").setStartMillis(startTime * 1000)
+ .setDuration(1000)
+ .setName("testSpanName")
+ .setSource("testsource")
+ .setSpanId("testspanid")
+ .setTraceId(traceId)
+ .setAnnotations(ImmutableList.of(new Annotation("parent", "parent1"),
+ new Annotation("parent", "parent2")))
+ .build());
+ expectLastCall();
+ replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=trace", spanData));
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=spanLogs", spanLogData));
+ verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+ mockSourceTagHandler.report(ReportSourceTag.newBuilder().
+ setOperation(SourceOperationType.SOURCE_TAG).setAction(SourceTagAction.SAVE).
+ setSource("testSource").setAnnotations(ImmutableList.of("newtag1", "newtag2")).build());
+ expectLastCall();
+ mockEventHandler.report(ReportEvent.newBuilder().setStartTime(startTime * 1000).
+ setEndTime(startTime * 1000 + 1).setName("Event name for testing").
+ setHosts(ImmutableList.of("host1", "host2")).setTags(ImmutableList.of("tag1")).
+ setAnnotations(ImmutableMap.of("severity", "INFO")).
+ setDimensions(ImmutableMap.of("multi", ImmutableList.of("bar", "baz"))).build());
+ expectLastCall();
+ mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy").
+ setMetric("metric4.test").setHost("test1").setTimestamp(startTime * 1000).
+ setValue(0.0d).build());
+ expectLastCall();
+ replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ proxy.entityProps.get(ReportableEntityType.HISTOGRAM).setFeatureDisabled(true);
+ assertEquals(403, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=histogram", histoData));
+ proxy.entityProps.get(ReportableEntityType.TRACE).setFeatureDisabled(true);
+ assertEquals(403, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=trace", spanData));
+ proxy.entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).setFeatureDisabled(true);
+ assertEquals(403, gzippedHttpPost("http://localhost:" + port +
+ "/report?format=spanLogs", spanLogData));
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port + "/report", mixedData));
+ verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ reset(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+ mockSourceTagHandler.report(ReportSourceTag.newBuilder().
+ setOperation(SourceOperationType.SOURCE_TAG).setAction(SourceTagAction.SAVE).
+ setSource("testSource").setAnnotations(ImmutableList.of("newtag1", "newtag2")).build());
+ expectLastCall();
+ mockEventHandler.report(ReportEvent.newBuilder().setStartTime(startTime * 1000).
+ setEndTime(startTime * 1000 + 1).setName("Event name for testing").
+ setHosts(ImmutableList.of("host1", "host2")).setTags(ImmutableList.of("tag1")).
+ setAnnotations(ImmutableMap.of("severity", "INFO")).
+ setDimensions(ImmutableMap.of("multi", ImmutableList.of("bar", "baz"))).build());
+ expectLastCall();
+ mockPointHandler.report(ReportPoint.newBuilder().setTable("dummy").setMetric("metric4.test").
+ setHost("test1").setTimestamp(startTime * 1000).setValue(0.0d).build());
+ expectLastCall();
+ mockSourceTagHandler.reject(eq("@SourceTag"), anyString());
+ expectLastCall();
+ mockEventHandler.reject(eq("@Event"), anyString());
+ expectLastCall();
+ mockPointHandler.reject(eq("metric.name"), anyString());
+ expectLastCall();
+ mockPointHandler.reject(eq(ReportPoint.newBuilder().setTable("dummy").setMetric("metric5.test").
+ setHost("test1").setTimestamp(1234567890000L).setValue(0.0d).build()),
+ startsWith("WF-402: Point outside of reasonable timeframe"));
+ expectLastCall();
+ replay(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+
+ assertEquals(202, gzippedHttpPost("http://localhost:" + port + "/report",
+ mixedData + "\n" + invalidData));
+
+ verify(mockPointHandler, mockHistogramHandler, mockTraceHandler, mockTraceSpanLogsHandler,
+ mockSourceTagHandler, mockEventHandler);
+ }
+
@Test
public void testTraceUnifiedPortHandlerPlaintext() throws Exception {
tracePort = findAvailablePort(3888);
diff --git a/proxy/src/test/java/com/wavefront/agent/QueuedAgentServiceTest.java b/proxy/src/test/java/com/wavefront/agent/QueuedAgentServiceTest.java
deleted file mode 100644
index e69de29bb..000000000
diff --git a/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java b/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java
index faa462adb..501360d7b 100644
--- a/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java
+++ b/proxy/src/test/java/com/wavefront/agent/logsharvesting/LogsIngesterTest.java
@@ -107,7 +107,7 @@ private void receiveRawLog(String log) {
InetSocketAddress addr = InetSocketAddress.createUnresolved("testHost", 1234);
EasyMock.expect(channel.remoteAddress()).andReturn(addr);
EasyMock.replay(ctx, channel);
- rawLogsIngesterUnderTest.processLine(ctx, log);
+ rawLogsIngesterUnderTest.processLine(ctx, log, null);
EasyMock.verify(ctx, channel);
}