Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PUB-221: Default proxy port (2878) in HTTP mode should work like DDI endpoint #510

Merged
merged 5 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,6 @@
<artifactId>jafama</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
<version>1.3</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -312,9 +307,10 @@
</goals>
<configuration>
<propertyName>jacocoArgLine</propertyName>
<includes>
<include>com.wavefront.*</include>
</includes>
<excludes>
<exclude>com.tdunning.math.*</exclude>
<exclude>org.logstash.*</exclude>
</excludes>
</configuration>
</execution>
<execution>
Expand All @@ -325,11 +321,7 @@
</goals>
<configuration>
<rules>
<!-- Ensure minimum lines covered at project level for some projects. mvn artifactid is used to identify project -->
<rule>
<includes>
<include>com.wavefront.*</include>
</includes>
<element>BUNDLE</element>
<limits>
<limit>
Expand All @@ -339,7 +331,37 @@
</limit>
</limits>
</rule>
<rule>
<element>PACKAGE</element>
<includes>
<include>com.wavefront.agent.preprocessor</include>
</includes>
<limits>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>0.91</minimum>
</limit>
</limits>
</rule>
<rule>
<element>PACKAGE</element>
<includes>
<include>com.wavefront.agent.histogram</include>
</includes>
<limits>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>0.93</minimum>
</limit>
</limits>
</rule>
</rules>
<excludes>
<exclude>com/tdunning/**</exclude>
<exclude>org/logstash/**</exclude>
</excludes>
</configuration>
</execution>
<execution>
Expand All @@ -348,6 +370,12 @@
<goals>
<goal>report</goal>
</goals>
<configuration>
<excludes>
<exclude>com/tdunning/**</exclude>
<exclude>org/logstash/**</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
12 changes: 9 additions & 3 deletions proxy/src/main/java/com/wavefront/agent/PushAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,10 @@ protected void startGraphiteListener(String strPort,

WavefrontPortUnificationHandler wavefrontPortUnificationHandler =
new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager,
decoderSupplier.get(), handlerFactory, hostAnnotator, preprocessors.get(strPort));
decoderSupplier.get(), handlerFactory, hostAnnotator, preprocessors.get(strPort),
() -> entityProps.get(ReportableEntityType.HISTOGRAM).isFeatureDisabled(),
() -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(),
() -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled());

startAsManagedThread(port,
new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port,
Expand Down Expand Up @@ -710,7 +713,7 @@ public void shutdown(@Nonnull String handle) {
WavefrontPortUnificationHandler wavefrontPortUnificationHandler =
new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager,
decoderSupplier.get(), deltaCounterHandlerFactory, hostAnnotator,
preprocessors.get(strPort));
preprocessors.get(strPort), () -> false, () -> false, () -> false);

startAsManagedThread(port,
new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port,
Expand Down Expand Up @@ -968,7 +971,10 @@ public void shutdown(@Nonnull String handle) {
WavefrontPortUnificationHandler wavefrontPortUnificationHandler =
new WavefrontPortUnificationHandler(strPort, tokenAuthenticator, healthCheckManager,
decoderSupplier.get(), histogramHandlerFactory, hostAnnotator,
preprocessors.get(strPort));
preprocessors.get(strPort),
() -> entityProps.get(ReportableEntityType.HISTOGRAM).isFeatureDisabled(),
() -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(),
() -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled());
startAsManagedThread(port,
new TcpIngester(createInitializer(wavefrontPortUnificationHandler, port,
proxyConfig.getHistogramMaxReceivedLength(), proxyConfig.getHistogramHttpBufferSize(),
Expand Down
34 changes: 34 additions & 0 deletions proxy/src/main/java/com/wavefront/agent/channel/ChannelUtils.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package com.wavefront.agent.channel;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Throwables;

import com.fasterxml.jackson.databind.JsonNode;
import com.wavefront.agent.SharedMetricsRegistry;
import com.wavefront.common.TaggedMetricName;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +40,9 @@
*/
public abstract class ChannelUtils {

private static final Map<Integer, LoadingCache<Integer, Counter>> RESPONSE_STATUS_CACHES =
new ConcurrentHashMap<>();

/**
* Create a detailed error message from an exception, including current handle (port).
*
Expand Down Expand Up @@ -108,6 +120,7 @@ public static void writeHttpResponse(final ChannelHandlerContext ctx,
public static void writeHttpResponse(final ChannelHandlerContext ctx,
final HttpResponse response,
boolean keepAlive) {
getHttpStatusCounter(ctx, response.status().code()).inc();
// Decide whether to close the connection or not.
if (keepAlive) {
// Add keep alive header as per:
Expand Down Expand Up @@ -193,4 +206,25 @@ public static InetAddress getRemoteAddress(@Nonnull ChannelHandlerContext ctx) {
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
return remoteAddress == null ? null : remoteAddress.getAddress();
}

/**
* Get a counter for ~proxy.listeners.http-requests.status.###.count metric for a specific
* status code, with port= point tag for added context.
*
* @param ctx channel handler context where a response is being sent.
* @param status response status code.
*/
public static Counter getHttpStatusCounter(ChannelHandlerContext ctx, int status) {
if (ctx != null && ctx.channel() != null) {
InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress();
if (localAddress != null) {
return RESPONSE_STATUS_CACHES.computeIfAbsent(localAddress.getPort(),
port -> Caffeine.newBuilder().build(statusCode -> Metrics.newCounter(
new TaggedMetricName("listeners", "http-requests.status." + statusCode + ".count",
"port", String.valueOf(port))))).get(status);
}
}
// return a non-reportable counter otherwise
return SharedMetricsRegistry.getInstance().newCounter(new MetricName("", "", "dummy"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf buffer, Lis
addLast("decoder", new HttpRequestDecoder()).
addLast("inflater", new HttpContentDecompressor()).
addLast("encoder", new HttpResponseEncoder()).
addLast("aggregator", new HttpObjectAggregator(maxLengthHttp)).
addLast("aggregator", new StatusTrackingHttpObjectAggregator(maxLengthHttp)).
addLast("handler", this.handler);
} else {
logger.fine("Switching to plaintext TCP protocol");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.wavefront.agent.channel;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;

/**
* A {@link HttpObjectAggregator} that correctly tracks HTTP 413 returned
* for incoming payloads that are too large.
*
* @author vasily@wavefront.com
*/
public class StatusTrackingHttpObjectAggregator extends HttpObjectAggregator {

public StatusTrackingHttpObjectAggregator(int maxContentLength) {
super(maxContentLength);
}

@Override
protected void handleOversizedMessage(ChannelHandlerContext ctx, HttpMessage oversized)
throws Exception {
if (oversized instanceof HttpRequest) {
ChannelUtils.getHttpStatusCounter(ctx, 413).inc();
}
super.handleOversizedMessage(ctx, oversized);
}
}
29 changes: 25 additions & 4 deletions proxy/src/main/java/com/wavefront/agent/formatter/DataFormat.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.wavefront.agent.formatter;

import javax.annotation.Nullable;

import com.wavefront.api.agent.Constants;
import com.wavefront.ingester.AbstractIngesterFormatter;

/**
Expand All @@ -8,10 +11,10 @@
* @author vasily@wavefront.com
*/
public enum DataFormat {
GENERIC, HISTOGRAM, SOURCE_TAG, EVENT, JSON_STRING;
DEFAULT, WAVEFRONT, HISTOGRAM, SOURCE_TAG, EVENT, SPAN, SPAN_LOG;

public static DataFormat autodetect(final String input) {
if (input.length() < 2) return GENERIC;
if (input.length() < 2) return DEFAULT;
char firstChar = input.charAt(0);
switch (firstChar) {
case '@':
Expand All @@ -22,14 +25,32 @@ public static DataFormat autodetect(final String input) {
if (input.startsWith(AbstractIngesterFormatter.EVENT_LITERAL)) return EVENT;
break;
case '{':
if (input.charAt(input.length() - 1) == '}') return JSON_STRING;
if (input.charAt(input.length() - 1) == '}') return SPAN_LOG;
break;
case '!':
if (input.startsWith("!M ") || input.startsWith("!H ") || input.startsWith("!D ")) {
return HISTOGRAM;
}
break;
}
return GENERIC;
return DEFAULT;
}

@Nullable
public static DataFormat parse(String format) {
if (format == null) return null;
switch (format) {
case Constants.PUSH_FORMAT_WAVEFRONT:
case Constants.PUSH_FORMAT_GRAPHITE_V2:
return DataFormat.WAVEFRONT;
case Constants.PUSH_FORMAT_HISTOGRAM:
return DataFormat.HISTOGRAM;
case Constants.PUSH_FORMAT_TRACING:
return DataFormat.SPAN;
case Constants.PUSH_FORMAT_TRACING_SPAN_LOGS:
return DataFormat.SPAN_LOG;
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ private LineDelimitedUtils() {
* @param pushData payload to split.
* @return string array
*/
@Deprecated
public static String[] splitPushData(String pushData) {
return StringUtils.split(pushData, PUSH_DATA_DELIMETER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.net.URISyntaxException;
import java.util.logging.Logger;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.netty.channel.ChannelHandler;
Expand Down Expand Up @@ -43,7 +44,7 @@ protected abstract void handleHttpMessage(
*/
@Override
protected void handlePlainTextMessage(final ChannelHandlerContext ctx,
final String message) {
@Nonnull final String message) {
pointsDiscarded.get().inc();
logger.warning("Input discarded: plaintext protocol is not supported on port " + handle);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.wavefront.agent.listeners;

import com.google.common.base.Splitter;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
import com.wavefront.agent.formatter.DataFormat;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.netty.channel.ChannelHandler;
Expand All @@ -13,7 +16,6 @@

import static com.wavefront.agent.channel.ChannelUtils.errorMessageWithRootCause;
import static com.wavefront.agent.channel.ChannelUtils.writeHttpResponse;
import static com.wavefront.agent.handlers.LineDelimitedUtils.splitPushData;

/**
* Base class for all line-based protocols. Supports TCP line protocol as well as HTTP POST
Expand Down Expand Up @@ -44,9 +46,10 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,
StringBuilder output = new StringBuilder();
HttpResponseStatus status;
try {
for (String line : splitPushData(request.content().toString(CharsetUtil.UTF_8))) {
processLine(ctx, line.trim());
}
DataFormat format = getFormat(request);
Splitter.on('\n').trimResults().omitEmptyStrings().
split(request.content().toString(CharsetUtil.UTF_8)).
forEach(line -> processLine(ctx, line, format));
status = HttpResponseStatus.ACCEPTED;
} catch (Exception e) {
status = HttpResponseStatus.BAD_REQUEST;
Expand All @@ -58,22 +61,33 @@ protected void handleHttpMessage(final ChannelHandlerContext ctx,

/**
* Handles an incoming plain text (string) message. By default simply passes a string to
* {@link #processLine(ChannelHandlerContext, String)} method.
* {@link #processLine(ChannelHandlerContext, String, DataFormat)} method.
*/
@Override
protected void handlePlainTextMessage(final ChannelHandlerContext ctx,
final String message) {
if (message == null) {
throw new IllegalArgumentException("Message cannot be null");
}
processLine(ctx, message.trim());
@Nonnull final String message) {
String trimmedMessage = message.trim();
if (trimmedMessage.isEmpty()) return;
processLine(ctx, trimmedMessage, null);
}

/**
* Detect data format for an incoming HTTP request, if possible.
*
* @param httpRequest http request.
* @return Detected data format or null if unknown.
*/
@Nullable
protected abstract DataFormat getFormat(FullHttpRequest httpRequest);

/**
* Process a single line for a line-based stream.
*
* @param ctx Channel handler context.
* @param message Message to process.
* @param format Data format, if known
*/
protected abstract void processLine(final ChannelHandlerContext ctx, final String message);
protected abstract void processLine(final ChannelHandlerContext ctx,
@Nonnull final String message,
@Nullable DataFormat format);
}
Loading