Skip to content

Commit

Permalink
Added JobStartOption.restartStepIndex property
Browse files Browse the repository at this point in the history
Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Apr 4, 2019
1 parent d2c0f22 commit a68f2f3
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void stopExecution(String gwtScopeId, String gwtJobId, String gwtJobExecu
@Override
public void restart(String gwtScopeId, String gwtJobId) throws GwtKapuaException {
GwtJobStartOptions gwtJobStartOptions = new GwtJobStartOptions();
gwtJobStartOptions.setResetStepIndex(true);
gwtJobStartOptions.setFromStepIndex(0);
gwtJobStartOptions.setTargetIdSublist(new ArrayList<String>());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018 Eurotech and/or its affiliates and others
* Copyright (c) 2018, 2019 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand All @@ -18,6 +18,7 @@
public class GwtJobStartOptions implements Serializable {

private List<String> targetIdSublist = new ArrayList<String>();
private boolean resetStepIndex;
private Integer fromStepIndex;

public List<String> getTargetIdSublist() {
Expand All @@ -28,6 +29,14 @@ public void setTargetIdSublist(List<String> targetIdSublist) {
this.targetIdSublist = targetIdSublist;
}

public boolean getResetStepIndex() {
return resetStepIndex;
}

public void setResetStepIndex(boolean resetStepIndex) {
this.resetStepIndex = resetStepIndex;
}

public Integer getFromStepIndex() {
return fromStepIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ public static JobExecutionQuery convertJobExecutionQuery(PagingLoadConfig paging
public static JobStartOptions convertJobStartOptions(GwtJobStartOptions gwtJobStartOptions) {
JobStartOptions jobStartOptions = JOB_ENGINE_FACTORY.newJobStartOptions();
jobStartOptions.setTargetIdSublist(convertTargetIdSublist(gwtJobStartOptions.getTargetIdSublist()));
jobStartOptions.setResetStepIndex(gwtJobStartOptions.getResetStepIndex());
jobStartOptions.setFromStepIndex(gwtJobStartOptions.getFromStepIndex());
return jobStartOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.eclipse.kapua.KapuaSerializable;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.model.id.KapuaIdAdapter;
import org.eclipse.kapua.service.job.targets.JobTarget;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand Down Expand Up @@ -72,6 +73,22 @@ public interface JobStartOptions extends KapuaSerializable {
@XmlTransient
void addTargetIdToSublist(KapuaId targetId);

/**
* Gets whether or not the {@link JobTarget#getStepIndex()} needs to be reset to the given {@link #getFromStepIndex()}.
*
* @return {@code true} if the {@link JobTarget#getStepIndex()} needs to be reset to the given {@link #getFromStepIndex()}, {@code false} otherwise.
* @since 1.1.0
*/
boolean getResetStepIndex();

/**
* Sets whether or not the {@link JobTarget#getStepIndex()} needs to be reset to the given {@link #getFromStepIndex()}.
*
* @param resetStepIndex {@code true} if the {@link JobTarget#getStepIndex()} needs to be reset to the given {@link #getFromStepIndex()}, {@code false} otherwise.
* @since 1.1.0
*/
void setResetStepIndex(boolean resetStepIndex);

/**
* Gets the starting {@link org.eclipse.kapua.service.job.step.JobStep} index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Object readItem() throws Exception {
* @param andPredicate The {@link org.eclipse.kapua.model.query.predicate.AndPredicate} where to apply {@link org.eclipse.kapua.model.query.predicate.QueryPredicate}
* @since 1.0.0
*/
protected void stepIndexFiltering(JobContextWrapper jobContextWrapper, StepContextWrapper stepContextWrapper, KapuaQuery query, AndPredicate andPredicate) {
protected void stepIndexFiltering1(JobContextWrapper jobContextWrapper, StepContextWrapper stepContextWrapper, KapuaQuery query, AndPredicate andPredicate) {
Integer fromStepIndex = jobContextWrapper.getFromStepIndex();
if (fromStepIndex == null || fromStepIndex < stepContextWrapper.getStepIndex()) {
andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, stepContextWrapper.getStepIndex()));
Expand All @@ -153,6 +153,20 @@ protected void stepIndexFiltering(JobContextWrapper jobContextWrapper, StepConte
}
}

protected void stepIndexFiltering(JobContextWrapper jobContextWrapper, StepContextWrapper stepContextWrapper, KapuaQuery query, AndPredicate andPredicate) {

// Select all targets that aren't in PROCESS_OK status
andPredicate.and(query.attributePredicate(JobTargetAttributes.STATUS, JobTargetStatus.PROCESS_OK, AttributePredicate.Operator.NOT_EQUAL));

// Select all target that are at the current step
andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, stepContextWrapper.getStepIndex()));

// Select all targets at or after the given fromStepIndex (if specified)
if (jobContextWrapper.getFromStepIndex() != null) {
andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, jobContextWrapper.getFromStepIndex(), AttributePredicate.Operator.GREATER_THAN_OR_EQUAL));
}
}

/**
* This method apply {@link AttributePredicate}s according to the parameters contained into the {@link JobContextWrapper#getTargetSublist()}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,48 @@
*
* @since 1.0.0
*/
public interface JobContextPropertyNames {
public class JobContextPropertyNames {

private JobContextPropertyNames() {
}

/**
* @since 1.0.0
*/
String JOB_SCOPE_ID = "job.scopeId";
public static final String JOB_SCOPE_ID = "job.scopeId";

/**
* @since 1.0.0
*/
String JOB_ID = "job.id";
public static final String JOB_ID = "job.id";

/**
* @since 1.0.0
*/
String JOB_TARGET_SUBLIST = "job.target.sublist";
public static final String JOB_TARGET_SUBLIST = "job.target.sublist";

/**
* @since 1.1.0
*/
String RESUMED_KAPUA_EXECUTION_ID = "job.execution.resumedId";
public static final String RESUMED_KAPUA_EXECUTION_ID = "job.execution.resumedId";

/**
* since 1.1.0
*/
public static final String RESET_STEP_INDEX = "job.step.resetIndex";

/**
* @since 1.0.0
*/
String JOB_STEP_FROM_INDEX = "job.step.fromIndex";
public static final String JOB_STEP_FROM_INDEX = "job.step.fromIndex";

/**
* @since 1.1.0
*/
String ENQUEUE = "job.enqueue";
public static final String ENQUEUE = "job.enqueue";

/**
* @since 1.1.0
*/
String KAPUA_EXECUTION_ID = "job.execution.id";
public static final String KAPUA_EXECUTION_ID = "job.execution.id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ public KapuaId getResumedJobExecutionId() {
return Strings.isNullOrEmpty(resumedKapuaExecutionIdString) ? null : KapuaEid.parseCompactId(resumedKapuaExecutionIdString);
}

/**
* Gets whether or not the {@link org.eclipse.kapua.service.job.targets.JobTarget}s needs to be reset to the given {@link #getFromStepIndex()}.
*
* @return {@code true} if the {@link org.eclipse.kapua.service.job.targets.JobTarget}s needs to be reset to the given {@link #getFromStepIndex()}, {@code false} otherwise.
* @since 1.1.0
*/
public boolean getResetStepIndex() {
String resetFromIndexString = getProperties().getProperty(JobContextPropertyNames.RESET_STEP_INDEX);
return resetFromIndexString != null && Boolean.valueOf(resetFromIndexString);
}

/**
* Gets the start step index of the {@link org.eclipse.kapua.service.job.execution.JobExecution}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public class JobStartOptionsRemote implements JobStartOptions {

private Set<KapuaId> targetIdSublist;
private boolean resetStepIndex;
private Integer fromStepIndex;
private boolean enqueue;

Expand Down Expand Up @@ -58,6 +59,16 @@ public void removeTargetIdToSublist(KapuaId targetId) {
getTargetIdSublist().remove(targetId);
}

@Override
public boolean getResetStepIndex() {
return resetStepIndex;
}

@Override
public void setResetStepIndex(boolean resetStepIndex) {
this.resetStepIndex = resetStepIndex;
}

@Override
public Integer getFromStepIndex() {
return fromStepIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class JobStartOptionsImpl implements JobStartOptions {

private Set<KapuaId> targetIdSublist;
private boolean resetStepIndex;
private Integer fromStepIndex;
private boolean enqueue;

Expand All @@ -46,6 +47,7 @@ public JobStartOptionsImpl(JobStartOptions jobStartOptions) {
this();

setTargetIdSublist(jobStartOptions.getTargetIdSublist());
setResetStepIndex(jobStartOptions.getResetStepIndex());
setFromStepIndex(jobStartOptions.getFromStepIndex());
setEnqueue(jobStartOptions.getEnqueue());
}
Expand Down Expand Up @@ -74,6 +76,16 @@ public void removeTargetIdToSublist(KapuaId targetId) {
getTargetIdSublist().remove(targetId);
}

@Override
public boolean getResetStepIndex() {
return resetStepIndex;
}

@Override
public void setResetStepIndex(boolean resetStepIndex) {
this.resetStepIndex = resetStepIndex;
}

@Override
public Integer getFromStepIndex() {
return fromStepIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public static JSLProperties buildJobProperties(@NotNull KapuaId scopeId, @NotNul
resumedJobExecutionIdProperty.setValue("#{jobParameters['" + JobContextPropertyNames.RESUMED_KAPUA_EXECUTION_ID + "']}");
jslPropertyList.add(resumedJobExecutionIdProperty);

// Reset target step index
Property resetSterIndexProperty = new Property();
resetSterIndexProperty.setName(JobContextPropertyNames.RESET_STEP_INDEX);
resetSterIndexProperty.setValue(String.valueOf(jobStartOptions.getResetStepIndex()));
jslPropertyList.add(resetSterIndexProperty);

// Job from step index
if (jobStartOptions.getFromStepIndex() != null) {
Property stepFromIndexProperty = new Property();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,24 +177,26 @@ public void beforeJob() throws Exception {
}
}

if (jobContextWrapper.getFromStepIndex() != null) {
jobLogger.info("Resetting targets to step index: {}...", jobContextWrapper.getFromStepIndex());
if (jobContextWrapper.getResetStepIndex()) {
int resetToStepIndex = jobContextWrapper.getFromStepIndex() != null ? jobContextWrapper.getFromStepIndex() : 0;

try {
jobLogger.info("Resetting {} targets to step index: {}...", jobExecution.getTargetIds().size(), resetToStepIndex);

for (KapuaId jobTargetId : jobExecution.getTargetIds()) {
JobTarget jobTarget = KapuaSecurityUtils.doPrivileged(() -> JOB_TARGET_SERVICE.find(jobExecution.getScopeId(), jobTargetId));

jobTarget.setStepIndex(jobContextWrapper.getFromStepIndex());
jobTarget.setStepIndex(resetToStepIndex);
jobTarget.setStatus(JobTargetStatus.PROCESS_AWAITING);
jobTarget.setStatusMessage(null);
jobTarget.setException(null);

KapuaSecurityUtils.doPrivileged(() -> JOB_TARGET_SERVICE.update(jobTarget));
}
} catch (KapuaException e) {
jobLogger.error(e, "Resetting targets to step index: {}... ERROR!", jobContextWrapper.getFromStepIndex());
jobLogger.error(e, "Resetting {} targets to step index: {}... ERROR!", jobExecution.getTargetIds().size(), resetToStepIndex);
}
jobLogger.info("Resetting targets to step index: {}... DONE!", jobContextWrapper.getFromStepIndex());
jobLogger.info("Resetting {} targets to step index: {}... DONE!", jobExecution.getTargetIds().size(), resetToStepIndex);
}

jobLogger.info("Running before job... DONE!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class JobTargetImpl extends AbstractKapuaUpdatableEntity implements JobTa

@Basic
@Column(name = "step_index", nullable = false, updatable = true)
private int stepIndex;
private Integer stepIndex;

@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, updatable = true)
Expand Down

0 comments on commit a68f2f3

Please sign in to comment.