Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly require a OriginSettingClient in ML results iterators #50802

Merged
merged 3 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del

private final ThreadPool threadPool;
private final String executor;
private final Client client;
private final OriginSettingClient client;
private final ClusterService clusterService;
private final Clock clock;

Expand All @@ -62,7 +63,7 @@ public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -22,7 +22,7 @@

class BatchedBucketsIterator extends BatchedResultsIterator<Bucket> {

BatchedBucketsIterator(Client client, String jobId) {
BatchedBucketsIterator(OriginSettingClient client, String jobId) {
super(client, jobId, Bucket.RESULT_TYPE_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -21,7 +21,7 @@
import java.io.InputStream;

class BatchedInfluencersIterator extends BatchedResultsIterator<Influencer> {
BatchedInfluencersIterator(Client client, String jobId) {
BatchedInfluencersIterator(OriginSettingClient client, String jobId) {
super(client, jobId, Influencer.RESULT_TYPE_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -23,7 +23,7 @@

public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {

public BatchedJobsIterator(Client client, String index) {
public BatchedJobsIterator(OriginSettingClient client, String index) {
super(client, index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -22,7 +22,7 @@

class BatchedRecordsIterator extends BatchedResultsIterator<AnomalyRecord> {

BatchedRecordsIterator(Client client, String jobId) {
BatchedRecordsIterator(OriginSettingClient client, String jobId) {
super(client, jobId, AnomalyRecord.RESULT_TYPE_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
Expand All @@ -16,7 +16,7 @@ public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator

private final ResultsFilterBuilder filterBuilder;

public BatchedResultsIterator(Client client, String jobId, String resultType) {
public BatchedResultsIterator(OriginSettingClient client, String jobId, String resultType) {
super(client, AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
this.filterBuilder = new ResultsFilterBuilder(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
Expand All @@ -16,7 +16,7 @@
*/
public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator<String> {

public BatchedStateDocIdsIterator(Client client, String index) {
public BatchedStateDocIdsIterator(OriginSettingClient client, String index) {
super(client, index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -130,7 +131,6 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.clientWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class JobResultsProvider {
Expand Down Expand Up @@ -715,7 +715,7 @@ private void expandBuckets(String jobId, BucketsQueryBuilder query, QueryPage<Bu
* @return a bucket {@link BatchedResultsIterator}
*/
public BatchedResultsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
return new BatchedBucketsIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
return new BatchedBucketsIterator(new OriginSettingClient(client, ML_ORIGIN), jobId);
}

/**
Expand All @@ -727,7 +727,7 @@ public BatchedResultsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
* @return a record {@link BatchedResultsIterator}
*/
public BatchedResultsIterator<AnomalyRecord> newBatchedRecordsIterator(String jobId) {
return new BatchedRecordsIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
return new BatchedRecordsIterator(new OriginSettingClient(client, ML_ORIGIN), jobId);
}

/**
Expand Down Expand Up @@ -924,7 +924,7 @@ public void influencers(String jobId, InfluencersQuery query, Consumer<QueryPage
* @return an influencer {@link BatchedResultsIterator}
*/
public BatchedResultsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
return new BatchedInfluencersIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
return new BatchedInfluencersIterator(new OriginSettingClient(client, ML_ORIGIN), jobId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -34,9 +34,9 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final Client client;
private final OriginSettingClient client;

AbstractExpiredJobDataRemover(Client client) {
AbstractExpiredJobDataRemover(OriginSettingClient client) {
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -62,11 +62,11 @@ public class ExpiredForecastsRemover implements MlDataRemover {
private static final int MAX_FORECASTS = 10000;
private static final String RESULTS_INDEX_PATTERN = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";

private final Client client;
private final OriginSettingClient client;
private final ThreadPool threadPool;
private final long cutoffEpochMs;

public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool) {
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -55,10 +55,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
*/
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;

private final Client client;
private final OriginSettingClient client;
private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool) {
public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
Expand Down Expand Up @@ -46,10 +46,10 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {

private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class);

private final Client client;
private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor;

public ExpiredResultsRemover(Client client, AnomalyDetectionAuditor auditor) {
public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
super(client);
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -49,10 +49,10 @@ public class UnusedStateRemover implements MlDataRemover {

private static final Logger LOGGER = LogManager.getLogger(UnusedStateRemover.class);

private final Client client;
private final OriginSettingClient client;
private final ClusterService clusterService;

public UnusedStateRemover(Client client, ClusterService clusterService) {
public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -34,14 +34,14 @@ public abstract class BatchedDocumentsIterator<T> {
private static final String CONTEXT_ALIVE_DURATION = "5m";
private static final int BATCH_SIZE = 10000;

private final Client client;
private final OriginSettingClient client;
private final String index;
private volatile long count;
private volatile long totalHits;
private volatile String scrollId;
private volatile boolean isScrollInitialised;

protected BatchedDocumentsIterator(Client client, String index) {
protected BatchedDocumentsIterator(OriginSettingClient client, String index) {
this.client = Objects.requireNonNull(client);
this.index = Objects.requireNonNull(index);
this.totalHits = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.utils.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;

Expand All @@ -18,7 +18,7 @@ public class DocIdBatchedDocumentIterator extends BatchedDocumentsIterator<Strin

private final QueryBuilder query;

public DocIdBatchedDocumentIterator(Client client, String index, QueryBuilder query) {
public DocIdBatchedDocumentIterator(OriginSettingClient client, String index, QueryBuilder query) {
super(client, index);
this.query = Objects.requireNonNull(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;

import java.util.Deque;
import java.util.List;
Expand All @@ -25,7 +27,7 @@ public class MockBatchedDocumentsIterator<T> extends BatchedResultsIterator<T> {
private Boolean requireIncludeInterim;

public MockBatchedDocumentsIterator(List<Deque<Result<T>>> batches, String resultType) {
super(mock(Client.class), "foo", resultType);
super(MockOriginSettingClient.mockOriginSettingClient(mock(Client.class), ClientHelper.ML_ORIGIN), "foo", resultType);
this.batches = batches;
index = 0;
wasTimeRangeCalled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public class ScoresUpdaterTests extends ESTestCase {
private Job job;
private ScoresUpdater scoresUpdater;

private Bucket generateBucket(Date timestamp) throws IOException {
private Bucket generateBucket(Date timestamp) {
return new Bucket(JOB_ID, timestamp, DEFAULT_BUCKET_SPAN);
}

@Before
public void setUpMocks() throws IOException {
public void setUpMocks() {
MockitoAnnotations.initMocks(this);

Job.Builder jobBuilder = new Job.Builder(JOB_ID);
Expand Down
Loading