Skip to content

Commit

Permalink
Changes to make security granular for PIT Ids for delete and get pits…
Browse files Browse the repository at this point in the history
… operation

Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Aug 28, 2022
1 parent 2e97a58 commit 7796ace
Show file tree
Hide file tree
Showing 16 changed files with 284 additions and 142 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,14 @@
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.NodesGetAllPitsAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -676,6 +678,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public class DeletePitAction extends ActionType<DeletePitResponse> {

public static final DeletePitAction INSTANCE = new DeletePitAction();
public static final String NAME = "cluster:admin/point_in_time/delete";
public static final String NAME = "indices:data/read/point_in_time/delete";

private DeletePitAction() {
super(NAME, DeletePitResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public DeletePitRequest(List<String> pitIds) {
this.pitIds.addAll(pitIds);
}

public void clearAndSetPitIds(List<String> pitIds) {
this.pitIds.clear();
this.pitIds.addAll(pitIds);
}

public DeletePitRequest() {}

public List<String> getPitIds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@
*/
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {

private GetAllPitNodesResponse getAllPitNodesResponse;

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}

public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) {
this.getAllPitNodesResponse = getAllPitNodesResponse;
}

public GetAllPitNodesResponse getGetAllPitNodesResponse() {
return getAllPitNodesResponse;
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public void writeNodesTo(StreamOutput out, List<GetAllPitNodeResponse> nodes) th
public List<ListPitInfo> getPitInfos() {
return Collections.unmodifiableList(new ArrayList<>(pitInfos));
}

public void clearAndSetPitInfos(List<ListPitInfo> listPitInfos) {
pitInfos.clear();
pitInfos.addAll(listPitInfos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Action type for retrieving all PIT reader contexts from nodes
*/
public class NodesGetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
public static final NodesGetAllPitsAction INSTANCE = new NodesGetAllPitsAction();
public static final String NAME = "cluster:admin/point_in_time/read_from_nodes";

private NodesGetAllPitsAction() {
super(NAME, GetAllPitNodesResponse::new);
}
}
23 changes: 21 additions & 2 deletions server/src/main/java/org/opensearch/action/search/PitService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -47,12 +48,19 @@ public class PitService {
private final ClusterService clusterService;
private final SearchTransportService searchTransportService;
private final TransportService transportService;
private final NodeClient nodeClient;

@Inject
public PitService(ClusterService clusterService, SearchTransportService searchTransportService, TransportService transportService) {
public PitService(
ClusterService clusterService,
SearchTransportService searchTransportService,
TransportService transportService,
NodeClient nodeClient
) {
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.transportService = transportService;
this.nodeClient = nodeClient;
}

/**
Expand Down Expand Up @@ -144,6 +152,17 @@ public void onFailure(final Exception e) {
}, size);
}

/**
* This method returns indices associated for each pit
*/
public Map<String, String[]> getIndicesForPits(List<String> pitIds) {
Map<String, String[]> pitToIndicesMap = new HashMap<>();
for (String pitId : pitIds) {
pitToIndicesMap.put(pitId, SearchContextId.decode(nodeClient.getNamedWriteableRegistry(), pitId).getActualIndices());
}
return pitToIndicesMap;
}

/**
* Get all active point in time contexts
*/
Expand All @@ -156,7 +175,7 @@ public void getAllPits(ActionListener<GetAllPitNodesResponse> getAllPitsListener
DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]);
transportService.sendRequest(
transportService.getLocalNode(),
GetAllPitsAction.NAME,
NodesGetAllPitsAction.NAME,
new GetAllPitNodesRequest(disNodesArr),
new TransportResponseHandler<GetAllPitNodesResponse>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,79 +8,28 @@

package org.opensearch.action.search;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.search.SearchService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
* Transport action to get all active PIT contexts across all nodes
*/
public class TransportGetAllPitsAction extends TransportNodesAction<
GetAllPitNodesRequest,
GetAllPitNodesResponse,
GetAllPitNodeRequest,
GetAllPitNodeResponse> {
private final SearchService searchService;
public class TransportGetAllPitsAction extends HandledTransportAction<GetAllPitNodesRequest, GetAllPitNodesResponse> {
private final PitService pitService;

@Inject
public TransportGetAllPitsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
SearchService searchService
) {
super(
GetAllPitsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
GetAllPitNodesRequest::new,
GetAllPitNodeRequest::new,
ThreadPool.Names.SAME,
GetAllPitNodeResponse.class
);
this.searchService = searchService;
}

@Override
protected GetAllPitNodesResponse newResponse(
GetAllPitNodesRequest request,
List<GetAllPitNodeResponse> getAllPitNodeRespons,
List<FailedNodeException> failures
) {
return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures);
}

@Override
protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) {
return new GetAllPitNodeRequest();
}

@Override
protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new GetAllPitNodeResponse(in);
public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) {
super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in));
this.pitService = pitService;
}

/**
* This retrieves all active PITs in the node
*/
@Override
protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) {
GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse(
transportService.getLocalNode(),
searchService.getAllPITReaderContexts()
);
return nodeResponse;
protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener<GetAllPitNodesResponse> listener) {
// If security plugin intercepts the request, it'll replace all PIT IDs with permitted PIT IDs
if (request.getGetAllPitNodesResponse() != null) {
listener.onResponse(request.getGetAllPitNodesResponse());
} else {
pitService.getAllPits(listener);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.search.SearchService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
* Transport action to get all active PIT contexts across all nodes
*/
public class TransportNodesGetAllPitsAction extends TransportNodesAction<
GetAllPitNodesRequest,
GetAllPitNodesResponse,
GetAllPitNodeRequest,
GetAllPitNodeResponse> {
private final SearchService searchService;

@Inject
public TransportNodesGetAllPitsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
SearchService searchService
) {
super(
NodesGetAllPitsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
GetAllPitNodesRequest::new,
GetAllPitNodeRequest::new,
ThreadPool.Names.SAME,
GetAllPitNodeResponse.class
);
this.searchService = searchService;
}

@Override
protected GetAllPitNodesResponse newResponse(
GetAllPitNodesRequest request,
List<GetAllPitNodeResponse> getAllPitNodeRespons,
List<FailedNodeException> failures
) {
return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures);
}

@Override
protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) {
return new GetAllPitNodeRequest();
}

@Override
protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new GetAllPitNodeResponse(in);
}

