Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Get namedXContentRegistry from ExtensionsRunner #725

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
@@ -1,51 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ad;

import static java.util.Collections.unmodifiableList;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.rest.RestCreateDetectorAction;
import org.opensearch.ad.rest.RestGetDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.sdk.Extension;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.sdk.BaseExtension;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionSettings;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient;
import org.opensearch.threadpool.ThreadPool;

import com.google.common.collect.ImmutableList;

public class AnomalyDetectorExtension implements Extension {
public class AnomalyDetectorExtension extends BaseExtension {

private static final String EXTENSION_SETTINGS_PATH = "/ad-extension.yml";

private ExtensionSettings settings;

public AnomalyDetectorExtension() {
try {
this.settings = initializeSettings();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}

@Override
public ExtensionSettings getExtensionSettings() {
return this.settings;
super(EXTENSION_SETTINGS_PATH);
}

@Override
public List<ExtensionRestHandler> getExtensionRestHandlers() {
return List.of(new RestCreateDetectorAction(), new RestGetDetectorAction());
return List.of(new RestCreateDetectorAction(extensionsRunner, this), new RestGetDetectorAction());
}

@Override
Expand Down Expand Up @@ -90,22 +88,20 @@ public List<Setting<?>> getSettings() {
}

@Override
public Collection<Object> createComponents(SDKClient sdkClient, ClusterService clusterService, ThreadPool threadPool) {
return null;
}

private static ExtensionSettings initializeSettings() throws IOException {
ExtensionSettings settings = Extension.readSettingsFromYaml(EXTENSION_SETTINGS_PATH);
if (settings == null || settings.getHostAddress() == null || settings.getHostPort() == null) {
throw new IOException("Failed to initialize Extension settings. No port bound.");
}
return settings;
public List<NamedXContentRegistry.Entry> getNamedXContent() {
// Copied from AnomalyDetectorPlugin getNamedXContent
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY, DetectorInternalState.XCONTENT_REGISTRY
// Pending Job Scheduler Integration
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
// AnomalyDetectorJob.XCONTENT_REGISTRY
);
}

// TODO: replace or override client object on BaseExtension
// https://github.com/opensearch-project/opensearch-sdk-java/issues/160
public OpenSearchClient getClient() {
SDKClient sdkClient = new SDKClient();
OpenSearchClient client = sdkClient
.initializeClient(settings.getOpensearchAddress(), Integer.parseInt(settings.getOpensearchPort()));
.initializeClient(getExtensionSettings().getOpensearchAddress(), Integer.parseInt(getExtensionSettings().getOpensearchPort()));
return client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ad.rest;

import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTORS_INDEX_MAPPING_FILE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.CREATED;
import static org.opensearch.rest.RestStatus.NOT_FOUND;
import static org.opensearch.rest.RestStatus.OK;

import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
Expand All @@ -21,8 +31,6 @@
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand All @@ -37,29 +45,25 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.extensions.rest.ExtensionRestRequest;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.NestedQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.search.aggregations.BaseAggregationBuilder;
import org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.opensearch.search.aggregations.bucket.filter.InternalFilter;
import org.opensearch.search.aggregations.metrics.InternalSum;
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
import org.opensearch.sdk.ExtensionsRunner;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import jakarta.json.stream.JsonParser;

public class RestCreateDetectorAction implements ExtensionRestHandler {
private final Logger logger = LogManager.getLogger(RestCreateDetectorAction.class);
private AnomalyDetectorExtension anomalyDetectorExtension = new AnomalyDetectorExtension();
private OpenSearchClient sdkClient = anomalyDetectorExtension.getClient();
private static final Logger logger = LogManager.getLogger(RestCreateDetectorAction.class);

private final OpenSearchClient sdkClient;
private final NamedXContentRegistry xContentRegistry;

public RestCreateDetectorAction(ExtensionsRunner runner, AnomalyDetectorExtension extension) {
this.xContentRegistry = runner.getNamedXContentRegistry().getRegistry();
this.sdkClient = extension.getClient();
}

@Override
public List<Route> routes() {
Expand Down Expand Up @@ -101,56 +105,6 @@ private IndexResponse indexAnomalyDetector(AnomalyDetector anomalyDetector) thro

}

public List<NamedXContentRegistry.Entry> getNamedXWriteables() {
List<NamedXContentRegistry.Entry> entries = new ArrayList<>();
entries.add(AnomalyDetector.XCONTENT_REGISTRY);
entries.add(AnomalyResult.XCONTENT_REGISTRY);
entries.add(DetectorInternalState.XCONTENT_REGISTRY);
entries
.add(
registerQuery(
new SearchPlugin.QuerySpec<>(NestedQueryBuilder.NAME, NestedQueryBuilder::new, NestedQueryBuilder::fromXContent)
)
);
entries
.add(registerQuery(new SearchPlugin.QuerySpec<>(BoolQueryBuilder.NAME, BoolQueryBuilder::new, BoolQueryBuilder::fromXContent)));
entries
.add(
registerAggregation(
new SearchPlugin.AggregationSpec(SumAggregationBuilder.NAME, SumAggregationBuilder::new, SumAggregationBuilder.PARSER)
.addResultReader(InternalSum::new)
)
);

entries
.add(
registerAggregation(
new SearchPlugin.AggregationSpec(
FilterAggregationBuilder.NAME,
FilterAggregationBuilder::new,
FilterAggregationBuilder::parse
).addResultReader(InternalFilter::new)
)
);
entries
.add(
registerQuery(new SearchPlugin.QuerySpec<>(RangeQueryBuilder.NAME, RangeQueryBuilder::new, RangeQueryBuilder::fromXContent))
);
return entries;

}

private NamedXContentRegistry.Entry registerQuery(SearchPlugin.QuerySpec<?> spec) {
return new NamedXContentRegistry.Entry(QueryBuilder.class, spec.getName(), (p, c) -> spec.getParser().fromXContent(p));
}

private NamedXContentRegistry.Entry registerAggregation(SearchPlugin.AggregationSpec spec) {
return new NamedXContentRegistry.Entry(BaseAggregationBuilder.class, spec.getName(), (p, c) -> {
String name = (String) c;
return spec.getParser().parse(p, name);
});
}

private CreateIndexRequest initAnomalyDetectorIndex() {
JsonpMapper mapper = sdkClient._transport().jsonpMapper();
JsonParser parser = null;
Expand Down Expand Up @@ -190,13 +144,12 @@ public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
);
}

NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(getNamedXWriteables());
XContentParser parser;
AnomalyDetector detector;
XContentBuilder builder = null;
CreateIndexRequest createIndexRequest;
try {
parser = request.contentParser(xContentRegistry);
parser = request.contentParser(this.xContentRegistry);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
detector = AnomalyDetector.parse(parser);
createIndexRequest = initAnomalyDetectorIndex();
Expand All @@ -211,7 +164,7 @@ public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
builder.field("seqNo", indexResponse.seqNo());
builder.field("primaryTerm", indexResponse.primaryTerm());
builder.field("detector", detector);
builder.field("status", RestStatus.CREATED);
builder.field("status", CREATED);
builder.endObject();
} catch (IOException e) {
e.printStackTrace();
Expand Down