Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Internal and external latency to OpenSearch and S3 sinks. #3583

Merged
merged 16 commits into from
Nov 7, 2023

Conversation

kkondaka
Copy link
Collaborator

@kkondaka kkondaka commented Nov 2, 2023

Description

Add Internal and external latency to OpenSearch and S3 sinks
Date processor modified to support new option to put external origination time in the event metadata
OpenSearch and S3 sinks calculate the internal and external latencies when the event handle is released.

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [ X] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [ X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -141,6 +144,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
this.expressionEvaluator = expressionEvaluator;
this.latencyMetrics = new LatencyMetrics(pluginMetrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there really no way to give the EventHandle class a copy of PluginMetrics, and then to just have the releaseEventHandle call automatically populate the metric so we don't have to pass them to every sink, and we keep to just one thing that everyone has to call (releaseEventHandle). You could even add to the function, and make it releaseEventHandle(final EventHandle eventHandle, final String pluginId). The pluginId could just be the plugin name for now, but it can be used in the metric.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we do that? Metrics are sink specific, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but we could "fake" that the metrics are owned by the sink by adding the name of the sink here (since you can name the metrics anything and that it can still follow our metric naming pattern

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think that's the clean way because an event handle can only have one name for metrics. How can it have multiple names? I think this is way cleaner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every sink will need to provide this latency and we want to make it as easy as possible for sinks to use this feature.

I think that data-prepper-core could hold a LatencyMetrics per Sink. Just before calling Sink::output, data-prepper-core registers an onRelease method similar to what I suggested in another comment. Then it receives the callback when release is called.

One thing we would need to be sure to do here is ensure that sinks always call release. This is possible now that #3546 is merged.

Krishna Kondaka added 7 commits November 3, 2023 15:26
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Comment on lines 43 to 46
final long internalLatency = Duration.between(eventHandle.getInternalOriginationTime(), Instant.now()).toMillis();
pluginMetrics.gauge(INTERNAL_LATENCY_MIN_MS, internalLatency, latency -> Math.min(internalLatencyMin, latency));
pluginMetrics.gauge(INTERNAL_LATENCY_MAX_MS, internalLatency, latency -> Math.max(internalLatencyMax, latency));
pluginMetrics.gauge(INTERNAL_LATENCY_AVG_MS, internalLatency, latency -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using guage here? Shouldn't this be DistributionSummary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I wasn't aware of that. If everyone prefers that, I can modify to use DistributionSummary.

Copy link
Member

@graytaylor0 graytaylor0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric can just be a DistributionSummary

Krishna Kondaka added 3 commits November 4, 2023 02:35
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
graytaylor0
graytaylor0 previously approved these changes Nov 4, 2023
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getMetadata().setExternalOriginationTime(timeStamp);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have EventMetadata call setExternalOriginationTime with the value when it is set? This way we can avoid making two calls here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not do that because I am not sure if we want to tie them together. What if we don't want external latency metric but set metadata only? This way, we could introduce another flag in date processor in future if such requirement arises.

@@ -76,6 +73,7 @@ public EventHandle getEventHandle() {

public void releaseEventHandle(boolean result) {
if (eventHandle != null) {
sink.updateLatencyMetrics(eventHandle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately we want this called because release is called. Instead of doubling up calls, how about we configure the EventHandle to provide this type of callback?

interface EventHandle {
  ...
  void onRelease(BiConsumer<EventHandle, Boolean> reseaseConsumer);
  ...
}

Then, in OpenSearchSink, be sure to make this call.

event.getHandle().onRelease((handle, result) -> updateLatencyMetrics(handle));

The current design results in too many classes having to be aware of these interactions.

@@ -141,6 +144,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
this.expressionEvaluator = expressionEvaluator;
this.latencyMetrics = new LatencyMetrics(pluginMetrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every sink will need to provide this latency and we want to make it as easy as possible for sinks to use this feature.

I think that data-prepper-core could hold a LatencyMetrics per Sink. Just before calling Sink::output, data-prepper-core registers an onRelease method similar to what I suggested in another comment. Then it receives the callback when release is called.

One thing we would need to be sure to do here is ensure that sinks always call release. This is possible now that #3546 is merged.

@@ -68,6 +69,10 @@ public class BulkRetryStrategyTests {

@BeforeEach
public void setUp() {
sink = mock(OpenSearchSink.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using some of the other suggestions, all of these changes should be unnecessary.

@@ -36,10 +37,12 @@
class JavaClientAccumulatingCompressedBulkRequestTest {

private BulkRequest.Builder bulkRequestBuilder;
private OpenSearchSink sink;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we can make all these changes unnecessary with the callback approaches recommended above.

import java.time.Duration;
import java.time.Instant;

public class LatencyMetrics {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are able to move the tracking of this into data-prepper-core, please move this class into data-prepper-core. There should be no need to expose it in that situation.

internalLatencySummary = pluginMetrics.summary(INTERNAL_LATENCY);
externalLatencySummary = pluginMetrics.summary(EXTERNAL_LATENCY);
}
public void update(final EventHandle eventHandle) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to think that this should take the result passed into the EventHandle and make the decision as to whether or not to record here.

public void update(final EventHandle eventHandle, final boolean result) {
  if(result == false)
    return;
  ...
}

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Krishna Kondaka added 2 commits November 7, 2023 00:04
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kkondaka , this is looking good. There are a few small corrections to make.

We can follow-up after merging this PR with some alternate names for the metric.

import java.time.Instant;

public class SinkLatencyMetrics {
public static final String INTERNAL_LATENCY = "internalLatency";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should have different names here. What would a user interpret "internal" and "external" to mean?

Maybe we could follow up in a different PR before releasing 2.6.0.

@@ -91,6 +91,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000;
private final AwsCredentialsSupplier awsCredentialsSupplier;
//private final LatencyMetrics latencyMetrics;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this commented out code.

@@ -256,6 +257,20 @@ private void doInitializeInternal() throws IOException {
return invalidActionErrorsCounter.count();
}

/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this commented out code.

this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
this.jsonNode = null;
this(bulkOperation, eventHandle, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the code improvements!

Assertions.assertTrue(event.getMetadata().getExternalOriginationTime() != null);
Assertions.assertTrue(event.getEventHandle().getExternalOriginationTime() != null);
ZonedDateTime expectedZonedDatetime = expectedDateTime.atZone(mockDateProcessorConfig.getSourceZoneId()).truncatedTo(ChronoUnit.SECONDS);
Assertions.assertTrue(expectedZonedDatetime.equals(event.getMetadata().getExternalOriginationTime().atZone(mockDateProcessorConfig.getSourceZoneId())));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation on this line appears off.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. It appears off. But it's not. If you reduce your font size, you can see that it is fine.

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@dlvenable
Copy link
Member

Whitesource is failing, but this PR does not add the library being reported. We can ignore this failure.

@dlvenable dlvenable merged commit a40f4b8 into opensearch-project:main Nov 7, 2023
62 of 63 checks passed
@juergen-walter
Copy link
Contributor

@kkondaka @dlvenable could you also update documentation for the new metrics
https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md

Please also state if a metric is given as millisecond or nanosecond.

@JannikBrand
Copy link
Contributor

We could only get the name of the metric (e.g. <pipeline_name>_opensearch_PipelineLatency_sum) by looking at all exported metrics of the Data Prepper server (port: 4900; path: /metrics/prometheus).
I also wonder how it works with sub-pipelines, e.g. if we take a look at the trace analytics use case, which consists of three pipelines (one entry pipeline which redirects to the raw-pipeline as well as the service-map pipeline), I saw that only the raw and service-map pipeline include the PipelineLatency metric, which makes sense. Does it mean the latency of the first (entry-) pipeline is added on top of that (e.g. total_latency_of_raw_pipeline> = latency_of_entry_pipeline> + <latency_of_raw_pipeline>) ?

More general: Is it planned to document other (missing) metrics? I know about the doc that @juergen-walter linked and about the READMEs of the individual plugins which contain a metric section, but they are not including every metric. For example, the README of the opensearch sink lists some metrics and points to the AbstractSink, but this is only the java file. Another example would be circuit breaker metrics.

@kkondaka kkondaka deleted the os-s3-e2e-latency-metric branch May 13, 2024 05:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants