Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[admin-tool] Add gRPC endpoint for updateAdminOperationProtocolVersion #1535

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3279,7 +3279,8 @@ private static void updateAdminOperationProtocolVersion(CommandLine cmd) throws
getRequiredArgument(cmd, Arg.ADMIN_OPERATION_PROTOCOL_VERSION, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION);
long protocolVersion =
Utils.parseLongFromString(protocolVersionInString, Arg.ADMIN_OPERATION_PROTOCOL_VERSION.name());
ControllerResponse response = controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion);
AdminTopicMetadataResponse response =
controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion);
printObject(response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ public AdminTopicMetadataResponse getAdminTopicMetadata(Optional<String> storeNa
return request(ControllerRoute.GET_ADMIN_TOPIC_METADATA, params, AdminTopicMetadataResponse.class);
}

public ControllerResponse updateAdminTopicMetadata(
public AdminTopicMetadataResponse updateAdminTopicMetadata(
long executionId,
Optional<String> storeName,
Optional<Long> offset,
Expand All @@ -1360,15 +1360,15 @@ public ControllerResponse updateAdminTopicMetadata(
.add(NAME, storeName)
.add(OFFSET, offset)
.add(UPSTREAM_OFFSET, upstreamOffset);
return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, ControllerResponse.class);
return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, AdminTopicMetadataResponse.class);
}

public ControllerResponse updateAdminOperationProtocolVersion(
public AdminTopicMetadataResponse updateAdminOperationProtocolVersion(
String clusterName,
Long adminOperationProtocolVersion) {
QueryParams params =
newParams().add(CLUSTER, clusterName).add(ADMIN_OPERATION_PROTOCOL_VERSION, adminOperationProtocolVersion);
return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, ControllerResponse.class);
return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, AdminTopicMetadataResponse.class);
}

public ControllerResponse deleteKafkaTopic(String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -88,4 +89,17 @@ public void updateAdminTopicMetadata(
return requestHandler.updateAdminTopicMetadata(request);
}, responseObserver, metadata.getClusterName(), metadata.hasStoreName() ? metadata.getStoreName() : null);
}

@Override
public void updateAdminOperationProtocolVersion(
UpdateAdminOperationProtocolVersionGrpcRequest request,
StreamObserver<AdminTopicMetadataGrpcResponse> responseObserver) {
LOGGER.debug("Received updateAdminOperationProtocolVersion request: {}", request);
ControllerGrpcServerUtils.handleRequest(
ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminOperationProtocolVersionMethod(),
() -> requestHandler.updateAdminOperationProtocolVersion(request),
responseObserver,
request.getClusterName(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ public boolean startInner() throws Exception {
UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getPath(),
new VeniceParentControllerRegionStateHandler(
admin,
adminTopicMetadataRoutes.updateAdminOperationProtocolVersion(admin)));
adminTopicMetadataRoutes
.updateAdminOperationProtocolVersion(admin, requestHandler.getClusterAdminOpsRequestHandler())));
httpService.post(
DELETE_KAFKA_TOPIC.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.deleteKafkaTopic(admin)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import java.util.Optional;
import org.apache.http.HttpStatus;
Expand Down Expand Up @@ -105,7 +106,7 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler
};
}

public Route updateAdminOperationProtocolVersion(Admin admin) {
public Route updateAdminOperationProtocolVersion(Admin admin, ClusterAdminOpsRequestHandler requestHandler) {
return (request, response) -> {
AdminTopicMetadataResponse responseObject = new AdminTopicMetadataResponse();
response.type(HttpConstants.JSON);
Expand All @@ -120,11 +121,15 @@ public Route updateAdminOperationProtocolVersion(Admin admin) {
AdminSparkServer.validateParams(request, UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getParams(), admin);
String clusterName = request.queryParams(CLUSTER);
Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION));
AdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminOperationProtocolVersion(
UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder()
.setClusterName(clusterName)
.setAdminOperationProtocolVersion(adminOperationProtocolVersion)
.build());

responseObject.setCluster(clusterName);
responseObject.setAdminOperationProtocolVersion(adminOperationProtocolVersion);

admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion);
responseObject.setCluster(internalResponse.getMetadata().getClusterName());
responseObject
.setAdminOperationProtocolVersion(internalResponse.getMetadata().getAdminOperationProtocolVersion());
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(new VeniceException(e), request, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.utils.Pair;
import java.util.Map;
Expand Down Expand Up @@ -148,4 +149,24 @@ public AdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicM
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build());
return responseBuilder.build();
}

public AdminTopicMetadataGrpcResponse updateAdminOperationProtocolVersion(
UpdateAdminOperationProtocolVersionGrpcRequest request) {
String clusterName = request.getClusterName();
long adminOperationProtocolVersion = request.getAdminOperationProtocolVersion();
ControllerRequestParamValidator
.validateAdminOperationProtocolVersionRequest(clusterName, adminOperationProtocolVersion);

LOGGER.info(
"Updating admin operation protocol version for cluster: {} to version: {}",
clusterName,
adminOperationProtocolVersion);

admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion);

