Skip to content

Commit

Permalink
Fixed Job processing
Browse files Browse the repository at this point in the history
Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Apr 2, 2019
1 parent e651a78 commit 9bb3577
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
*******************************************************************************/
package org.eclipse.kapua.commons.model;

import org.eclipse.kapua.commons.util.PropertiesUtils;
import org.eclipse.kapua.entity.EntityPropertiesReadException;
import org.eclipse.kapua.entity.EntityPropertiesWriteException;
import org.eclipse.kapua.model.KapuaEntity;
import org.eclipse.kapua.model.KapuaUpdatableEntityCreator;
import org.eclipse.kapua.model.id.KapuaId;

import java.io.IOException;
import java.util.Properties;

/**
Expand All @@ -25,12 +29,12 @@
*/
public abstract class AbstractKapuaUpdatableEntityCreator<E extends KapuaEntity> extends AbstractKapuaEntityCreator<E> implements KapuaUpdatableEntityCreator<E> {

protected Properties entityAttributes;
protected String attributes;

/**
* Constructor
* Constructor.
*
* @param scopeId the scope {@link KapuaId}
* @param scopeId The scope {@link KapuaId} to set to in the {@link KapuaUpdatableEntityCreator}
* @since 1.0.0
*/
public AbstractKapuaUpdatableEntityCreator(KapuaId scopeId) {
Expand All @@ -39,11 +43,19 @@ public AbstractKapuaUpdatableEntityCreator(KapuaId scopeId) {

@Override
public Properties getEntityAttributes() {
return entityAttributes;
try {
return PropertiesUtils.readPropertiesFromString(attributes);
} catch (IOException e) {
throw new EntityPropertiesReadException(e, "attributes", attributes);
}
}

@Override
public void setEntityAttributes(Properties entityAttributes) {
this.entityAttributes = entityAttributes;
public void setEntityAttributes(Properties attributes) {
try {
this.attributes = PropertiesUtils.writePropertiesToString(attributes);
} catch (IOException e) {
throw new EntityPropertiesWriteException(e, "attributes", attributes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.kapua.commons.service.event.store.api.EventStoreXmlRegistry;
import org.eclipse.kapua.commons.util.xml.JAXBContextProvider;
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.job.engine.commons.model.JobTargetSublist;
import org.eclipse.kapua.model.config.metatype.KapuaTad;
import org.eclipse.kapua.model.config.metatype.KapuaTicon;
import org.eclipse.kapua.model.config.metatype.KapuaTmetadata;
Expand Down Expand Up @@ -46,7 +47,6 @@
import org.eclipse.kapua.service.device.management.packages.model.install.DevicePackageInstallRequest;
import org.eclipse.kapua.service.device.management.packages.model.uninstall.DevicePackageUninstallRequest;
import org.eclipse.kapua.service.device.management.snapshot.DeviceSnapshots;
import org.eclipse.kapua.job.engine.commons.model.JobTargetSublist;
import org.eclipse.persistence.jaxb.JAXBContextFactory;

import javax.xml.bind.JAXBContext;
Expand All @@ -59,7 +59,7 @@ public class ConsoleJAXBContextProvider implements JAXBContextProvider {
public JAXBContext getJAXBContext() throws KapuaException {
try {
if (context == null) {
context = JAXBContextFactory.createContext(new Class<?>[] {
context = JAXBContextFactory.createContext(new Class<?>[]{
KuraDeviceComponentConfiguration.class,
KuraDeviceConfiguration.class,
KuraDeploymentPackage.class,
Expand Down Expand Up @@ -106,7 +106,7 @@ public JAXBContext getJAXBContext() throws KapuaException {
EventStoreRecordCreator.class,
EventStoreRecordListResult.class,
EventStoreRecordQuery.class,
EventStoreXmlRegistry.class
EventStoreXmlRegistry.class,

}, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.KapuaService;

import javax.validation.constraints.NotNull;

/**
* {@link JobEngineService} definition.
*
Expand All @@ -30,7 +32,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when starting the job
* @since 1.0.0
*/
void startJob(KapuaId scopeId, KapuaId jobId) throws KapuaException;
void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;

/**
* Starts the {@link org.eclipse.kapua.service.job.Job} with the given {@link JobStartOptions}.
Expand All @@ -41,7 +43,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when starting the job
* @since 1.0.0
*/
void startJob(KapuaId scopeId, KapuaId jobId, JobStartOptions jobStartOptions) throws KapuaException;
void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull JobStartOptions jobStartOptions) throws KapuaException;

/**
* Checks whether or not the {@link org.eclipse.kapua.service.job.Job} is running.
Expand All @@ -52,7 +54,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.0.0
*/
boolean isRunning(KapuaId scopeId, KapuaId jobId) throws KapuaException;
boolean isRunning(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;

/**
* Stops all the running {@link org.eclipse.kapua.service.job.execution.JobExecution} of the {@link org.eclipse.kapua.service.job.Job}.
Expand All @@ -68,7 +70,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.0.0
*/
void stopJob(KapuaId scopeId, KapuaId jobId) throws KapuaException;
void stopJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;

/**
* Stops the {@link org.eclipse.kapua.service.job.execution.JobExecution}.
Expand All @@ -85,7 +87,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.1.0
*/
void stopJobExecution(KapuaId scopeId, KapuaId jobId, KapuaId jobExecutionId) throws KapuaException;
void stopJobExecution(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull KapuaId jobExecutionId) throws KapuaException;

/**
* Resumes the {@link org.eclipse.kapua.service.job.execution.JobExecution}.
Expand All @@ -96,7 +98,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException If something goes bad when resuming the {@link org.eclipse.kapua.service.job.execution.JobExecution}
* @since 1.1.0
*/
void resumeJobExecution(KapuaId scopeId, KapuaId jobId, KapuaId jobExecutionId) throws KapuaException;
void resumeJobExecution(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull KapuaId jobExecutionId) throws KapuaException;

/**
* Cleans all the Job related data from the data structures supporting the {@link JobEngineService}
Expand All @@ -106,5 +108,5 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.0.0
*/
void cleanJobData(KapuaId scopeId, KapuaId jobId) throws KapuaException;
void cleanJobData(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,32 @@
package org.eclipse.kapua.job.engine.remote;

import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.model.id.IdGenerator;
import org.eclipse.kapua.commons.util.ArgumentValidator;
import org.eclipse.kapua.job.engine.JobEngineService;
import org.eclipse.kapua.job.engine.JobStartOptions;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.locator.KapuaProvider;
import org.eclipse.kapua.model.domain.Actions;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.model.id.KapuaIdFactory;
import org.eclipse.kapua.service.authorization.AuthorizationService;
import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
import org.eclipse.kapua.service.job.JobDomains;
import org.eclipse.kapua.service.scheduler.quartz.utils.QuartzTriggerUtils;
import org.quartz.JobDataMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@KapuaProvider
public class JobEngineServiceRemote implements JobEngineService {

private static final Logger LOG = LoggerFactory.getLogger(JobEngineServiceRemote.class);

private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();

private static final KapuaIdFactory KAPUA_ID_FACTORY = LOCATOR.getFactory(KapuaIdFactory.class);

private static final AuthorizationService AUTHORIZATION_SERVICE = LOCATOR.getService(AuthorizationService.class);
private static final PermissionFactory PERMISSION_FACTORY = LOCATOR.getFactory(PermissionFactory.class);

Expand All @@ -53,17 +61,16 @@ public void startJob(KapuaId scopeId, KapuaId jobId, JobStartOptions jobStartOpt
//
// Seed the trigger
try {
LOG.info("Scheduling remote job start! ScopeId: {} - JobId: {} - JobOptions: {}", scopeId, jobId, jobStartOptions.getTargetIdSublist().size());

// Build job data map
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("scopeId", scopeId);
jobDataMap.put("jobId", jobId);

if (jobStartOptions != null) {
jobDataMap.put("jobStartOptions", jobStartOptions);
}
jobDataMap.put("jobStartOptions", jobStartOptions);

// Create the trigger
QuartzTriggerUtils.createQuartzTrigger(scopeId, jobId, jobDataMap);
QuartzTriggerUtils.createQuartzTrigger(scopeId, jobId, KAPUA_ID_FACTORY.newKapuaId(IdGenerator.generate()), jobDataMap);
} catch (Exception e) {
KapuaException.internalError(e, "Error!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public static void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @N

//
// Retrieve job XML definition file. Delete it if exist
File jobXmlDefinitionFile = new File(jobTempDirectory, jobId.toCompactId().concat(".xml"));
File jobXmlDefinitionFile = new File(jobTempDirectory, jobId.toCompactId().concat("-").concat(String.valueOf(System.currentTimeMillis())).concat(".xml"));
if (jobXmlDefinitionFile.exists() && !jobXmlDefinitionFile.delete()) {
throw new CannotCleanJobDefFileDriverException(jobName, jobXmlDefinitionFile.getAbsolutePath());
}
Expand Down Expand Up @@ -254,10 +254,6 @@ public static void stopJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, Kap
try {
runningExecutions.forEach((runningExecution -> {
JOB_OPERATOR.stop(runningExecution.getExecutionId());

// if (JOB_ENGINE_SETTING.getBoolean(JobEngineSettingKeys.JOB_ENGINE_STOP_WAIT_CHECK)) {
// JbatchUtil.waitForStop(runningExecution, () -> JOB_OPERATOR.abandon(runningExecution.getExecutionId()));
// }
}));
} catch (NoSuchJobExecutionException e) {
throw new ExecutionNotFoundDriverException(e, jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.model.query.predicate.AttributePredicate;
import org.eclipse.kapua.service.job.Job;
import org.eclipse.kapua.service.job.execution.JobExecution;
import org.eclipse.kapua.service.job.execution.JobExecutionAttributes;
Expand All @@ -54,6 +55,7 @@
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;

Expand Down Expand Up @@ -125,10 +127,10 @@ public void beforeJob() throws Exception {
jobLogger.info("Creating job execution...");
try {
jobExecution = createJobExecution(
jobContextWrapper.getScopeId(),
jobContextWrapper.getJobId(),
jobContextWrapper.getTargetSublist(),
jobContextWrapper.getExecutionId());
jobContextWrapper.getScopeId(),
jobContextWrapper.getJobId(),
jobContextWrapper.getTargetSublist(),
jobContextWrapper.getExecutionId());
} catch (Exception e) {
jobLogger.error(e, "Creating job execution... ERROR!");
throw e;
Expand All @@ -143,6 +145,7 @@ public void beforeJob() throws Exception {
JobExecution runningJobExecution = getAnotherJobExecutionRunning(
jobExecution.getScopeId(),
jobExecution.getJobId(),
jobExecution.getId(),
jobContextWrapper.getJobName(),
jobExecution.getTargetIds());

Expand Down Expand Up @@ -221,11 +224,15 @@ public void afterJob() throws Exception {
* @since 1.1.0
*/
private JobExecution createJobExecution(KapuaId scopeId, KapuaId jobId, JobTargetSublist jobTargetSublist, Long jBatchExecutionId) throws KapuaException {

Properties jobExecutionProperties = new Properties();
jobExecutionProperties.put(JBATCH_EXECUTION_ID, Long.toString(jBatchExecutionId));

JobExecutionCreator jobExecutionCreator = JOB_EXECUTION_FACTORY.newCreator(scopeId);

jobExecutionCreator.setJobId(jobId);
jobExecutionCreator.setStartedOn(new Date());
jobExecutionCreator.getEntityAttributes().put(JBATCH_EXECUTION_ID, Long.toString(jBatchExecutionId));
jobExecutionCreator.setEntityAttributes(jobExecutionProperties);

if (jobTargetSublist.isEmpty()) {
JobTargetQuery jobTargetQuery = JOB_TARGET_FACTORY.newQuery(scopeId);
Expand Down Expand Up @@ -262,15 +269,16 @@ private JobExecution createJobExecution(KapuaId scopeId, KapuaId jobId, JobTarge
* <p>
* In other all other cases returns {@code null}.
*
* @param scopeId The current {@link JobExecution#getScopeId()}
* @param jobId The current {@link JobExecution#getJobId()}
* @param jobName The current {@link JobContextWrapper#getJobName()}
* @param jobTargetIdSubset The current {@link JobExecution#getTargetIds()} }
* @return The current running {@link JobExecution} or {@code null} if there is no running {@link JobExecution}
* @param scopeId The current {@link JobExecution#getScopeId()}.
* @param jobId The current {@link JobExecution#getJobId()}.
* @param currentJobExecutionId The current {@link JobExecution#getId()}.
* @param jobName The current {@link JobContextWrapper#getJobName()}.
* @param jobTargetIdSubset The current {@link JobExecution#getTargetIds()} }.
* @return The other running {@link JobExecution} or {@code null} if there is no other running {@link JobExecution}.
* @throws KapuaException If any error happens during the processing.
* @since 1.1.0
*/
private JobExecution getAnotherJobExecutionRunning(KapuaId scopeId, KapuaId jobId, String jobName, Set<KapuaId> jobTargetIdSubset) throws KapuaException {
private JobExecution getAnotherJobExecutionRunning(KapuaId scopeId, KapuaId jobId, KapuaId currentJobExecutionId, String jobName, Set<KapuaId> jobTargetIdSubset) throws KapuaException {
List<Long> runningExecutionsIds = BatchRuntime.getJobOperator().getRunningExecutions(jobName);
if (runningExecutionsIds.size() > 1) {

Expand All @@ -279,6 +287,7 @@ private JobExecution getAnotherJobExecutionRunning(KapuaId scopeId, KapuaId jobI
jobExecutionQuery.setPredicate(
jobExecutionQuery.andPredicate(
jobExecutionQuery.attributePredicate(JobExecutionAttributes.JOB_ID, jobId),
jobExecutionQuery.attributePredicate(JobExecutionAttributes.ENTITY_ID, currentJobExecutionId, AttributePredicate.Operator.NOT_EQUAL),
jobExecutionQuery.attributePredicate(JobExecutionAttributes.ENDED_ON, null),
jobExecutionQuery.attributePredicate(JobExecutionAttributes.TARGET_IDS, jobTargetIdSubset.toArray())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ jobEngine.stop.wait.check.time.max=60000
jobEngine.stop.wait.check.time.interval=5000


jobEngine.queue.check.delay=4000
jobEngine.queue.processing.run.delay=500
jobEngine.queue.check.delay=1000
jobEngine.queue.processing.run.delay=700
Loading

0 comments on commit 9bb3577

Please sign in to comment.