Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Internal and external latency to OpenSearch and S3 sinks. #3583

Merged
merged 16 commits into from
Nov 7, 2023
1 change: 1 addition & 0 deletions data-prepper-plugins/date-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ dependencies {
implementation project(':data-prepper-test-common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'io.micrometer:micrometer-core'
implementation libs.commons.lang3
testImplementation libs.commons.lang3
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

@DataPrepperPlugin(name = "date", pluginType = Processor.class, pluginConfigurationType = DateProcessorConfig.class)
public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand Down Expand Up @@ -71,8 +72,16 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
zonedDateTime = getDateTimeFromTimeReceived(record);

else if (keyToParse != null && !keyToParse.isEmpty()) {
zonedDateTime = getDateTimeFromMatch(record);
populateDateProcessorMetrics(zonedDateTime);
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getEventHandle().setExternalOriginationTime(timeStamp);
}
populateDateProcessorMetrics(zonedDateTime);
}
}

if (zonedDateTime != null)
Expand Down Expand Up @@ -119,7 +128,7 @@ private String getDateTimeFromTimeReceived(final Record<Event> record) {
return timeReceived.atZone(dateProcessorConfig.getDestinationZoneId()).format(getOutputFormatter());
}

private String getDateTimeFromMatch(final Record<Event> record) {
private Pair<String, Instant> getDateTimeFromMatch(final Record<Event> record) {
final String sourceTimestamp = getSourceTimestamp(record);
if (sourceTimestamp == null)
return null;
Expand All @@ -136,12 +145,12 @@ private String getSourceTimestamp(final Record<Event> record) {
}
}

private String getFormattedDateTimeString(final String sourceTimestamp) {
private Pair<String, Instant> getFormattedDateTimeString(final String sourceTimestamp) {
for (DateTimeFormatter formatter : dateTimeFormatters) {
try {
return ZonedDateTime.parse(sourceTimestamp, formatter).format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId()));
ZonedDateTime tmp = ZonedDateTime.parse(sourceTimestamp, formatter);
return Pair.of(tmp.format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())), tmp.toInstant());
} catch (Exception ignored) {

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

public class DateProcessorConfig {
static final Boolean DEFAULT_FROM_TIME_RECEIVED = false;
static final Boolean DEFAULT_TO_ORIGINATION_METADATA = false;
static final String DEFAULT_DESTINATION = "@timestamp";
static final String DEFAULT_SOURCE_TIMEZONE = ZoneId.systemDefault().toString();
static final String DEFAULT_DESTINATION_TIMEZONE = ZoneId.systemDefault().toString();
Expand Down Expand Up @@ -45,6 +46,9 @@ public List<String> getPatterns() {
@JsonProperty("from_time_received")
private Boolean fromTimeReceived = DEFAULT_FROM_TIME_RECEIVED;

@JsonProperty("to_origination_metadata")
private Boolean toOriginationMetadata = DEFAULT_TO_ORIGINATION_METADATA;

@JsonProperty("match")
private List<DateMatch> match;

Expand Down Expand Up @@ -76,6 +80,10 @@ public Boolean getFromTimeReceived() {
return fromTimeReceived;
}

/**
 * Returns whether the parsed timestamp should also be recorded as the event's
 * external origination time (the {@code to_origination_metadata} setting).
 *
 * @return {@code true} when origination metadata should be populated; defaults to {@code false}
 */
public Boolean getToOriginationMetadata() {
    return this.toOriginationMetadata;
}

/**
 * Returns the date patterns configured under the {@code match} setting.
 *
 * @return the configured {@link DateMatch} entries; may be {@code null} when the
 *         setting was not provided (the field has no default initializer)
 */
public List<DateMatch> getMatch() {
    return this.match;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,22 @@ public class BulkOperationWrapper {
private final EventHandle eventHandle;
private final BulkOperation bulkOperation;
private final SerializedJson jsonNode;
private final OpenSearchSink sink;

public BulkOperationWrapper(final BulkOperation bulkOperation) {
this.bulkOperation = bulkOperation;
this.eventHandle = null;
this.jsonNode = null;
public BulkOperationWrapper(final OpenSearchSink sink, final BulkOperation bulkOperation) {
this(sink, bulkOperation, null, null);
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) {
public BulkOperationWrapper(final OpenSearchSink sink, final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) {
checkNotNull(bulkOperation);
this.sink = sink;
this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
this.jsonNode = jsonNode;
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle) {
checkNotNull(bulkOperation);
this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
this.jsonNode = null;
public BulkOperationWrapper(final OpenSearchSink sink, final BulkOperation bulkOperation, final EventHandle eventHandle) {
this(sink, bulkOperation, eventHandle, null);
}

public BulkOperation getBulkOperation() {
Expand All @@ -76,6 +73,7 @@ public EventHandle getEventHandle() {

public void releaseEventHandle(boolean result) {
if (eventHandle != null) {
sink.updateLatencyMetrics(eventHandle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately we want this called because release is called. Instead of doubling up calls, how about we configure the EventHandle to provide this type of callback?

interface EventHandle {
  ...
  void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer);
  ...
}

Then, in OpenSearchSink, be sure to make this call.

event.getHandle().onRelease((handle, result) -> updateLatencyMetrics(handle));

The current design results in too many classes having to be aware of these interactions.

eventHandle.release(result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
import org.opensearch.dataprepper.plugins.sink.LatencyMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.opensearchserverless.OpenSearchServerlessClient;
Expand Down Expand Up @@ -91,6 +93,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000;
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final LatencyMetrics latencyMetrics;

private DlqWriter dlqWriter;
private BufferedWriter dlqFileWriter;
Expand Down Expand Up @@ -141,6 +144,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
this.expressionEvaluator = expressionEvaluator;
this.latencyMetrics = new LatencyMetrics(pluginMetrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there really no way to give the EventHandle class a copy of PluginMetrics, and then to just have the releaseEventHandle call automatically populate the metric so we don't have to pass them to every sink, and we keep to just one thing that everyone has to call (releaseEventHandle). You could even add to the function, and make it releaseEventHandle(final EventHandle eventHandle, final String pluginId). The pluginId could just be the plugin name for now, but it can be used in the metric.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we do that? Metrics are sink specific, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we could "fake" that the metrics are owned by the sink by adding the name of the sink here (since you can name the metrics anything, and they can still follow our metric naming pattern).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think that's the clean way because an event handle can only have one name for metrics. How can it have multiple names? I think this is way cleaner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every sink will need to provide this latency and we want to make it as easy as possible for sinks to use this feature.

I think that data-prepper-core could hold a LatencyMetrics per Sink. Just before calling Sink::output, data-prepper-core registers an onRelease method similar to what I suggested in another comment. Then it receives the callback when release is called.

One thing we would need to be sure to do here is ensure that sinks always call release. This is possible now that #3546 is merged.

bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);
invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS);
Expand Down Expand Up @@ -316,6 +320,10 @@ private BulkOperation getBulkOperationForAction(final String action, final Seria
return bulkOperation;
}

/**
 * Records latency for the given event handle on this sink's {@link LatencyMetrics}.
 * Invoked when an event handle is released (bulk-operation release and DLQ paths).
 *
 * @param eventHandle the handle of the event whose latency should be recorded
 */
public void updateLatencyMetrics(final EventHandle eventHandle) {
    this.latencyMetrics.update(eventHandle);
}

@Override
public void doOutput(final Collection<Record<Event>> records) {
final long threadId = Thread.currentThread().getId();
Expand Down Expand Up @@ -376,7 +384,7 @@ public void doOutput(final Collection<Record<Event>> records) {
}
BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, indexName, event.getJsonNode());

BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode);
BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(this, bulkOperation, event.getEventHandle(), serializedJsonNode);
final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
Expand Down Expand Up @@ -449,6 +457,7 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
try {
dlqFileWriter.write(String.format("{\"Document\": [%s], \"failure\": %s}\n",
BulkOperationWriter.dlqObjectToString(dlqObject), message));
updateLatencyMetrics(dlqObject.getEventHandle());
dlqObject.releaseEventHandle(true);
} catch (final IOException e) {
LOG.error("Failed to write a document to the DLQ", e);
Expand All @@ -459,6 +468,7 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
try {
dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginSetting.getName());
dlqObjects.forEach((dlqObject) -> {
updateLatencyMetrics(dlqObject.getEventHandle());
dlqObject.releaseEventHandle(true);
});
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.LatencyMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
Expand All @@ -28,6 +29,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -63,6 +65,7 @@ public class S3SinkService {
private final OutputCodecContext codecContext;
private final KeyGenerator keyGenerator;
private final Duration retrySleepTime;
private final LatencyMetrics latencyMetrics;

/**
* @param s3SinkConfig s3 sink related configuration.
Expand All @@ -81,6 +84,7 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
this.codecContext = codecContext;
this.keyGenerator = keyGenerator;
this.retrySleepTime = retrySleepTime;
this.latencyMetrics = new LatencyMetrics(pluginMetrics);
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();
Expand All @@ -101,6 +105,10 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
}

/**
 * Records latency for the given event handle on this service's {@link LatencyMetrics}.
 *
 * @param eventHandle the handle of the event whose latency should be recorded
 */
public void updateLatencyMetrics(final EventHandle eventHandle) {
    this.latencyMetrics.update(eventHandle);
}

/**
* @param records received records and add into buffer.
*/
Expand Down Expand Up @@ -153,6 +161,9 @@ void output(Collection<Record<Event>> records) {

private void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : bufferedEventHandles) {
if (result) {
latencyMetrics.update(eventHandle);
}
eventHandle.release(result);
}

Expand Down
Loading