Skip to content

Commit

Permalink
Query insights exporters implementation (opensearch-project#12982)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Jun 6, 2024
1 parent 1cded65 commit 0ddf4bd
Show file tree
Hide file tree
Showing 18 changed files with 845 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Collection<Object> createComponents(
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

Expand Down Expand Up @@ -110,7 +110,8 @@ public List<Setting<?>> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.List;

/**
* Debug exporter for development purpose
*/
public final class DebugExporter implements QueryInsightsExporter {
/**
* Logger of the debug exporter
*/
private final Logger logger = LogManager.getLogger();

/**
* Constructor of DebugExporter
*/
private DebugExporter() {}

private static class InstanceHolder {
private static final DebugExporter INSTANCE = new DebugExporter();
}

/**
Get the singleton instance of DebugExporter
*
@return DebugExporter instance
*/
public static DebugExporter getInstance() {
return InstanceHolder.INSTANCE;
}

/**
* Write the list of SearchQueryRecord to debug log
*
* @param records list of {@link SearchQueryRecord}
*/
@Override
public void export(final List<SearchQueryRecord> records) {
logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString());
}

/**
* Close the debugger exporter sink
*/
@Override
public void close() {
logger.debug("Closing the DebugExporter..");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;

import java.util.List;

/**
* Local index exporter for exporting query insights data to local OpenSearch indices.
*/
public final class LocalIndexExporter implements QueryInsightsExporter {
/**
* Logger of the local index exporter
*/
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;

/**
* Constructor of LocalIndexExporter
*
* @param client OS client
* @param indexPattern the pattern of index to export to
*/
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
this.client = client;
}

/**
* Getter of indexPattern
*
* @return indexPattern
*/
public DateTimeFormatter getIndexPattern() {
return indexPattern;
}

/**
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexExporter
*/
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
* Export a list of SearchQueryRecord to a local index
*
* @param records list of {@link SearchQueryRecord}
*/
@Override
public void export(final List<SearchQueryRecord> records) {
if (records == null || records.size() == 0) {
return;
}
try {
final String index = getDateTimeFromFormat();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

@Override
public void onFailure(Exception e) {
logger.error("Failed to execute bulk operation for query insights data: ", e);
}
});
} catch (final Exception e) {
logger.error("Unable to index query insights data: ", e);
}
}

/**
* Close the exporter sink
*/
@Override
public void close() {
logger.debug("Closing the LocalIndexExporter..");
}

private String getDateTimeFromFormat() {
return indexPattern.print(DateTime.now(DateTimeZone.UTC));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.io.Closeable;
import java.util.List;

/**
* Base interface for Query Insights exporters
*/
public interface QueryInsightsExporter extends Closeable {
/**
* Export a list of SearchQueryRecord to the exporter sink
*
* @param records list of {@link SearchQueryRecord}
*/
void export(final List<SearchQueryRecord> records);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.joda.time.format.DateTimeFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;

/**
* Factory class for validating and creating exporters based on provided settings
*/
public class QueryInsightsExporterFactory {
/**
* Logger of the query insights exporter factory
*/
private final Logger logger = LogManager.getLogger();
final private Client client;
final private Set<QueryInsightsExporter> exporters;

/**
* Constructor of QueryInsightsExporterFactory
*
* @param client OS client
*/
public QueryInsightsExporterFactory(final Client client) {
this.client = client;
this.exporters = new HashSet<>();
}

/**
* Validate exporter sink config
*
* @param settings exporter sink config {@link Settings}
* @throws IllegalArgumentException if provided exporter sink config settings are invalid
*/
public void validateExporterConfig(final Settings settings) throws IllegalArgumentException {
// Disable exporter if the EXPORTER_TYPE setting is null
if (settings.get(EXPORTER_TYPE) == null) {
return;
}
SinkType type;
try {
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Invalid exporter type [%s], type should be one of %s",
settings.get(EXPORTER_TYPE),
SinkType.allSinkTypes()
)
);
}
switch (type) {
case LOCAL_INDEX:
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
if (indexPattern.length() == 0) {
throw new IllegalArgumentException("Empty index pattern configured for the exporter");
}
try {
DateTimeFormat.forPattern(indexPattern);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern)
);
}
}
}

/**
* Create an exporter based on provided parameters
*
* @param type The type of exporter to create
* @param indexPattern the index pattern if creating a index exporter
* @return QueryInsightsExporter the created exporter sink
*/
public QueryInsightsExporter createExporter(SinkType type, String indexPattern) {
if (SinkType.LOCAL_INDEX.equals(type)) {
QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern));
this.exporters.add(exporter);
return exporter;
}
return DebugExporter.getInstance();
}

/**
* Update an exporter based on provided parameters
*
* @param exporter The exporter to update
* @param indexPattern the index pattern if creating a index exporter
* @return QueryInsightsExporter the updated exporter sink
*/
public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) {
if (exporter.getClass() == LocalIndexExporter.class) {
((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern));
}
return exporter;
}

/**
* Close an exporter
*
* @param exporter the exporter to close
*/
public void closeExporter(QueryInsightsExporter exporter) throws IOException {
if (exporter != null) {
exporter.close();
this.exporters.remove(exporter);
}
}

/**
* Close all exporters
*
*/
public void closeAllExporters() {
for (QueryInsightsExporter exporter : exporters) {
try {
closeExporter(exporter);
} catch (IOException e) {
logger.error("Fail to close query insights exporter, error: ", e);
}
}
}
}
Loading

0 comments on commit 0ddf4bd

Please sign in to comment.