AdminTopicGrpcMetadata.Builder adminMetadataBuilder = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(clusterName)
.setAdminOperationProtocolVersion(adminOperationProtocolVersion);
return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ public static void validateAdminCommandExecutionRequest(String clusterName, long
throw new IllegalArgumentException("Admin command execution id with positive value is required");
}
}

public static void validateAdminOperationProtocolVersionRequest(String clusterName, long protocolVersion) {
if (StringUtils.isBlank(clusterName)) {
throw new IllegalArgumentException("Cluster name is required for updating admin operation protocol version");
}
if (protocolVersion == 0 || protocolVersion < -1) {
throw new IllegalArgumentException(
"Admin operation protocol version is required and must be -1 or greater than 0");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc.ClusterAdminOpsGrpcServiceBlockingStub;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -191,4 +192,25 @@ public void testUpdateAdminTopicMetadataUnauthorized() {
errorInfo.getErrorMessage().contains(ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX),
"Actual error message: " + errorInfo.getErrorMessage());
}

@Test
public void testUpdateAdminOperationProtocolVersionSuccess() {
AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder =
AdminTopicGrpcMetadata.newBuilder().setClusterName(TEST_CLUSTER).setAdminOperationProtocolVersion(1L);
AdminTopicMetadataGrpcResponse response =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build();
doReturn(response).when(requestHandler)
.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class));
doReturn(true).when(accessManager).isAllowListUser(anyString(), any());

UpdateAdminOperationProtocolVersionGrpcRequest request = UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder()
.setClusterName(TEST_CLUSTER)
.setAdminOperationProtocolVersion(1L)
.build();

AdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminOperationProtocolVersion(request);
assertNotNull(actualResponse);
assertEquals(actualResponse.getMetadata().getClusterName(), TEST_CLUSTER);
assertEquals(actualResponse.getMetadata().getAdminOperationProtocolVersion(), 1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.AdminTopicMetadataResponse;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.security.cert.X509Certificate;
Expand Down Expand Up @@ -158,8 +158,8 @@ public void testUpdateAdminTopicMetadataSuccess() throws Exception {

Route route =
new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminTopicMetadata(mockAdmin, requestHandler);
ControllerResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, times(1)).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
assertEquals(responseObject.getCluster(), TEST_CLUSTER);
Expand Down Expand Up @@ -189,8 +189,8 @@ public void testUpdateAdminTopicMetadataHandlesUnauthorizedAccess() throws Excep
when(request.queryParams(STORE_NAME)).thenReturn(TEST_STORE);
Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController))
.updateAdminTopicMetadata(mockAdmin, requestHandler);
ControllerResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, never()).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
assertNotNull(responseObject.getError());
Expand All @@ -212,8 +212,8 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception {

Route route =
new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminTopicMetadata(mockAdmin, requestHandler);
ControllerResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, times(1)).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
assertNotNull(responseObject.getError());
Expand All @@ -223,20 +223,33 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception {
@Test
public void testUpdateAdminOperationProtocolVersion() throws Exception {
QueryParamsMap paramsMap = mock(QueryParamsMap.class);
String adminOperationProtocolVersion = "1";
long adminOperationProtocolVersion = 1L;
doReturn(new HashMap<>()).when(paramsMap).toMap();
doReturn(paramsMap).when(request).queryMap();

when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER);
when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion);
when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION))
.thenReturn(String.valueOf(adminOperationProtocolVersion));

Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin);
AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(TEST_CLUSTER)
.setAdminOperationProtocolVersion(adminOperationProtocolVersion);
AdminTopicMetadataGrpcResponse grpcResponse =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build();

when(requestHandler.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class)))
.thenReturn(grpcResponse);

Route route = new AdminTopicMetadataRoutes(false, Optional.empty())
.updateAdminOperationProtocolVersion(mockAdmin, requestHandler);

AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, times(1))
.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class));
assertEquals(responseObject.getCluster(), TEST_CLUSTER);
assertEquals(responseObject.getAdminOperationProtocolVersion(), 1L);
assertEquals(responseObject.getAdminOperationProtocolVersion(), adminOperationProtocolVersion);
assertNull(responseObject.getError());
}

Expand All @@ -262,11 +275,12 @@ public void testUpdateAdminOperationProtocolVersionHandlesUnauthorizedAccess() t
when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion);

Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController))
.updateAdminOperationProtocolVersion(mockAdmin);
.updateAdminOperationProtocolVersion(mockAdmin, requestHandler);

AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, never()).updateAdminOperationProtocolVersion(any());
assertNotNull(responseObject.getError());
assertTrue(responseObject.getError().contains("Only admin users are allowed"));
}
Expand All @@ -279,11 +293,12 @@ public void testUpdateAdminOperationProtocolVersionHandlesMissingParams() throws

when(request.queryParams(CLUSTER)).thenReturn(null); // Missing cluster parameter

Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin);
Route route = new AdminTopicMetadataRoutes(false, Optional.empty())
.updateAdminOperationProtocolVersion(mockAdmin, requestHandler);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, never()).getAdminTopicMetadata(any());
verify(requestHandler, never()).updateAdminOperationProtocolVersion(any());
assertNotNull(responseObject.getError());
assertTrue(responseObject.getError().contains("cluster_name is a required parameter"));
}
Expand Down
Loading
Loading