/**
* This retrieves all active PITs in the node
*/
@Override
protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) {
GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse(
transportService.getLocalNode(),
searchService.getAllPITReaderContexts()
);
return nodeResponse;
}
}
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
Expand Down Expand Up @@ -334,6 +336,11 @@ public interface Client extends OpenSearchClient, Releasable {
*/
void createPit(CreatePitRequest createPITRequest, ActionListener<CreatePitResponse> listener);

/**
* Delete one or more point in time contexts
*/
void deletePits(DeletePitRequest deletePITRequest, ActionListener<DeletePitResponse> listener);

/**
* Get information of segments of one or more PITs
*/
Expand Down
12 changes: 0 additions & 12 deletions server/src/main/java/org/opensearch/client/ClusterAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@
import org.opensearch.action.ingest.SimulatePipelineRequest;
import org.opensearch.action.ingest.SimulatePipelineRequestBuilder;
import org.opensearch.action.ingest.SimulatePipelineResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -793,14 +791,4 @@ public interface ClusterAdminClient extends OpenSearchClient {
* Delete specified dangling indices.
*/
ActionFuture<AcknowledgedResponse> deleteDanglingIndex(DeleteDanglingIndexRequest request);

/**
* Delete point in time searches present in cluster
*/
void deletePits(DeletePitRequest request, ActionListener<DeletePitResponse> listener);

/**
* Delete point in time searches present in cluster
*/
ActionFuture<DeletePitResponse> deletePits(DeletePitRequest request);
}
Loading

0 comments on commit 7796ace

Please sign in to comment.