From 0eee34636b376851e49f3ffd1913de44f4a562ef Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Thu, 13 Feb 2025 15:09:15 -0800 Subject: [PATCH 1/6] Add grpc endpoint for updateAdminOperationProtocolVersion --- .../ClusterAdminOpsGrpcServiceImpl.java | 14 +++++++++++ .../server/ClusterAdminOpsRequestHandler.java | 24 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java index b248752e98..9fb84c9fd0 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImpl.java @@ -15,6 +15,7 @@ import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; import io.grpc.Context; import io.grpc.stub.StreamObserver; @@ -88,4 +89,17 @@ public void updateAdminTopicMetadata( return requestHandler.updateAdminTopicMetadata(request); }, responseObserver, metadata.getClusterName(), metadata.hasStoreName() ? metadata.getStoreName() : null); } + + @Override + public void updateAdminOperationProtocolVersion( + UpdateAdminOperationProtocolVersionGrpcRequest request, + StreamObserver responseObserver) { + LOGGER.debug("Received updateAdminOperationProtocolVersion request: {}", request); + ControllerGrpcServerUtils.handleRequest( + ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminOperationProtocolVersionMethod(), + () -> requestHandler.updateAdminOperationProtocolVersion(request), + responseObserver, + request.getClusterName(), + null); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index ece228b282..5942f39025 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -14,6 +14,7 @@ import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; import com.linkedin.venice.utils.Pair; import java.util.Map; @@ -148,4 +149,27 @@ public AdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicM AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()); return responseBuilder.build(); } + + public AdminTopicMetadataGrpcResponse updateAdminOperationProtocolVersion( + UpdateAdminOperationProtocolVersionGrpcRequest request) { + String clusterName = request.getClusterName(); + if (StringUtils.isBlank(clusterName)) { + throw new IllegalArgumentException("Cluster name is required for updating admin operation protocol version"); + } + + long adminOperationProtocolVersion = request.getAdminOperationProtocolVersion(); + + LOGGER.info( + "Updating admin operation protocol version for cluster: {} to version: {}", + clusterName, + adminOperationProtocolVersion); + + // TODO: Call the actual method to update the admin operation protocol version (#1418) + // admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + + AdminTopicGrpcMetadata.Builder adminMetadataBuilder = AdminTopicGrpcMetadata.newBuilder() + .setClusterName(clusterName) + .setAdminOperationProtocolVersion(adminOperationProtocolVersion); + return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build(); + } } From 84a7f83db17bc9b0b17d82acee0e6fd64f8485cd Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 14 Feb 2025 11:13:04 -0800 Subject: [PATCH 2/6] Enable the actual call inside request handler --- .../controller/server/ClusterAdminOpsRequestHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index 5942f39025..306ad08042 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -164,8 +164,7 @@ public AdminTopicMetadataGrpcResponse updateAdminOperationProtocolVersion( clusterName, adminOperationProtocolVersion); - // TODO: Call the actual method to update the admin operation protocol version (#1418) - // admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); AdminTopicGrpcMetadata.Builder adminMetadataBuilder = AdminTopicGrpcMetadata.newBuilder() .setClusterName(clusterName) From 43b7b12dc2d9ff80098adb66930c5000931032eb Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 14 Feb 2025 11:52:12 -0800 Subject: [PATCH 3/6] Modify the spark server to accept the request handler --- .../java/com/linkedin/venice/AdminTool.java | 3 +- .../controllerapi/ControllerClient.java | 8 ++-- .../controller/server/AdminSparkServer.java | 3 +- .../server/AdminTopicMetadataRoutes.java | 15 ++++--- .../server/AdminTopicMetadataRoutesTest.java | 43 +++++++++++++------ 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index e1bca6e6eb..db94a41825 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -3279,7 +3279,8 @@ private static void updateAdminOperationProtocolVersion(CommandLine cmd) throws getRequiredArgument(cmd, Arg.ADMIN_OPERATION_PROTOCOL_VERSION, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); long protocolVersion = Utils.parseLongFromString(protocolVersionInString, Arg.ADMIN_OPERATION_PROTOCOL_VERSION.name()); - ControllerResponse response = controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion); + AdminTopicMetadataResponse response = + controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion); printObject(response); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index bc49f285c9..63001dd832 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -1351,7 +1351,7 @@ public AdminTopicMetadataResponse getAdminTopicMetadata(Optional storeNa return request(ControllerRoute.GET_ADMIN_TOPIC_METADATA, params, AdminTopicMetadataResponse.class); } - public ControllerResponse updateAdminTopicMetadata( + public AdminTopicMetadataResponse updateAdminTopicMetadata( long executionId, Optional storeName, Optional offset, @@ -1360,15 +1360,15 @@ public ControllerResponse updateAdminTopicMetadata( .add(NAME, storeName) .add(OFFSET, offset) .add(UPSTREAM_OFFSET, upstreamOffset); - return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, ControllerResponse.class); + return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, AdminTopicMetadataResponse.class); } - public ControllerResponse updateAdminOperationProtocolVersion( + public AdminTopicMetadataResponse updateAdminOperationProtocolVersion( String clusterName, Long adminOperationProtocolVersion) { QueryParams params = newParams().add(CLUSTER, clusterName).add(ADMIN_OPERATION_PROTOCOL_VERSION, adminOperationProtocolVersion); - return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, ControllerResponse.class); + return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, AdminTopicMetadataResponse.class); } public ControllerResponse deleteKafkaTopic(String topicName) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java index 8b3d8d58ff..ed0b37f506 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java @@ -641,7 +641,8 @@ public boolean startInner() throws Exception { UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getPath(), new VeniceParentControllerRegionStateHandler( admin, - adminTopicMetadataRoutes.updateAdminOperationProtocolVersion(admin))); + adminTopicMetadataRoutes + .updateAdminOperationProtocolVersion(admin, requestHandler.getClusterAdminOpsRequestHandler()))); httpService.post( DELETE_KAFKA_TOPIC.getPath(), new VeniceParentControllerRegionStateHandler(admin, storesRoutes.deleteKafkaTopic(admin))); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index 12e9fe2e37..159447090b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -20,6 +20,7 @@ import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata; import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest; import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; import java.util.Optional; import org.apache.http.HttpStatus; @@ -105,7 +106,7 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler }; } - public Route updateAdminOperationProtocolVersion(Admin admin) { + public Route updateAdminOperationProtocolVersion(Admin admin, ClusterAdminOpsRequestHandler requestHandler) { return (request, response) -> { AdminTopicMetadataResponse responseObject = new AdminTopicMetadataResponse(); response.type(HttpConstants.JSON); @@ -120,11 +121,15 @@ public Route updateAdminOperationProtocolVersion(Admin admin) { AdminSparkServer.validateParams(request, UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getParams(), admin); String clusterName = request.queryParams(CLUSTER); Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)); + AdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminOperationProtocolVersion( + UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder() + .setClusterName(clusterName) + .setAdminOperationProtocolVersion(adminOperationProtocolVersion) + .build()); - responseObject.setCluster(clusterName); - responseObject.setAdminOperationProtocolVersion(adminOperationProtocolVersion); - - admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + responseObject.setCluster(internalResponse.getMetadata().getClusterName()); + responseObject + .setAdminOperationProtocolVersion(internalResponse.getMetadata().getAdminOperationProtocolVersion()); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(new VeniceException(e), request, response); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java index f8dce51224..cd220d0a45 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java @@ -21,10 +21,10 @@ import com.linkedin.venice.acl.DynamicAccessController; import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controllerapi.AdminTopicMetadataResponse; -import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata; import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest; import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; import com.linkedin.venice.utils.ObjectMapperFactory; import java.security.cert.X509Certificate; @@ -158,8 +158,8 @@ public void testUpdateAdminTopicMetadataSuccess() throws Exception { Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminTopicMetadata(mockAdmin, requestHandler); - ControllerResponse responseObject = - OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class); + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); verify(requestHandler, times(1)).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class)); assertEquals(responseObject.getCluster(), TEST_CLUSTER); @@ -189,8 +189,8 @@ public void testUpdateAdminTopicMetadataHandlesUnauthorizedAccess() throws Excep when(request.queryParams(STORE_NAME)).thenReturn(TEST_STORE); Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController)) .updateAdminTopicMetadata(mockAdmin, requestHandler); - ControllerResponse responseObject = - OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class); + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); verify(requestHandler, never()).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class)); assertNotNull(responseObject.getError()); @@ -212,8 +212,8 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception { Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminTopicMetadata(mockAdmin, requestHandler); - ControllerResponse responseObject = - OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class); + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); verify(requestHandler, times(1)).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class)); assertNotNull(responseObject.getError()); @@ -223,20 +223,33 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception { @Test public void testUpdateAdminOperationProtocolVersion() throws Exception { QueryParamsMap paramsMap = mock(QueryParamsMap.class); - String adminOperationProtocolVersion = "1"; + long adminOperationProtocolVersion = 1L; doReturn(new HashMap<>()).when(paramsMap).toMap(); doReturn(paramsMap).when(request).queryMap(); when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER); - when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion); + when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)) + .thenReturn(String.valueOf(adminOperationProtocolVersion)); - Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin); + AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder() + .setClusterName(TEST_CLUSTER) + .setAdminOperationProtocolVersion(adminOperationProtocolVersion); + AdminTopicMetadataGrpcResponse grpcResponse = + AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build(); + + when(requestHandler.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class))) + .thenReturn(grpcResponse); + + Route route = new AdminTopicMetadataRoutes(false, Optional.empty()) + .updateAdminOperationProtocolVersion(mockAdmin, requestHandler); AdminTopicMetadataResponse responseObject = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + verify(requestHandler, times(1)) + .updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class)); assertEquals(responseObject.getCluster(), TEST_CLUSTER); - assertEquals(responseObject.getAdminOperationProtocolVersion(), 1L); + assertEquals(responseObject.getAdminOperationProtocolVersion(), adminOperationProtocolVersion); assertNull(responseObject.getError()); } @@ -262,11 +275,12 @@ public void testUpdateAdminOperationProtocolVersionHandlesUnauthorizedAccess() t when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion); Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController)) - .updateAdminOperationProtocolVersion(mockAdmin); + .updateAdminOperationProtocolVersion(mockAdmin, requestHandler); AdminTopicMetadataResponse responseObject = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + verify(requestHandler, never()).updateAdminOperationProtocolVersion(any()); assertNotNull(responseObject.getError()); assertTrue(responseObject.getError().contains("Only admin users are allowed")); } @@ -279,11 +293,12 @@ public void testUpdateAdminOperationProtocolVersionHandlesMissingParams() throws when(request.queryParams(CLUSTER)).thenReturn(null); // Missing cluster parameter - Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin); + Route route = new AdminTopicMetadataRoutes(false, Optional.empty()) + .updateAdminOperationProtocolVersion(mockAdmin, requestHandler); AdminTopicMetadataResponse responseObject = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); - verify(requestHandler, never()).getAdminTopicMetadata(any()); + verify(requestHandler, never()).updateAdminOperationProtocolVersion(any()); assertNotNull(responseObject.getError()); assertTrue(responseObject.getError().contains("cluster_name is a required parameter")); } From fa932b244d6205fc89997b76e809149445226690 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 14 Feb 2025 12:25:41 -0800 Subject: [PATCH 4/6] Add tests for handler --- .../server/ClusterAdminOpsRequestHandler.java | 4 ++ .../ClusterAdminOpsRequestHandlerTest.java | 45 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index 306ad08042..4788ebbb4f 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -158,6 +158,10 @@ public AdminTopicMetadataGrpcResponse updateAdminOperationProtocolVersion( } long adminOperationProtocolVersion = request.getAdminOperationProtocolVersion(); + if (adminOperationProtocolVersion == 0 || adminOperationProtocolVersion < -1) { + throw new IllegalArgumentException( + "Admin operation protocol version is required and must be -1 or greater than 0"); + } LOGGER.info( "Updating admin operation protocol version for cluster: {} to version: {}", diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java index 0116344150..0570522268 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandlerTest.java @@ -23,6 +23,7 @@ import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; import java.util.Collections; import java.util.HashMap; @@ -265,4 +266,48 @@ public void testUpdateAdminTopicMetadataInvalidInputs() { exception.getMessage().contains("Updating offsets is not allowed for store-level admin topic metadata"), "Actual message: " + exception.getMessage()); } + + @Test + public void testUpdateAdminOperationProtocolVersionSuccess() { + String clusterName = "test-cluster"; + long version = 12345L; + UpdateAdminOperationProtocolVersionGrpcRequest request = UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder() + .setClusterName(clusterName) + .setAdminOperationProtocolVersion(version) + .build(); + AdminTopicMetadataGrpcResponse response = handler.updateAdminOperationProtocolVersion(request); + + assertNotNull(response); + assertEquals(response.getMetadata().getClusterName(), clusterName); + assertEquals(response.getMetadata().getAdminOperationProtocolVersion(), version); + } + + @Test + public void testUpdateAdminOperationProtocolVersionInvalidInputs() { + String clusterName = "test-cluster"; + long version = 12345L; + + // No cluster name + UpdateAdminOperationProtocolVersionGrpcRequest request1 = + UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder() + .setClusterName("") + .setAdminOperationProtocolVersion(version) + .build(); + Exception exception = + expectThrows(IllegalArgumentException.class, () -> handler.updateAdminOperationProtocolVersion(request1)); + assertTrue( + exception.getMessage().contains("Cluster name is required for updating admin operation protocol version")); + + // Invalid version + UpdateAdminOperationProtocolVersionGrpcRequest request2 = + UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder() + .setClusterName(clusterName) + .setAdminOperationProtocolVersion(0) + .build(); + exception = + expectThrows(IllegalArgumentException.class, () -> handler.updateAdminOperationProtocolVersion(request2)); + assertTrue( + exception.getMessage() + .contains("Admin operation protocol version is required and must be -1 or greater than 0")); + } } From 737547b01822de8bfd4fc524a6552f37ad079559 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 14 Feb 2025 15:33:31 -0800 Subject: [PATCH 5/6] Cut the validation into validator class --- .../server/ClusterAdminOpsRequestHandler.java | 10 ++-------- .../server/ControllerRequestParamValidator.java | 10 ++++++++++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index 4788ebbb4f..24945c04e1 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -153,15 +153,9 @@ public AdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicM public AdminTopicMetadataGrpcResponse updateAdminOperationProtocolVersion( UpdateAdminOperationProtocolVersionGrpcRequest request) { String clusterName = request.getClusterName(); - if (StringUtils.isBlank(clusterName)) { - throw new IllegalArgumentException("Cluster name is required for updating admin operation protocol version"); - } - long adminOperationProtocolVersion = request.getAdminOperationProtocolVersion(); - if (adminOperationProtocolVersion == 0 || adminOperationProtocolVersion < -1) { - throw new IllegalArgumentException( - "Admin operation protocol version is required and must be -1 or greater than 0"); - } + ControllerRequestParamValidator + .validateAdminOperationProtocolVersionRequest(clusterName, adminOperationProtocolVersion); LOGGER.info( "Updating admin operation protocol version for cluster: {} to version: {}", diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java index 6f2864e9f9..4f4d2219fa 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java @@ -45,4 +45,14 @@ public static void validateAdminCommandExecutionRequest(String clusterName, long throw new IllegalArgumentException("Admin command execution id with positive value is required"); } } + + public static void validateAdminOperationProtocolVersionRequest(String clusterName, long protocolVersion) { + if (StringUtils.isBlank(clusterName)) { + throw new IllegalArgumentException("Cluster name is required for updating admin operation protocol version"); + } + if (protocolVersion == 0 || protocolVersion < -1) { + throw new IllegalArgumentException( + "Admin operation protocol version is required and must be -1 or greater than 0"); + } + } } From c5ab1215a7e45b40b6b031d78d71ecdd58e80224 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 14 Feb 2025 15:55:03 -0800 Subject: [PATCH 6/6] Add test for ClusterAdminOpsGrpcServiceImplTest --- .../ClusterAdminOpsGrpcServiceImplTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java index dcfe281f51..fd40bcd758 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ClusterAdminOpsGrpcServiceImplTest.java @@ -25,6 +25,7 @@ import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc.ClusterAdminOpsGrpcServiceBlockingStub; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest; import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest; import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo; import io.grpc.ManagedChannel; @@ -191,4 +192,25 @@ public void testUpdateAdminTopicMetadataUnauthorized() { errorInfo.getErrorMessage().contains(ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX), "Actual error message: " + errorInfo.getErrorMessage()); } + + @Test + public void testUpdateAdminOperationProtocolVersionSuccess() { + AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = + AdminTopicGrpcMetadata.newBuilder().setClusterName(TEST_CLUSTER).setAdminOperationProtocolVersion(1L); + AdminTopicMetadataGrpcResponse response = + AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build(); + doReturn(response).when(requestHandler) + .updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class)); + doReturn(true).when(accessManager).isAllowListUser(anyString(), any()); + + UpdateAdminOperationProtocolVersionGrpcRequest request = UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder() + .setClusterName(TEST_CLUSTER) + .setAdminOperationProtocolVersion(1L) + .build(); + + AdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminOperationProtocolVersion(request); + assertNotNull(actualResponse); + assertEquals(actualResponse.getMetadata().getClusterName(), TEST_CLUSTER); + assertEquals(actualResponse.getMetadata().getAdminOperationProtocolVersion(), 1L); + } }