Skip to content

Commit

Permalink
Added QueueJobExecution.status field
Browse files Browse the repository at this point in the history
Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Feb 15, 2019
1 parent 9c0b22b commit b96f163
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ default String getType() {
KapuaId getWaitForJobExecutionId();

void setWaitForJobExecutionId(KapuaId waitForJobExecutionId);

QueuedJobExecutionStatus getStatus();

void setStatus(QueuedJobExecutionStatus status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ public interface QueuedJobExecutionCreator extends KapuaUpdatableEntityCreator<Q
KapuaId getWaitForJobExecutionId();

void setWaitForJobExecutionId(KapuaId waitForJobExecutionId);

QueuedJobExecutionStatus getStatus();

void setStatus(QueuedJobExecutionStatus status);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*******************************************************************************
* Copyright (c) 2019 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.job.engine.queue;

/**
* The status of the {@link QueuedJobExecution}
*
* @since 1.1.0
*/
public enum QueuedJobExecutionStatus {

/**
* The {@link QueuedJobExecution} has been enqueued and it is waiting to be resumed.
* <p>
* This is the initial status in which a newly created {@link QueuedJobExecution} should be in.
*/
QUEUED,

/**
* The {@link QueuedJobExecution} has been resumed and fired.
*/
PROCESSED
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionCreator;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionFactory;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionService;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.job.Job;
Expand Down Expand Up @@ -299,6 +300,7 @@ private QueuedJobExecution enqueueJobExecution(KapuaId scopeId, KapuaId jobId, K
queuedJobExecutionCreator.setJobId(jobId);
queuedJobExecutionCreator.setJobExecutionId(jobExecutionId);
queuedJobExecutionCreator.setWaitForJobExecutionId(runningJobExecutionId);
queuedJobExecutionCreator.setStatus(QueuedJobExecutionStatus.QUEUED);

return KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_SERVICE.create(queuedJobExecutionCreator));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionListResult;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionQuery;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionService;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.model.id.KapuaId;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,6 +78,9 @@ public void run() {

try {
KapuaSecurityUtils.doPrivileged(() -> JOB_ENGINE_SERVICE.resumeJobExecution(qje.getScopeId(), qje.getJobId(), qje.getJobExecutionId()));

qje.setStatus(QueuedJobExecutionStatus.PROCESSED);
KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_EXECUTION_SERVICE.update(qje));
} catch (Exception e) {
LOG.error("Resuming Job Execution: {}... ERROR!", qje.getJobExecutionId(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.eclipse.kapua.commons.model.AbstractKapuaUpdatableEntityCreator;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecution;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionCreator;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
import org.eclipse.kapua.locator.KapuaProvider;
import org.eclipse.kapua.model.id.KapuaId;

Expand All @@ -25,6 +26,7 @@ public class QueuedJobExecutionCreatorImpl extends AbstractKapuaUpdatableEntityC
private KapuaId jobId;
private KapuaId jobExecutionId;
private KapuaId waitForJobExecutionId;
private QueuedJobExecutionStatus status;

protected QueuedJobExecutionCreatorImpl(KapuaId scopeId) {
super(scopeId);
Expand Down Expand Up @@ -59,4 +61,14 @@ public KapuaId getWaitForJobExecutionId() {
public void setWaitForJobExecutionId(KapuaId waitForJobExecutionId) {
this.waitForJobExecutionId = waitForJobExecutionId;
}

@Override
public QueuedJobExecutionStatus getStatus() {
return status;
}

@Override
public void setStatus(QueuedJobExecutionStatus status) {
this.status = status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static QueuedJobExecution create(EntityManager em, QueuedJobExecutionCrea
queuedJobExecutionImpl.setJobId(queuedJobExecutionCreator.getJobId());
queuedJobExecutionImpl.setJobExecutionId(queuedJobExecutionCreator.getJobExecutionId());
queuedJobExecutionImpl.setWaitForJobExecutionId(queuedJobExecutionCreator.getWaitForJobExecutionId());
queuedJobExecutionImpl.setStatus(queuedJobExecutionCreator.getStatus());

return ServiceDAO.create(em, queuedJobExecutionImpl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import org.eclipse.kapua.commons.model.AbstractKapuaUpdatableEntity;
import org.eclipse.kapua.commons.model.id.KapuaEid;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecution;
import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
import org.eclipse.kapua.model.id.KapuaId;

import javax.persistence.AttributeOverride;
import javax.persistence.AttributeOverrides;
import javax.persistence.Column;
import javax.persistence.Embedded;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Table;

@Entity(name = "QueuedJobExecution")
Expand All @@ -47,6 +50,10 @@ public class QueuedJobExecutionImpl extends AbstractKapuaUpdatableEntity impleme
})
private KapuaEid waitForJobExecutionId;

@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, updatable = true)
private QueuedJobExecutionStatus status;

public QueuedJobExecutionImpl() {
}

Expand Down Expand Up @@ -84,5 +91,13 @@ public void setWaitForJobExecutionId(KapuaId waitForJobExecutionId) {
this.waitForJobExecutionId = KapuaEid.parseKapuaId(waitForJobExecutionId);
}

@Override
public QueuedJobExecutionStatus getStatus() {
return status;
}

@Override
public void setStatus(QueuedJobExecutionStatus status) {
this.status = status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
<column name="wait_for_job_execution_id" type="bigint(21) unsigned">
<constraints nullable="false"/>
</column>
<column name="status" type="varchar(64)">
<constraints nullable="false"/>
</column>

<column name="optlock" type="int unsigned"/>
<column name="attributes" type="text"/>
Expand Down

0 comments on commit b96f163

Please sign in to comment.