Skip to content

Commit

Permalink
#30669 fix extract jobQueueManagerAPI init and destroy
Browse files Browse the repository at this point in the history
  • Loading branch information
valentinogiardino committed Nov 19, 2024
1 parent 40311d9 commit c6e620f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 113 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.dotcms.rest.api.v1;

import com.dotcms.jobs.business.api.JobProcessorScanner;
import com.dotcms.jobs.business.api.JobQueueManagerAPI;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotmarketing.util.Logger;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.lang.reflect.Constructor;
import java.util.List;

@ApplicationScoped
public class JobQueueManagerHelper {

private JobProcessorScanner scanner;

@Inject
public JobQueueManagerHelper(JobProcessorScanner scanner) {
this.scanner = scanner;
}

public JobQueueManagerHelper() {
}

public void registerProcessors(JobQueueManagerAPI jobQueueManagerAPI) {
if (!jobQueueManagerAPI.isStarted()) {
jobQueueManagerAPI.start();
Logger.info(this.getClass(), "JobQueueManagerAPI started");
}

List<Class<? extends JobProcessor>> processors = scanner.discoverJobProcessors();
processors.forEach(processor -> {
try {
if (!testInstantiation(processor)) {
return;
}
registerProcessor(jobQueueManagerAPI, processor);
} catch (Exception e) {
Logger.error(this.getClass(), "Unable to register JobProcessor ", e);
}
});
}

/**
* Test if a processor can be instantiated
* @param processor The processor to tested
* @return true if the processor can be instantiated, false otherwise
*/
private boolean testInstantiation(Class<? extends JobProcessor> processor) {
try {
Constructor<? extends JobProcessor> declaredConstructor = processor.getDeclaredConstructor();
declaredConstructor.newInstance();
return true;
} catch (Exception e) {
Logger.error(this.getClass(), String.format(" JobProcessor [%s] cannot be instantiated and will be ignored.", processor.getName()), e);
}
return false;
}

private void registerProcessor(JobQueueManagerAPI jobQueueManagerAPI, Class<? extends JobProcessor> processor) {
if (processor.isAnnotationPresent(Queue.class)) {
Queue queue = processor.getAnnotation(Queue.class);
jobQueueManagerAPI.registerProcessor(queue.value(), processor);
} else {
jobQueueManagerAPI.registerProcessor(processor.getName(), processor);
}
}

public void shutdown(JobQueueManagerAPI jobQueueManagerAPI) {
if (jobQueueManagerAPI.isStarted()) {
try {
jobQueueManagerAPI.close();
Logger.info(this.getClass(), "JobQueueManagerAPI successfully closed");
} catch (Exception e) {
Logger.error(this.getClass(), e.getMessage(), e);
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.dotcms.jobs.business.api.JobQueueManagerAPI;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.rest.api.v1.JobQueueManagerHelper;
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.web.WebAPILocator;
Expand All @@ -29,73 +30,26 @@ public class ContentImportHelper {
private static final String CMD_PREVIEW = "preview";
private static final String CMD_PUBLISH = "publish";

JobQueueManagerAPI jobQueueManagerAPI;
JobProcessorScanner scanner;
private JobQueueManagerAPI jobQueueManagerAPI;
private JobQueueManagerHelper jobQueueManagerHelper;

@Inject
public ContentImportHelper(
JobQueueManagerAPI jobQueueManagerAPI,
JobProcessorScanner scanner) {
public ContentImportHelper(JobQueueManagerAPI jobQueueManagerAPI, JobQueueManagerHelper jobQueueManagerHelper) {
this.jobQueueManagerAPI = jobQueueManagerAPI;
this.scanner = scanner;
this.jobQueueManagerHelper = jobQueueManagerHelper;
}

public ContentImportHelper() {
}

@PostConstruct
public void onInit() {

if(!jobQueueManagerAPI.isStarted()){
jobQueueManagerAPI.start();
Logger.info(this.getClass(), "JobQueueManagerAPI started");
}
final List<Class<? extends JobProcessor>> processors = scanner.discoverJobProcessors();
processors.forEach(processor -> {
try {
if(!testInstantiation(processor)){
return;
}
//registering the processor with the jobQueueManagerAPI
// lower case it to avoid case
if(processor.isAnnotationPresent(Queue.class)){
final Queue queue = processor.getAnnotation(Queue.class);
jobQueueManagerAPI.registerProcessor(queue.value(), processor);
} else {
jobQueueManagerAPI.registerProcessor(processor.getName(), processor);
}
}catch (Exception e){
Logger.error(this.getClass(), "Unable to register JobProcessor ", e);
}
});
}

/**
* Test if a processor can be instantiated
* @param processor The processor to tested
* @return true if the processor can be instantiated, false otherwise
*/
private boolean testInstantiation(Class<? extends JobProcessor> processor) {
try {
final Constructor<? extends JobProcessor> declaredConstructor = processor.getDeclaredConstructor();
declaredConstructor.newInstance();
return true;
} catch (Exception e) {
Logger.error(this.getClass(), String.format(" JobProcessor [%s] can not be instantiated and will be ignored.",processor.getName()), e);
}
return false;
jobQueueManagerHelper.registerProcessors(jobQueueManagerAPI);
}

@PreDestroy
public void onDestroy() {
if(jobQueueManagerAPI.isStarted()){
try {
jobQueueManagerAPI.close();
Logger.info(this.getClass(), "JobQueueManagerAPI successfully closed");
} catch (Exception e) {
Logger.error(this.getClass(), e.getMessage(), e);
}
}
jobQueueManagerHelper.shutdown(jobQueueManagerAPI);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.rest.api.v1.JobQueueManagerHelper;
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotcms.rest.api.v1.temp.TempFileAPI;
import com.dotmarketing.business.APILocator;
Expand Down Expand Up @@ -45,73 +46,17 @@
@ApplicationScoped
public class JobQueueHelper {

JobQueueManagerAPI jobQueueManagerAPI;

JobProcessorScanner scanner;
private JobQueueManagerAPI jobQueueManagerAPI;
private JobQueueManagerHelper jobQueueManagerHelper;

public JobQueueHelper() {
//default constructor Mandatory for CDI
}

@PostConstruct
public void onInit() {

if(!jobQueueManagerAPI.isStarted()){
jobQueueManagerAPI.start();
Logger.info(this.getClass(), "JobQueueManagerAPI started");
}
final List<Class<? extends JobProcessor>> processors = scanner.discoverJobProcessors();
processors.forEach(processor -> {
try {
if(!testInstantiation(processor)){
return;
}
//registering the processor with the jobQueueManagerAPI
// lower case it to avoid case
if(processor.isAnnotationPresent(Queue.class)){
final Queue queue = processor.getAnnotation(Queue.class);
jobQueueManagerAPI.registerProcessor(queue.value(), processor);
} else {
jobQueueManagerAPI.registerProcessor(processor.getName(), processor);
}
}catch (Exception e){
Logger.error(this.getClass(), "Unable to register JobProcessor ", e);
}
});
}

/**
* Test if a processor can be instantiated
* @param processor The processor to tested
* @return true if the processor can be instantiated, false otherwise
*/
private boolean testInstantiation(Class<? extends JobProcessor> processor) {
try {
final Constructor<? extends JobProcessor> declaredConstructor = processor.getDeclaredConstructor();
declaredConstructor.newInstance();
return true;
} catch (Exception e) {
Logger.error(this.getClass(), String.format(" JobProcessor [%s] can not be instantiated and will be ignored.",processor.getName()), e);
}
return false;
}

@PreDestroy
public void onDestroy() {
if(jobQueueManagerAPI.isStarted()){
try {
jobQueueManagerAPI.close();
Logger.info(this.getClass(), "JobQueueManagerAPI successfully closed");
} catch (Exception e) {
Logger.error(this.getClass(), e.getMessage(), e);
}
}
}

@Inject
public JobQueueHelper(JobQueueManagerAPI jobQueueManagerAPI, JobProcessorScanner scanner) {
public JobQueueHelper(JobQueueManagerAPI jobQueueManagerAPI, JobQueueManagerHelper jobQueueManagerHelper) {
this.jobQueueManagerAPI = jobQueueManagerAPI;
this.scanner = scanner;
this.jobQueueManagerHelper = jobQueueManagerHelper;
}

/**
Expand Down

0 comments on commit c6e620f

Please sign in to comment.