Skip to content

Commit

Permalink
Added DeleteComnmand as well as tested & fixed what we found. (#104)
Browse files Browse the repository at this point in the history
Co-authored-by: Ziad Othman <ziadsadek999@gmail.com>
Co-authored-by: Elshimaa Betah <79271594+ShimaaBetah@users.noreply.github.com>
  • Loading branch information
3 people authored May 18, 2024
1 parent 9fa0f03 commit c5062e0
Show file tree
Hide file tree
Showing 24 changed files with 1,198 additions and 346 deletions.
37 changes: 31 additions & 6 deletions controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,41 @@
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.workup</groupId>
<artifactId>contracts</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.workup</groupId>
<artifactId>payments</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.workup</groupId>
<artifactId>users</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
41 changes: 15 additions & 26 deletions controller/src/main/java/com/workup/controller/CLIHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import asg.cliche.CLIException;
import asg.cliche.Command;
import com.workup.shared.commands.controller.*;
import com.workup.shared.commands.jobs.requests.CreateJobRequest;
import com.workup.shared.enums.ControllerQueueNames;
import com.workup.shared.enums.ServiceQueueNames;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -57,6 +55,11 @@ public String maxdb(String app, int maxDBConn) {
return "Command Sent!";
}

@Command(description = "Adds a new command")
public String addCommand(String app, String commandName, String className) throws Exception {
return updateCommand(app, commandName, className);
}

@Command(description = "starts a specific app")
public String start(String app) {
app = app.toLowerCase();
Expand All @@ -78,12 +81,7 @@ public String freeze(String app) {
return "Command sent";
}

@Command(description = "stops a specific app")
public String setmq(String app, int appNum) {
return "setmq";
}

@Command(description = "stops a specific app")
@Command(description = "sets a logging level")
public String setLoggingLevel(String app, String level) {
app = app.toLowerCase();
if (!appQueueMap.containsKey(app)) {
Expand All @@ -96,17 +94,6 @@ public String setLoggingLevel(String app, String level) {
return "Command sent!!";
}

@Command(description = "test")
public void test() {
CreateJobRequest request = CreateJobRequest.builder().withTitle("Ziko").build();
rabbitTemplate.convertSendAndReceive(ServiceQueueNames.JOBS, request);
}

@Command(description = "Creates a new command")
public String addcommand(String app, String commandName, String className) {
return "Add command";
}

@Command(description = "Updates an existing command")
public String updateCommand(String app, String commandName, String className) throws Exception {
app = app.toLowerCase();
Expand All @@ -120,6 +107,7 @@ public String updateCommand(String app, String commandName, String className) th
"",
UpdateCommandRequest.builder()
.withCommandName(commandName)
.withClassName(className)
.withByteCode(byteArray)
.build());
} catch (Exception ex) {
Expand All @@ -142,12 +130,13 @@ private byte[] getByteCode(String commandName, String className)
}

@Command(description = "Deletes an existing command")
public String deletecommand(String app, String commandName, String className) {
return "Delete command";
}

@Command(description = "stops a specific app")
public String updateClass(String app, int appNum) {
return "update class";
public String deleteCommand(String app, String commandName) {
app = app.toLowerCase();
if (!appQueueMap.containsKey(app)) {
return "Error: app can only be jobs, users, contracts or payments!";
}
rabbitTemplate.convertAndSend(
appQueueMap.get(app), "", DeleteCommandRequest.builder().commandName(commandName).build());
return "Command sent";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.workup.shared.enums.ControllerQueueNames;
import com.workup.shared.enums.ServiceQueueNames;
import com.workup.shared.enums.ThreadPoolSize;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
Expand Down Expand Up @@ -79,8 +80,9 @@ public Binding fanoutBinding(FanoutExchange fanout, Queue controllerQueue) {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50);
executor.setMaxPoolSize(50);
executor.setCorePoolSize(ThreadPoolSize.POOL_SIZE);
executor.setMaxPoolSize(ThreadPoolSize.POOL_SIZE);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("contracts-");
executor.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,138 @@
package com.workup.contracts;

import com.workup.contracts.commands.ContractCommand;
import com.workup.contracts.commands.ContractCommandMap;
import com.workup.shared.commands.Command;
import com.workup.shared.commands.CommandRequest;
import com.workup.shared.commands.CommandResponse;
import com.workup.shared.commands.controller.ContinueRequest;
import com.workup.shared.commands.controller.DeleteCommandRequest;
import com.workup.shared.commands.controller.FreezeRequest;
import com.workup.shared.commands.controller.SetLoggingLevelRequest;
import com.workup.shared.commands.controller.SetMaxThreadsRequest;
import java.lang.reflect.Field;
import com.workup.shared.commands.controller.UpdateCommandRequest;
import com.workup.shared.enums.ServiceQueueNames;
import com.workup.shared.enums.ThreadPoolSize;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = "#{controllerQueue.name}")
@RabbitListener(queues = "#{controllerQueue.name}", id = "#{controllerQueue.name}")
public class ControllerMQListener {

@Autowired public ContractCommandMap commandMap;
@Autowired public ThreadPoolTaskExecutor taskExecutor;

@Autowired private ApplicationContext context;
@Autowired private RabbitListenerEndpointRegistry registry;

@RabbitHandler
public void receive(SetMaxThreadsRequest in) throws Exception {
try {
ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class);
Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize");
maxPoolSize.setAccessible(true);
maxPoolSize.set(myBean, in.getMaxThreads());
Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize");
corePoolSize.setAccessible(true);
corePoolSize.set(myBean, in.getMaxThreads());
System.out.println("Max threads is: " + taskExecutor.getMaxPoolSize());
setThreads(in.getMaxThreads());
ThreadPoolSize.POOL_SIZE = taskExecutor.getMaxPoolSize();
System.out.println("Max threads set to: " + taskExecutor.getMaxPoolSize());
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(SetLoggingLevelRequest in) throws Exception {
try {
Logger logger = LogManager.getLogger("com.workup.contracts");
Configurator.setAllLevels(logger.getName(), Level.valueOf(in.getLevel()));
System.out.println("Logging level set to: " + in.getLevel());
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(FreezeRequest in) throws Exception {
try {
registry.getListenerContainer(ServiceQueueNames.CONTRACTS).stop();
setThreads(1);
System.out.println("Stopped all threads.");
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(ContinueRequest in) throws Exception {
try {
registry.getListenerContainer(ServiceQueueNames.CONTRACTS).start();
setThreads(ThreadPoolSize.POOL_SIZE);
System.out.println("Continued all threads.");
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@SuppressWarnings("unchecked")
@RabbitHandler
public void receive(UpdateCommandRequest in) throws Exception {
try {
byte[] byteArray = in.getByteCode();
Class<?> clazz =
(Class<?>)
(new MyClassLoader(this.getClass().getClassLoader())
.loadClass(byteArray, in.getClassName()));

commandMap.replaceCommand(
in.getCommandName(),
(Class<? extends ContractCommand<? extends CommandRequest, ? extends CommandResponse>>)
((Command<?, ?>) clazz.newInstance()).getClass());

System.out.println("Updated command: " + in.getCommandName());
// clazz.newInstance().Run(null);
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(DeleteCommandRequest in) throws Exception {
try {
commandMap.removeCommand(in.getCommandName());
System.out.println("Deleted command: " + in.getCommandName());
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

static class MyClassLoader extends ClassLoader {
public MyClassLoader(ClassLoader classLoader) {
super(classLoader);
}

public Class<?> loadClass(byte[] byteCode, String className) {
return defineClass(className, byteCode, 0, byteCode.length);
}
}

private void setThreads(int threads) throws NoSuchFieldException, IllegalAccessException {
if (threads > taskExecutor.getCorePoolSize()) {
taskExecutor.setMaxPoolSize(threads);
taskExecutor.setCorePoolSize(threads);
} else {
taskExecutor.setCorePoolSize(threads);
taskExecutor.setMaxPoolSize(threads);
}
}
}
Loading

0 comments on commit c5062e0

Please sign in to comment.