From 937add9320715a10b3ce3d703a6988d7df04badd Mon Sep 17 00:00:00 2001 From: brharrington Date: Mon, 29 Jan 2024 21:41:27 -0600 Subject: [PATCH] stateless: add validation metrics (#1112) When reporting data to aggregator check the response and report validation metrics/logs. This makes it easier to debug if data is getting rejected due to validation errors. --- spectator-reg-stateless/README.md | 4 +- spectator-reg-stateless/build.gradle | 1 + .../stateless/StatelessRegistry.java | 10 +- .../spectator/stateless/ValidationHelper.java | 76 ++++++++++ .../stateless/ValidationResponse.java | 104 ++++++++++++++ .../stateless/StatelessRegistryTest.java | 1 - .../stateless/ValidationHelperTest.java | 132 ++++++++++++++++++ .../stateless/ValidationResponseTest.java | 84 +++++++++++ 8 files changed, 407 insertions(+), 5 deletions(-) create mode 100644 spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationHelper.java create mode 100644 spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationResponse.java create mode 100644 spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationHelperTest.java create mode 100644 spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationResponseTest.java diff --git a/spectator-reg-stateless/README.md b/spectator-reg-stateless/README.md index d8f213616..f59859dc5 100644 --- a/spectator-reg-stateless/README.md +++ b/spectator-reg-stateless/README.md @@ -1,8 +1,6 @@ ## Description -**DEPRECATED:** should not be used for any new development as Prana is deprecated. - -Registry implementation for reporting data to a [Prana] sidecar. +Registry implementation for reporting data to an aggregator cluster. [Prana]: https://github.com/Netflix/Prana diff --git a/spectator-reg-stateless/build.gradle b/spectator-reg-stateless/build.gradle index b1bc176eb..721f3c590 100644 --- a/spectator-reg-stateless/build.gradle +++ b/spectator-reg-stateless/build.gradle @@ -2,6 +2,7 @@ dependencies { api project(':spectator-api') api project(':spectator-ext-ipc') implementation 'com.fasterxml.jackson.core:jackson-core' + testImplementation 'com.fasterxml.jackson.core:jackson-databind' } jar { diff --git a/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/StatelessRegistry.java b/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/StatelessRegistry.java index 923ac5d8a..f23153651 100644 --- a/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/StatelessRegistry.java +++ b/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/StatelessRegistry.java @@ -26,6 +26,8 @@ import com.netflix.spectator.impl.Scheduler; import com.netflix.spectator.ipc.http.HttpClient; import com.netflix.spectator.ipc.http.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URI; import java.time.Duration; @@ -48,6 +50,8 @@ */ public final class StatelessRegistry extends AbstractRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(StatelessRegistry.class); + private final boolean enabled; private final Duration frequency; private final long meterTTL; @@ -58,6 +62,7 @@ public final class StatelessRegistry extends AbstractRegistry { private final Map commonTags; private final HttpClient client; + private final ValidationHelper validationHelper; private Scheduler scheduler; @@ -73,6 +78,7 @@ public StatelessRegistry(Clock clock, StatelessConfig config) { this.batchSize = config.batchSize(); this.commonTags = config.commonTags(); this.client = HttpClient.create(this); + this.validationHelper = new ValidationHelper(LOGGER, this); } /** @@ -121,10 +127,12 @@ private void collectData() { .withReadTimeout(readTimeout) .withContent("application/json", payload) .compress(Deflater.BEST_SPEED) - .send(); + .send() + .decompress(); if (res.status() != 200) { logger.warn("failed to send metrics, status {}: {}", res.status(), res.entityAsString()); } + validationHelper.recordResults(batch.size(), res); } removeExpiredMeters(); } catch (Exception e) { diff --git a/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationHelper.java b/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationHelper.java new file mode 100644 index 000000000..fba75d686 --- /dev/null +++ b/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationHelper.java @@ -0,0 +1,76 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.stateless; + +import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.Registry; +import com.netflix.spectator.ipc.http.HttpResponse; +import org.slf4j.Logger; + +final class ValidationHelper { + + private final Logger logger; + + private final Counter measurementsSent; + private final Counter measurementsDroppedInvalid; + private final Counter measurementsDroppedHttp; + private final Counter measurementsDroppedOther; + + ValidationHelper(Logger logger, Registry registry) { + this.logger = logger; + + Id baseId = registry.createId("spectator.measurements"); + Id droppedId = baseId.withTag("id", "dropped"); + this.measurementsSent = registry.counter(baseId.withTag("id", "sent")); + this.measurementsDroppedHttp = registry.counter(droppedId.withTag("error", "http-error")); + this.measurementsDroppedInvalid = registry.counter(droppedId.withTag("error", "validation")); + this.measurementsDroppedOther = registry.counter(droppedId.withTag("error", "other")); + } + + void incrementDroppedHttp(int amount) { + measurementsDroppedHttp.increment(amount); + } + + /** + * Report metrics and do basic logging of validation results to help the user with + * debugging. + */ + void recordResults(int numMeasurements, HttpResponse res) { + if (res.status() == 200) { + measurementsSent.increment(numMeasurements); + } else if (res.status() < 500) { + // For validation: + // 202 - partial failure + // 400 - all failed, could also be some other sort of failure + try { + ValidationResponse vres = ValidationResponse.fromJson(res.entity()); + measurementsDroppedInvalid.increment(vres.getErrorCount()); + measurementsSent.increment(numMeasurements - vres.getErrorCount()); + logger.warn("{} measurement(s) dropped due to validation errors: {}", + vres.getErrorCount(), vres.errorSummary()); + } catch (Exception e) { + // Likely some other 400 error. Log at trace level in case the cause is really needed. + logger.trace("failed to parse response", e); + logger.warn("{} measurement(s) dropped. Http status: {}", numMeasurements, res.status()); + measurementsDroppedOther.increment(numMeasurements); + } + } else { + // Some sort of server side failure + measurementsDroppedHttp.increment(numMeasurements); + } + } +} diff --git a/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationResponse.java b/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationResponse.java new file mode 100644 index 000000000..9e8882a2a --- /dev/null +++ b/spectator-reg-stateless/src/main/java/com/netflix/spectator/stateless/ValidationResponse.java @@ -0,0 +1,104 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.stateless; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +/** + * Validation failure response from Atlas aggregator endpoint. + */ +@SuppressWarnings("PMD.DataClass") +final class ValidationResponse { + + private final String type; + private final int errorCount; + private final List message; // Singular to match backend response + + ValidationResponse(String type, int errorCount, List message) { + this.type = type; + this.errorCount = errorCount; + this.message = message; + } + + public String getType() { + return type; + } + + public int getErrorCount() { + return errorCount; + } + + public List getMessage() { + return message; + } + + String errorSummary() { + return (message == null || message.isEmpty()) + ? "unknown cause" + : String.join("; ", message); + } + + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + + static ValidationResponse fromJson(byte[] json) throws IOException { + try (JsonParser parser = JSON_FACTORY.createParser(json)) { + String type = null; + int errorCount = 0; + List messages = new ArrayList<>(); + + checkToken(parser.nextToken(), EnumSet.of(JsonToken.START_OBJECT)); + while (parser.nextToken() == JsonToken.FIELD_NAME) { + switch (parser.getText()) { + case "type": + type = parser.nextTextValue(); + break; + case "errorCount": + errorCount = parser.nextIntValue(0); + break; + case "message": + JsonToken token = parser.nextToken(); + checkToken(token, EnumSet.of(JsonToken.VALUE_NULL, JsonToken.START_ARRAY)); + if (token == JsonToken.START_ARRAY) { + while (parser.nextToken() != JsonToken.END_ARRAY) { + messages.add(parser.getText()); + } + } + break; + default: + parser.nextToken(); + parser.skipChildren(); + break; + } + } + + return new ValidationResponse(type, errorCount, messages); + } + } + + private static void checkToken(JsonToken actual, EnumSet expected) throws IOException { + if (!expected.contains(actual)) { + throw new JsonParseException("expected " + expected + ", but found " + actual); + } + } +} diff --git a/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/StatelessRegistryTest.java b/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/StatelessRegistryTest.java index f3e116176..c64066881 100644 --- a/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/StatelessRegistryTest.java +++ b/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/StatelessRegistryTest.java @@ -120,5 +120,4 @@ public void batchesExpiration() { clock.setWallTime(Duration.ofMinutes(15).toMillis() + 1); Assertions.assertEquals(0, registry.getBatches().size()); } - } diff --git a/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationHelperTest.java b/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationHelperTest.java new file mode 100644 index 000000000..20141f5ae --- /dev/null +++ b/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationHelperTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.stateless; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.spectator.api.DefaultRegistry; +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.Registry; +import com.netflix.spectator.ipc.http.HttpResponse; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +public class ValidationHelperTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ValidationHelperTest.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private void check(Registry r, long sent, long http, long invalid, long other) { + Id baseId = r.createId("spectator.measurements"); + Id droppedId = baseId.withTag("id", "dropped"); + + Assertions.assertEquals(sent, r.counter(baseId.withTag("id", "sent")).count()); + Assertions.assertEquals(http, r.counter(droppedId.withTag("error", "http-error")).count()); + Assertions.assertEquals(invalid, r.counter(droppedId.withTag("error", "validation")).count()); + Assertions.assertEquals(other, r.counter(droppedId.withTag("error", "other")).count()); + } + + private HttpResponse httpResponse(int status, ValidationResponse vres) throws IOException { + String json = MAPPER.writeValueAsString(vres); + return new HttpResponse(status, Collections.emptyMap(), json.getBytes(StandardCharsets.UTF_8)); + } + + @Test + public void incrementDroppedHttp() { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + helper.incrementDroppedHttp(42); + check(registry, 0, 42, 0, 0); + } + + @Test + public void ok() { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + helper.recordResults(42, new HttpResponse(200, Collections.emptyMap())); + check(registry, 42, 0, 0, 0); + } + + @Test + public void validationErrorPartial() throws IOException { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + ValidationResponse vres = new ValidationResponse( + "error", + 3, + Collections.singletonList("foo") + ); + helper.recordResults(42, httpResponse(202, vres)); + check(registry, 39, 0, 3, 0); + } + + @Test + public void validationErrorAll() throws IOException { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + ValidationResponse vres = new ValidationResponse( + "error", + 42, + Collections.singletonList("foo") + ); + helper.recordResults(42, httpResponse(400, vres)); + check(registry, 0, 0, 42, 0); + } + + @Test + public void validationErrorNullMessages() throws IOException { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + ValidationResponse vres = new ValidationResponse("error", 42, null); + helper.recordResults(42, httpResponse(400, vres)); + check(registry, 0, 0, 42, 0); + } + + @Test + public void validationErrorEmptyMessages() throws IOException { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + ValidationResponse vres = new ValidationResponse( + "error", + 42, + Collections.emptyList() + ); + helper.recordResults(42, httpResponse(400, vres)); + check(registry, 0, 0, 42, 0); + } + + @Test + public void validationErrorBadJson() throws IOException { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + HttpResponse res = new HttpResponse(400, Collections.emptyMap()); + helper.recordResults(42, res); + check(registry, 0, 0, 0, 42); + } + + @Test + public void serverError() { + Registry registry = new DefaultRegistry(); + ValidationHelper helper = new ValidationHelper(LOGGER, registry); + helper.recordResults(42, new HttpResponse(500, Collections.emptyMap())); + check(registry, 0, 42, 0, 0); + } +} diff --git a/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationResponseTest.java b/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationResponseTest.java new file mode 100644 index 000000000..74ed0c99d --- /dev/null +++ b/spectator-reg-stateless/src/test/java/com/netflix/spectator/stateless/ValidationResponseTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.stateless; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +public class ValidationResponseTest { + + private List list(String... vs) { + return Arrays.asList(vs); + } + + @Test + public void simple() throws Exception { + String payload = "{\"type\":\"error\",\"errorCount\":42,\"message\":[\"foo\",\"bar\"]}"; + ValidationResponse response = ValidationResponse.fromJson(payload.getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals("error", response.getType()); + Assertions.assertEquals(42, response.getErrorCount()); + Assertions.assertEquals(list("foo", "bar"), response.getMessage()); + } + + @Test + public void noMessage() throws Exception { + String payload = "{\"type\":\"error\",\"errorCount\":42}"; + ValidationResponse response = ValidationResponse.fromJson(payload.getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals("error", response.getType()); + Assertions.assertEquals(42, response.getErrorCount()); + Assertions.assertEquals(list(), response.getMessage()); + } + + @Test + public void emptyMessage() throws Exception { + String payload = "{\"type\":\"error\",\"errorCount\":42,\"message\":[]}"; + ValidationResponse response = ValidationResponse.fromJson(payload.getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals("error", response.getType()); + Assertions.assertEquals(42, response.getErrorCount()); + Assertions.assertEquals(list(), response.getMessage()); + } + + @Test + public void nullMessage() throws Exception { + String payload = "{\"type\":\"error\",\"errorCount\":42,\"message\":null}"; + ValidationResponse response = ValidationResponse.fromJson(payload.getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals("error", response.getType()); + Assertions.assertEquals(42, response.getErrorCount()); + Assertions.assertEquals(list(), response.getMessage()); + } + + @Test + public void noErrorCount() throws Exception { + String payload = "{\"type\":\"error\",\"errorCount\":42}"; + ValidationResponse response = ValidationResponse.fromJson(payload.getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals("error", response.getType()); + Assertions.assertEquals(42, response.getErrorCount()); + Assertions.assertEquals(list(), response.getMessage()); + } + + @Test + public void ignoredFields() throws Exception { + String payload = "{\"type\":\"error\",\"foo\":\"bar\",\"errorCount\":42,\"a\":{\"b\":[1,2,3]},\"message\":[\"foo\",\"bar\"]}"; + ValidationResponse response = ValidationResponse.fromJson(payload.getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals("error", response.getType()); + Assertions.assertEquals(42, response.getErrorCount()); + Assertions.assertEquals(list("foo", "bar"), response.getMessage()); + } +}