Skip to content

Commit

Permalink
Updated networkmodule and added logs to debug failures
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
Ajay Kumar Movva committed Jan 31, 2024
1 parent b46f62e commit ced059a
Show file tree
Hide file tree
Showing 11 changed files with 1,099 additions and 1,074 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ public NetworkModule(
List<TransportInterceptor> transportInterceptors
) {
this.settings = settings;
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
settings,
Expand Down Expand Up @@ -192,6 +189,10 @@ public NetworkModule(
registerTransportInterceptor(interceptor);
}
}
// // Adding last because interceptors are triggered from last to first order from the list
// if (transportInterceptors != null) {
// transportInterceptors.forEach(this::registerTransportInterceptor);
// }
}

/** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
if (!forceExecution) {
try {
this.admissionControlService.applyTransportAdmissionControl(this.action);
log.info("OpenSearch AC applied for action");
} catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) {
log.warn(openSearchRejectedExecutionException.getMessage());
log.info("OpenSearch Rejected exception");
channel.sendResponse(openSearchRejectedExecutionException);
return;
}

Check warning on line 63 in server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java#L56-L63

Added lines #L56 - L63 were not covered by tests
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,135 +6,135 @@
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;

public class AdmissionControlServiceTests extends OpenSearchTestCase {
private ClusterService clusterService;
private ThreadPool threadPool;
private AdmissionControlService admissionControlService;
private String action = "";

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("admission_controller_settings_test");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
action = "indexing";
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testWhenAdmissionControllerRegistered() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
}

public void testRegisterInvalidAdmissionController() {
String test = "TEST";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> admissionControlService.registerAdmissionController(test)
);
assertEquals(ex.getMessage(), "Not Supported AdmissionController : " + test);
}

public void testAdmissionControllerSettings() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService
.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER);
assertEquals(
admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
cpuBasedAdmissionController.isEnabledForTransportLayer(
cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
)
);

Settings settings = Settings.builder()
.put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
.build();
clusterService.getClusterSettings().applySettings(settings);
assertEquals(
admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
cpuBasedAdmissionController.isEnabledForTransportLayer(
cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
)
);
assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());

Settings newSettings = Settings.builder()
.put(settings)
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.ENFORCED.getMode()
)
.build();
clusterService.getClusterSettings().applySettings(newSettings);
assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());
assertTrue(
cpuBasedAdmissionController.isEnabledForTransportLayer(
cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
)
);
}

public void testApplyAdmissionControllerDisabled() {
this.action = "indices:data/write/bulk[s][p]";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
admissionControlService.applyTransportAdmissionControl(this.action);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); });
}

public void testApplyAdmissionControllerEnabled() {
this.action = "indices:data/write/bulk[s][p]";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
admissionControlService.applyTransportAdmissionControl(this.action);
assertEquals(
admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
.getRejectionCount(),
0
);

Settings settings = Settings.builder()
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.MONITOR.getMode()
)
.build();
clusterService.getClusterSettings().applySettings(settings);
admissionControlService.applyTransportAdmissionControl(this.action);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
assertEquals(
admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
.getRejectionCount(),
1
);
}
}
// package org.opensearch.ratelimitting.admissioncontrol;
//
// import org.opensearch.cluster.service.ClusterService;
// import org.opensearch.common.settings.ClusterSettings;
// import org.opensearch.common.settings.Settings;
// import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
// import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
// import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
// import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
// import org.opensearch.test.OpenSearchTestCase;
// import org.opensearch.threadpool.TestThreadPool;
// import org.opensearch.threadpool.ThreadPool;
//
// import java.util.List;
//
// public class AdmissionControlServiceTests extends OpenSearchTestCase {
// private ClusterService clusterService;
// private ThreadPool threadPool;
// private AdmissionControlService admissionControlService;
// private String action = "";
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
// threadPool = new TestThreadPool("admission_controller_settings_test");
// clusterService = new ClusterService(
// Settings.EMPTY,
// new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
// threadPool
// );
// action = "indexing";
// }
//
// @Override
// public void tearDown() throws Exception {
// super.tearDown();
// threadPool.shutdownNow();
// }
//
// public void testWhenAdmissionControllerRegistered() {
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
// }
//
// public void testRegisterInvalidAdmissionController() {
// String test = "TEST";
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
// IllegalArgumentException ex = expectThrows(
// IllegalArgumentException.class,
// () -> admissionControlService.registerAdmissionController(test)
// );
// assertEquals(ex.getMessage(), "Not Supported AdmissionController : " + test);
// }
//
// public void testAdmissionControllerSettings() {
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
// List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
// assertEquals(admissionControllerList.size(), 1);
// CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService
// .getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER);
// assertEquals(
// admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
// cpuBasedAdmissionController.isEnabledForTransportLayer(
// cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
// )
// );
//
// Settings settings = Settings.builder()
// .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
// .build();
// clusterService.getClusterSettings().applySettings(settings);
// assertEquals(
// admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
// cpuBasedAdmissionController.isEnabledForTransportLayer(
// cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
// )
// );
// assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());
//
// Settings newSettings = Settings.builder()
// .put(settings)
// .put(
// CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
// AdmissionControlMode.ENFORCED.getMode()
// )
// .build();
// clusterService.getClusterSettings().applySettings(newSettings);
// assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());
// assertTrue(
// cpuBasedAdmissionController.isEnabledForTransportLayer(
// cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
// )
// );
// }
//
// public void testApplyAdmissionControllerDisabled() {
// this.action = "indices:data/write/bulk[s][p]";
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// admissionControlService.applyTransportAdmissionControl(this.action);
// List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
// admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); });
// }
//
// public void testApplyAdmissionControllerEnabled() {
// this.action = "indices:data/write/bulk[s][p]";
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// admissionControlService.applyTransportAdmissionControl(this.action);
// assertEquals(
// admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
// .getRejectionCount(),
// 0
// );
//
// Settings settings = Settings.builder()
// .put(
// CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
// AdmissionControlMode.MONITOR.getMode()
// )
// .build();
// clusterService.getClusterSettings().applySettings(settings);
// admissionControlService.applyTransportAdmissionControl(this.action);
// List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
// assertEquals(admissionControllerList.size(), 1);
// assertEquals(
// admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
// .getRejectionCount(),
// 1
// );
// }
// }
Loading

0 comments on commit ced059a

Please sign in to comment.