Skip to content

Commit

Permalink
#30669 refactor JobQueueManagerHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
valentinogiardino committed Nov 21, 2024
1 parent 7a076c1 commit 6f4deeb
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,49 @@
* Helper class for managing job queue processors in the JobQueueManagerAPI.
* This class is responsible for discovering job processors, registering them with
* the JobQueueManagerAPI, and shutting down the JobQueueManagerAPI when needed.
* <p>
* It utilizes the {@link JobProcessorScanner} to discover available job processors
* and the {@link JobQueueManagerAPI} to register them for processing jobs in the queue.
* <p>
* The class is annotated with {@link ApplicationScoped} to indicate that it is
* a singleton managed by the CDI container.
*/
@ApplicationScoped
public class JobQueueManagerHelper {

private JobQueueManagerAPI jobQueueManagerAPI;
private JobProcessorScanner scanner;

/**
* Constructor that injects the {@link JobProcessorScanner} instance.
* Constructor that injects the {@link JobProcessorScanner} and {@link JobQueueManagerAPI}.
*
* @param scanner The JobProcessorScanner to discover job processors
* @param jobQueueManagerAPI The JobQueueManagerAPI instance to register processors with
*/
@Inject
public JobQueueManagerHelper(final JobProcessorScanner scanner) {
public JobQueueManagerHelper(final JobProcessorScanner scanner, final JobQueueManagerAPI jobQueueManagerAPI) {
this.scanner = scanner;
this.jobQueueManagerAPI = jobQueueManagerAPI;
}

/**
* Default constructor required by CDI.
*/
public JobQueueManagerHelper() {
// Default constructor required by CDI
}

/**
* Registers all discovered job processors with the provided JobQueueManagerAPI.
* Registers all discovered job processors with the JobQueueManagerAPI.
* If the JobQueueManagerAPI is not started, it starts the API before registering the processors.
*
* @param jobQueueManagerAPI The JobQueueManagerAPI instance to register processors with
*/
public void registerProcessors(final JobQueueManagerAPI jobQueueManagerAPI) {
public void registerProcessors() {
if (!jobQueueManagerAPI.isStarted()) {
jobQueueManagerAPI.start();
Logger.info(this.getClass(), "JobQueueManagerAPI started");
}

// Discover job processors and attempt to register them
List<Class<? extends JobProcessor>> processors = scanner.discoverJobProcessors();
processors.forEach(processor -> {
try {
if (!testInstantiation(processor)) {
return;
}
registerProcessor(jobQueueManagerAPI, processor);
registerProcessor(processor);
} catch (Exception e) {
Logger.error(this.getClass(), "Unable to register JobProcessor ", e);
}
Expand Down Expand Up @@ -94,10 +89,9 @@ private boolean testInstantiation(final Class<? extends JobProcessor> processor)
* in the {@link Queue} annotation, if present. If no annotation is found, the processor's
* class name is used as the queue name.
*
* @param jobQueueManagerAPI the JobQueueManagerAPI instance to register the processor with
* @param processor the processor class to register
*/
private void registerProcessor(final JobQueueManagerAPI jobQueueManagerAPI, final Class<? extends JobProcessor> processor) {
private void registerProcessor(final Class<? extends JobProcessor> processor) {
Queue queue = AnnotationUtils.getBeanAnnotation(processor, Queue.class);
if (Objects.nonNull(queue)) {
jobQueueManagerAPI.registerProcessor(queue.value(), processor);
Expand All @@ -107,13 +101,11 @@ private void registerProcessor(final JobQueueManagerAPI jobQueueManagerAPI, fina
}

/**
* Shuts down the provided JobQueueManagerAPI if it is currently started.
* Shuts down the JobQueueManagerAPI if it is currently started.
* If the JobQueueManagerAPI is started, it attempts to close it gracefully.
* In case of an error during the shutdown process, the error is logged.
*
* @param jobQueueManagerAPI the JobQueueManagerAPI instance to shut down
*/
public void shutdown(final JobQueueManagerAPI jobQueueManagerAPI) {
public void shutdown() {
if (jobQueueManagerAPI.isStarted()) {
try {
jobQueueManagerAPI.close();
Expand All @@ -123,4 +115,4 @@ public void shutdown(final JobQueueManagerAPI jobQueueManagerAPI) {
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.dotcms.rest.api.v1.contentImport;

import com.dotcms.jobs.business.api.JobProcessorScanner;
import com.dotcms.jobs.business.api.JobQueueManagerAPI;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.rest.api.v1.JobQueueManagerHelper;
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotmarketing.business.APILocator;
Expand All @@ -19,9 +16,7 @@
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ApplicationScoped
Expand All @@ -42,12 +37,12 @@ public ContentImportHelper() {

@PostConstruct
public void onInit() {
jobQueueManagerHelper.registerProcessors(jobQueueManagerAPI);
jobQueueManagerHelper.registerProcessors();
}

@PreDestroy
public void onDestroy() {
jobQueueManagerHelper.shutdown(jobQueueManagerAPI);
jobQueueManagerHelper.shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import static com.dotcms.jobs.business.util.JobUtil.roundedProgress;

import com.dotcms.jobs.business.api.JobProcessorScanner;
import com.dotcms.jobs.business.api.JobQueueManagerAPI;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.rest.api.v1.JobQueueManagerHelper;
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotcms.rest.api.v1.temp.TempFileAPI;
Expand All @@ -23,10 +21,8 @@
import com.liferay.portal.model.User;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
Expand Down Expand Up @@ -71,12 +67,12 @@ void registerProcessor(final String queueName, final Class<? extends JobProcesso

@PostConstruct
public void onInit() {
jobQueueManagerHelper.registerProcessors(jobQueueManagerAPI);
jobQueueManagerHelper.registerProcessors();
}

@PreDestroy
public void onDestroy() {
jobQueueManagerHelper.shutdown(jobQueueManagerAPI);
jobQueueManagerHelper.shutdown();
}

/**
Expand Down

0 comments on commit 6f4deeb

Please sign in to comment.