Skip to content

Commit

Permalink
Fixed Job processing
Browse files Browse the repository at this point in the history
Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Mar 27, 2019
1 parent 52bfc37 commit 0f676a4
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
*******************************************************************************/
package org.eclipse.kapua.commons.model;

import org.eclipse.kapua.commons.util.PropertiesUtils;
import org.eclipse.kapua.entity.EntityPropertiesReadException;
import org.eclipse.kapua.entity.EntityPropertiesWriteException;
import org.eclipse.kapua.model.KapuaEntity;
import org.eclipse.kapua.model.KapuaUpdatableEntityCreator;
import org.eclipse.kapua.model.id.KapuaId;

import java.io.IOException;
import java.util.Properties;

/**
Expand All @@ -25,12 +29,12 @@
*/
public abstract class AbstractKapuaUpdatableEntityCreator<E extends KapuaEntity> extends AbstractKapuaEntityCreator<E> implements KapuaUpdatableEntityCreator<E> {

protected Properties entityAttributes;
protected String attributes;

/**
* Constructor
* Constructor.
*
* @param scopeId the scope {@link KapuaId}
* @param scopeId The scope {@link KapuaId} to set to in the {@link KapuaUpdatableEntityCreator}
* @since 1.0.0
*/
public AbstractKapuaUpdatableEntityCreator(KapuaId scopeId) {
Expand All @@ -39,11 +43,19 @@ public AbstractKapuaUpdatableEntityCreator(KapuaId scopeId) {

@Override
public Properties getEntityAttributes() {
return entityAttributes;
try {
return PropertiesUtils.readPropertiesFromString(attributes);
} catch (IOException e) {
throw new EntityPropertiesReadException(e, "attributes", attributes);
}
}

@Override
public void setEntityAttributes(Properties entityAttributes) {
this.entityAttributes = entityAttributes;
public void setEntityAttributes(Properties attributes) {
try {
this.attributes = PropertiesUtils.writePropertiesToString(attributes);
} catch (IOException e) {
throw new EntityPropertiesWriteException(e, "attributes", attributes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.kapua.commons.service.event.store.api.EventStoreXmlRegistry;
import org.eclipse.kapua.commons.util.xml.JAXBContextProvider;
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.job.engine.commons.model.JobTargetSublist;
import org.eclipse.kapua.model.config.metatype.KapuaTad;
import org.eclipse.kapua.model.config.metatype.KapuaTicon;
import org.eclipse.kapua.model.config.metatype.KapuaTmetadata;
Expand Down Expand Up @@ -46,7 +47,6 @@
import org.eclipse.kapua.service.device.management.packages.model.install.DevicePackageInstallRequest;
import org.eclipse.kapua.service.device.management.packages.model.uninstall.DevicePackageUninstallRequest;
import org.eclipse.kapua.service.device.management.snapshot.DeviceSnapshots;
import org.eclipse.kapua.job.engine.commons.model.JobTargetSublist;
import org.eclipse.persistence.jaxb.JAXBContextFactory;

import javax.xml.bind.JAXBContext;
Expand All @@ -59,7 +59,7 @@ public class ConsoleJAXBContextProvider implements JAXBContextProvider {
public JAXBContext getJAXBContext() throws KapuaException {
try {
if (context == null) {
context = JAXBContextFactory.createContext(new Class<?>[] {
context = JAXBContextFactory.createContext(new Class<?>[]{
KuraDeviceComponentConfiguration.class,
KuraDeviceConfiguration.class,
KuraDeploymentPackage.class,
Expand Down Expand Up @@ -106,7 +106,7 @@ public JAXBContext getJAXBContext() throws KapuaException {
EventStoreRecordCreator.class,
EventStoreRecordListResult.class,
EventStoreRecordQuery.class,
EventStoreXmlRegistry.class
EventStoreXmlRegistry.class,

}, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.KapuaService;

import javax.validation.constraints.NotNull;

public interface JobEngineService extends KapuaService {

/**
Expand All @@ -25,7 +27,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when starting the job
* @since 1.0.0
*/
void startJob(KapuaId scopeId, KapuaId jobId) throws KapuaException;
void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;

/**
* Starts the {@link org.eclipse.kapua.service.job.Job} with the given {@link JobStartOptions}.
Expand All @@ -36,7 +38,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when starting the job
* @since 1.0.0
*/
void startJob(KapuaId scopeId, KapuaId jobId, JobStartOptions jobStartOptions) throws KapuaException;
void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull JobStartOptions jobStartOptions) throws KapuaException;

/**
* Checks whether or not the {@link org.eclipse.kapua.service.job.Job} is running.
Expand All @@ -47,7 +49,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.0.0
*/
boolean isRunning(KapuaId scopeId, KapuaId jobId) throws KapuaException;
boolean isRunning(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;

/**
* Stops all the running {@link org.eclipse.kapua.service.job.execution.JobExecution} of the {@link org.eclipse.kapua.service.job.Job}.
Expand All @@ -63,7 +65,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.0.0
*/
void stopJob(KapuaId scopeId, KapuaId jobId) throws KapuaException;
void stopJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;

/**
* Stops the {@link org.eclipse.kapua.service.job.execution.JobExecution}.
Expand All @@ -80,7 +82,7 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.1.0
*/
void stopJobExecution(KapuaId scopeId, KapuaId jobId, KapuaId jobExecutionId) throws KapuaException;
void stopJobExecution(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull KapuaId jobExecutionId) throws KapuaException;

void resumeJobExecution(KapuaId scopeId, KapuaId jobId, KapuaId jobExecutionId) throws KapuaException;

Expand All @@ -92,5 +94,5 @@ public interface JobEngineService extends KapuaService {
* @throws KapuaException if something goes bad when checking the status of the job
* @since 1.0.0
*/
void cleanJobData(KapuaId scopeId, KapuaId jobId) throws KapuaException;
void cleanJobData(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws KapuaException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
package org.eclipse.kapua.job.engine.remote;

import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.model.id.IdGenerator;
import org.eclipse.kapua.commons.util.ArgumentValidator;
import org.eclipse.kapua.job.engine.JobEngineService;
import org.eclipse.kapua.job.engine.JobStartOptions;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.locator.KapuaProvider;
import org.eclipse.kapua.model.domain.Actions;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.model.id.KapuaIdFactory;
import org.eclipse.kapua.service.authorization.AuthorizationService;
import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
import org.eclipse.kapua.service.job.JobDomains;
Expand All @@ -30,6 +32,8 @@ public class JobEngineServiceRemote implements JobEngineService {

private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();

private static final KapuaIdFactory KAPUA_ID_FACTORY = LOCATOR.getFactory(KapuaIdFactory.class);

private static final AuthorizationService AUTHORIZATION_SERVICE = LOCATOR.getService(AuthorizationService.class);
private static final PermissionFactory PERMISSION_FACTORY = LOCATOR.getFactory(PermissionFactory.class);

Expand Down Expand Up @@ -57,13 +61,10 @@ public void startJob(KapuaId scopeId, KapuaId jobId, JobStartOptions jobStartOpt
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("scopeId", scopeId);
jobDataMap.put("jobId", jobId);

if (jobStartOptions != null) {
jobDataMap.put("jobStartOptions", jobStartOptions);
}
jobDataMap.put("jobStartOptions", jobStartOptions);

// Create the trigger
QuartzTriggerUtils.createQuartzTrigger(scopeId, jobId, jobDataMap);
QuartzTriggerUtils.createQuartzTrigger(scopeId, jobId, KAPUA_ID_FACTORY.newKapuaId(IdGenerator.generate()), jobDataMap);
} catch (Exception e) {
KapuaException.internalError(e, "Error!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public static void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @N

//
// Retrieve job XML definition file. Delete it if exist
File jobXmlDefinitionFile = new File(jobTempDirectory, jobId.toCompactId().concat(".xml"));
File jobXmlDefinitionFile = new File(jobTempDirectory, jobId.toCompactId().concat("-").concat(String.valueOf(System.currentTimeMillis())).concat(".xml"));
if (jobXmlDefinitionFile.exists() && !jobXmlDefinitionFile.delete()) {
throw new CannotCleanJobDefFileDriverException(jobName, jobXmlDefinitionFile.getAbsolutePath());
}
Expand Down Expand Up @@ -252,10 +252,6 @@ public static void stopJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, Kap
try {
runningExecutions.forEach((runningExecution -> {
JOB_OPERATOR.stop(runningExecution.getExecutionId());

// if (JOB_ENGINE_SETTING.getBoolean(JobEngineSettingKeys.JOB_ENGINE_STOP_WAIT_CHECK)) {
// JbatchUtil.waitForStop(runningExecution, () -> JOB_OPERATOR.abandon(runningExecution.getExecutionId()));
// }
}));
} catch (NoSuchJobExecutionException e) {
throw new ExecutionNotFoundDriverException(e, jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.model.query.predicate.AttributePredicate;
import org.eclipse.kapua.service.job.Job;
import org.eclipse.kapua.service.job.execution.JobExecution;
import org.eclipse.kapua.service.job.execution.JobExecutionAttributes;
Expand All @@ -56,6 +57,7 @@
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;

Expand Down Expand Up @@ -138,6 +140,7 @@ public void beforeJob() throws Exception {
JobExecution runningJobExecution = getAnotherJobExecutionRunning(
jobExecution.getScopeId(),
jobExecution.getJobId(),
jobExecution.getId(),
jobContextWrapper.getJobName(),
jobExecution.getTargetIds());

Expand Down Expand Up @@ -216,11 +219,15 @@ public void afterJob() throws Exception {
* @throws KapuaException If any error happens during the processing
*/
private JobExecution createJobExecution(KapuaId scopeId, KapuaId jobId, JobTargetSublist jobTargetSublist, Long jBatchExecutionId) throws KapuaException {

Properties jobExecutionProperties = new Properties();
jobExecutionProperties.put(JBATCH_EXECUTION_ID, Long.toString(jBatchExecutionId));

JobExecutionCreator jobExecutionCreator = JOB_EXECUTION_FACTORY.newCreator(scopeId);

jobExecutionCreator.setJobId(jobId);
jobExecutionCreator.setStartedOn(new Date());
jobExecutionCreator.getEntityAttributes().put(JBATCH_EXECUTION_ID, Long.toString(jBatchExecutionId));
jobExecutionCreator.setEntityAttributes(jobExecutionProperties);

if (jobTargetSublist.isEmpty()) {
JobTargetQuery jobTargetQuery = JOB_TARGET_FACTORY.newQuery(scopeId);
Expand Down Expand Up @@ -257,24 +264,26 @@ private JobExecution createJobExecution(KapuaId scopeId, KapuaId jobId, JobTarge
* <p>
* In other all other cases returns {@code null}.
*
* @param scopeId The current {@link JobExecution#getScopeId()}
* @param jobId The current {@link JobExecution#getJobId()}
* @param jobName The current {@link JobContextWrapper#getJobName()}
* @param jobTargetIdSubset The current {@link JobExecution#getTargetIds()} }
* @return The current running {@link JobExecution} or {@code null} if there is no running {@link JobExecution}
* @param scopeId The current {@link JobExecution#getScopeId()}.
* @param jobId The current {@link JobExecution#getJobId()}.
* @param currentJobExecutionId The current {@link JobExecution#getId()}.
* @param jobName The current {@link JobContextWrapper#getJobName()}.
* @param jobTargetIdSubset The current {@link JobExecution#getTargetIds()} }.
* @return The other running {@link JobExecution} or {@code null} if there is no other running {@link JobExecution}.
* @throws KapuaException If any error happens during the processing.
*/
private JobExecution getAnotherJobExecutionRunning(KapuaId scopeId, KapuaId jobId, String jobName, Set<KapuaId> jobTargetIdSubset) throws KapuaException {
private JobExecution getAnotherJobExecutionRunning(KapuaId scopeId, KapuaId jobId, KapuaId currentJobExecutionId, String jobName, Set<KapuaId> jobTargetIdSubset) throws KapuaException {
List<Long> runningExecutionsIds = BatchRuntime.getJobOperator().getRunningExecutions(jobName);
if (runningExecutionsIds.size() > 1) {

JobExecutionQuery jobExecutionQuery = JOB_EXECUTION_FACTORY.newQuery(scopeId);

jobExecutionQuery.setPredicate(
new AndPredicateImpl(
new AttributePredicateImpl<>(JobExecutionAttributes.JOB_ID, jobId),
new AttributePredicateImpl<>(JobExecutionAttributes.ENDED_ON, null),
new AttributePredicateImpl<>(JobExecutionAttributes.TARGET_IDS, jobTargetIdSubset.toArray())
jobExecutionQuery.andPredicate(
jobExecutionQuery.attributePredicate(JobExecutionAttributes.JOB_ID, jobId),
jobExecutionQuery.attributePredicate(JobExecutionAttributes.ENTITY_ID, currentJobExecutionId, AttributePredicate.Operator.NOT_EQUAL),
jobExecutionQuery.attributePredicate(JobExecutionAttributes.ENDED_ON, null),
jobExecutionQuery.attributePredicate(JobExecutionAttributes.TARGET_IDS, jobTargetIdSubset.toArray())
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ jobEngine.stop.wait.check.time.max=60000
jobEngine.stop.wait.check.time.interval=5000


jobEngine.queue.check.delay=4000
jobEngine.queue.processing.run.delay=500
jobEngine.queue.check.delay=1000
jobEngine.queue.processing.run.delay=700
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*******************************************************************************/
package org.eclipse.kapua.service.device.management.job.internal;

import org.eclipse.kapua.KapuaEntityCloneException;
import org.eclipse.kapua.locator.KapuaProvider;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.device.management.job.JobDeviceManagementOperation;
Expand Down Expand Up @@ -46,4 +47,13 @@ public JobDeviceManagementOperationQuery newQuery(KapuaId scopeId) {
public JobDeviceManagementOperationListResult newListResult() {
return new JobDeviceManagementOperationListResultImpl();
}

@Override
public JobDeviceManagementOperation clone(JobDeviceManagementOperation jobDeviceManagementOperation) {
try {
return new JobDeviceManagementOperationImpl(jobDeviceManagementOperation);
} catch (Exception e) {
throw new KapuaEntityCloneException(e, JobDeviceManagementOperation.TYPE, jobDeviceManagementOperation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,37 @@ public class JobDeviceManagementOperationImpl extends AbstractKapuaUpdatableEnti
})
private KapuaEid deviceManagementOperationId;

/**
* Constructor.
*
* @since 1.1.0
*/
public JobDeviceManagementOperationImpl() {
}

/**
* Constructor.
*
* @param scopeId The scope {@link KapuaId} to set into the {@link JobDeviceManagementOperation}.
* @since 1.1.0
*/
public JobDeviceManagementOperationImpl(KapuaId scopeId) {
super(scopeId);
}

/**
* Clone constructor.
*
* @param jobDeviceManagementOperation The {@link JobDeviceManagementOperation} to clone.
* @since 1.1.0
*/
public JobDeviceManagementOperationImpl(JobDeviceManagementOperation jobDeviceManagementOperation) {
super(jobDeviceManagementOperation);

setJobId(jobDeviceManagementOperation.getJobId());
setDeviceManagementOperationId(jobDeviceManagementOperation.getDeviceManagementOperationId());
}

@Override
public KapuaId getJobId() {
return jobId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
*******************************************************************************/
package org.eclipse.kapua.service.device.management.job.internal;

import org.eclipse.kapua.commons.model.query.predicate.AbstractKapuaQuery;
import org.eclipse.kapua.commons.model.query.AbstractKapuaQuery;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.device.management.job.JobDeviceManagementOperation;
import org.eclipse.kapua.service.device.management.job.JobDeviceManagementOperationQuery;
Expand All @@ -24,9 +24,10 @@
public class JobDeviceManagementOperationQueryImpl extends AbstractKapuaQuery<JobDeviceManagementOperation> implements JobDeviceManagementOperationQuery {

/**
* Constructor
* Constructor.
*
* @param scopeId
* @since 1.1.0
*/
public JobDeviceManagementOperationQueryImpl(KapuaId scopeId) {
super(scopeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public KapuaJobLauncher() {
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
org.eclipse.kapua.service.job.Job job = KapuaSecurityUtils.doPrivileged(() -> jobService.find(getScopeId(), getJobId()));

if (job == null) {
throw new KapuaEntityNotFoundException(org.eclipse.kapua.service.job.Job.class.getName(), jobId);
}
Expand Down
Loading

0 comments on commit 0f676a4

Please sign in to comment.