Skip to content

Commit

Permalink
Explicitly require a OriginSettingClient in ML results iterators (ela…
Browse files Browse the repository at this point in the history
…stic#50802)

In classes where the client is used directly rather than through a call to 
executeAsyncWithOrigin explicitly require the client to be OriginSettingClient 
rather than using the Client interface. 

Also remove calls to deprecated ClientHelper.clientWithOrigin() method.
  • Loading branch information
davidkyle authored and SivagurunathanV committed Jan 21, 2020
1 parent 91d7e32 commit 500aaab
Show file tree
Hide file tree
Showing 22 changed files with 229 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del

private final ThreadPool threadPool;
private final String executor;
private final Client client;
private final OriginSettingClient client;
private final ClusterService clusterService;
private final Clock clock;

Expand All @@ -62,7 +63,7 @@ public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -22,7 +22,7 @@

class BatchedBucketsIterator extends BatchedResultsIterator<Bucket> {

BatchedBucketsIterator(Client client, String jobId) {
BatchedBucketsIterator(OriginSettingClient client, String jobId) {
super(client, jobId, Bucket.RESULT_TYPE_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -21,7 +21,7 @@
import java.io.InputStream;

class BatchedInfluencersIterator extends BatchedResultsIterator<Influencer> {
BatchedInfluencersIterator(Client client, String jobId) {
BatchedInfluencersIterator(OriginSettingClient client, String jobId) {
super(client, jobId, Influencer.RESULT_TYPE_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -23,7 +23,7 @@

public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {

public BatchedJobsIterator(Client client, String index) {
public BatchedJobsIterator(OriginSettingClient client, String index) {
super(client, index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -22,7 +22,7 @@

class BatchedRecordsIterator extends BatchedResultsIterator<AnomalyRecord> {

BatchedRecordsIterator(Client client, String jobId) {
BatchedRecordsIterator(OriginSettingClient client, String jobId) {
super(client, jobId, AnomalyRecord.RESULT_TYPE_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
Expand All @@ -16,7 +16,7 @@ public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator

private final ResultsFilterBuilder filterBuilder;

public BatchedResultsIterator(Client client, String jobId, String resultType) {
public BatchedResultsIterator(OriginSettingClient client, String jobId, String resultType) {
super(client, AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
this.filterBuilder = new ResultsFilterBuilder(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
Expand All @@ -16,7 +16,7 @@
*/
public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator<String> {

public BatchedStateDocIdsIterator(Client client, String index) {
public BatchedStateDocIdsIterator(OriginSettingClient client, String index) {
super(client, index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -130,7 +131,6 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.clientWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class JobResultsProvider {
Expand Down Expand Up @@ -715,7 +715,7 @@ private void expandBuckets(String jobId, BucketsQueryBuilder query, QueryPage<Bu
* @return a bucket {@link BatchedResultsIterator}
*/
public BatchedResultsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
return new BatchedBucketsIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
return new BatchedBucketsIterator(new OriginSettingClient(client, ML_ORIGIN), jobId);
}

/**
Expand All @@ -727,7 +727,7 @@ public BatchedResultsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
* @return a record {@link BatchedResultsIterator}
*/
public BatchedResultsIterator<AnomalyRecord> newBatchedRecordsIterator(String jobId) {
return new BatchedRecordsIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
return new BatchedRecordsIterator(new OriginSettingClient(client, ML_ORIGIN), jobId);
}

/**
Expand Down Expand Up @@ -924,7 +924,7 @@ public void influencers(String jobId, InfluencersQuery query, Consumer<QueryPage
* @return an influencer {@link BatchedResultsIterator}
*/
public BatchedResultsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
return new BatchedInfluencersIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
return new BatchedInfluencersIterator(new OriginSettingClient(client, ML_ORIGIN), jobId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -34,9 +34,9 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final Client client;
private final OriginSettingClient client;

AbstractExpiredJobDataRemover(Client client) {
AbstractExpiredJobDataRemover(OriginSettingClient client) {
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -62,11 +62,11 @@ public class ExpiredForecastsRemover implements MlDataRemover {
private static final int MAX_FORECASTS = 10000;
private static final String RESULTS_INDEX_PATTERN = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";

private final Client client;
private final OriginSettingClient client;
private final ThreadPool threadPool;
private final long cutoffEpochMs;

public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool) {
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -55,10 +55,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
*/
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;

private final Client client;
private final OriginSettingClient client;
private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool) {
public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
Expand Down Expand Up @@ -46,10 +46,10 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {

private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class);

private final Client client;
private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor;

public ExpiredResultsRemover(Client client, AnomalyDetectionAuditor auditor) {
public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
super(client);
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -49,10 +49,10 @@ public class UnusedStateRemover implements MlDataRemover {

private static final Logger LOGGER = LogManager.getLogger(UnusedStateRemover.class);

private final Client client;
private final OriginSettingClient client;
private final ClusterService clusterService;

public UnusedStateRemover(Client client, ClusterService clusterService) {
public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -34,14 +34,14 @@ public abstract class BatchedDocumentsIterator<T> {
private static final String CONTEXT_ALIVE_DURATION = "5m";
private static final int BATCH_SIZE = 10000;

private final Client client;
private final OriginSettingClient client;
private final String index;
private volatile long count;
private volatile long totalHits;
private volatile String scrollId;
private volatile boolean isScrollInitialised;

protected BatchedDocumentsIterator(Client client, String index) {
protected BatchedDocumentsIterator(OriginSettingClient client, String index) {
this.client = Objects.requireNonNull(client);
this.index = Objects.requireNonNull(index);
this.totalHits = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.utils.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;

Expand All @@ -18,7 +18,7 @@ public class DocIdBatchedDocumentIterator extends BatchedDocumentsIterator<Strin

private final QueryBuilder query;

public DocIdBatchedDocumentIterator(Client client, String index, QueryBuilder query) {
public DocIdBatchedDocumentIterator(OriginSettingClient client, String index, QueryBuilder query) {
super(client, index);
this.query = Objects.requireNonNull(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;

import java.util.Deque;
import java.util.List;
Expand All @@ -25,7 +27,7 @@ public class MockBatchedDocumentsIterator<T> extends BatchedResultsIterator<T> {
private Boolean requireIncludeInterim;

public MockBatchedDocumentsIterator(List<Deque<Result<T>>> batches, String resultType) {
super(mock(Client.class), "foo", resultType);
super(MockOriginSettingClient.mockOriginSettingClient(mock(Client.class), ClientHelper.ML_ORIGIN), "foo", resultType);
this.batches = batches;
index = 0;
wasTimeRangeCalled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public class ScoresUpdaterTests extends ESTestCase {
private Job job;
private ScoresUpdater scoresUpdater;

private Bucket generateBucket(Date timestamp) throws IOException {
private Bucket generateBucket(Date timestamp) {
return new Bucket(JOB_ID, timestamp, DEFAULT_BUCKET_SPAN);
}

@Before
public void setUpMocks() throws IOException {
public void setUpMocks() {
MockitoAnnotations.initMocks(this);

Job.Builder jobBuilder = new Job.Builder(JOB_ID);
Expand Down
Loading

0 comments on commit 500aaab

Please sign in to comment.