Skip to content

Commit

Permalink
Jun/few improvements (#87)
Browse files Browse the repository at this point in the history
* Make the step action timeout and check interval configurable.

* Improve the error message when the step id does not exist in the referenced params.
  • Loading branch information
jun-he authored Feb 9, 2025
1 parent fd3b451 commit 6080d88
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.maestro.engine.execution.WorkflowSummary;
import com.netflix.maestro.engine.jobevents.StepInstanceUpdateJobEvent;
import com.netflix.maestro.engine.jobevents.StepInstanceWakeUpEvent;
import com.netflix.maestro.engine.properties.StepActionProperties;
import com.netflix.maestro.engine.publisher.MaestroJobEventPublisher;
import com.netflix.maestro.engine.utils.ObjectHelper;
import com.netflix.maestro.engine.utils.TimeUtils;
Expand Down Expand Up @@ -69,9 +70,6 @@
*/
@Slf4j
public class MaestroStepInstanceActionDao extends AbstractDatabaseDao {
private static final long ACTION_TIMEOUT = 30 * 1000L; // 30 sec
private static final long CHECK_INTERVAL = 1000L; // 1 sec

private static final String INSERT_ACTION_QUERY =
"INSERT INTO maestro_step_instance_action (payload) VALUES (?) ON CONFLICT DO NOTHING";
private static final String INSTANCE_CONDITION =
Expand All @@ -89,18 +87,23 @@ public class MaestroStepInstanceActionDao extends AbstractDatabaseDao {

private final MaestroStepInstanceDao stepInstanceDao;
private final MaestroJobEventPublisher eventPublisher;
private final long actionTimeout;
private final long checkInterval;

/**
 * Builds the step instance action dao.
 *
 * <p>The timeout and polling interval are read once from {@code stepActionProperties} and kept
 * as immutable fields, so later changes to the properties object are not observed by this dao.
 *
 * @param dataSource forwarded to the {@link AbstractDatabaseDao} constructor
 * @param objectMapper forwarded to the {@link AbstractDatabaseDao} constructor
 * @param config forwarded to the {@link AbstractDatabaseDao} constructor
 * @param stepActionProperties supplies the action timeout (compared against elapsed {@code
 *     System.currentTimeMillis()} deltas, i.e. milliseconds) and the check interval (handed to
 *     {@code TimeUtils.sleep} between polls)
 * @param stepInstanceDao dao used to read step instance views and runtime states while polling
 * @param eventPublisher job event publisher kept for use by this dao
 * @param metrics forwarded to the {@link AbstractDatabaseDao} constructor
 */
public MaestroStepInstanceActionDao(
    DataSource dataSource,
    ObjectMapper objectMapper,
    DatabaseConfiguration config,
    StepActionProperties stepActionProperties,
    MaestroStepInstanceDao stepInstanceDao,
    MaestroJobEventPublisher eventPublisher,
    MaestroMetrics metrics) {
  super(dataSource, objectMapper, config, metrics);

  // Polling settings: snapshot them up front so every wait loop sees the same values.
  this.actionTimeout = stepActionProperties.getActionTimeout();
  this.checkInterval = stepActionProperties.getCheckInterval();

  // Collaborators.
  this.stepInstanceDao = stepInstanceDao;
  this.eventPublisher = eventPublisher;
}

/** restart a restartable step instance in a non-terminal workflow instance. */
Expand Down Expand Up @@ -300,7 +303,7 @@ private RunResponse waitResponseWithTimeout(StepInstance stepInstance, StepActio
try {
long startTime = System.currentTimeMillis();
boolean isForeachStepRunning = isForeachStepRunningAndRestartable(stepInstance);
while (System.currentTimeMillis() - startTime < ACTION_TIMEOUT) {
while (System.currentTimeMillis() - startTime < actionTimeout) {
StepInstance stepView =
stepInstanceDao.getStepInstanceView(
stepInstance.getWorkflowId(),
Expand All @@ -327,7 +330,7 @@ private RunResponse waitResponseWithTimeout(StepInstance stepInstance, StepActio
}
}

TimeUtils.sleep(CHECK_INTERVAL);
TimeUtils.sleep(checkInterval);
}
} finally {
deleteAction(stepInstance, action.getAction());
Expand All @@ -342,7 +345,7 @@ private StepInstanceActionResponse waitBypassStepDependenciesResponseWithTimeout
StepInstance stepInstance, StepAction action) {
try {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < ACTION_TIMEOUT) {
while (System.currentTimeMillis() - startTime < actionTimeout) {

StepRuntimeState state =
stepInstanceDao.getStepInstanceRuntimeState(
Expand All @@ -352,7 +355,7 @@ private StepInstanceActionResponse waitBypassStepDependenciesResponseWithTimeout
stepInstance.getStepId(),
Long.toString(stepInstance.getStepAttemptId()));
if (state.getStatus() == StepInstance.Status.WAITING_FOR_SIGNALS) {
TimeUtils.sleep(CHECK_INTERVAL);
TimeUtils.sleep(checkInterval);
} else {
return createActionResponseFrom(stepInstance, state, action.toTimelineEvent());
}
Expand Down Expand Up @@ -424,7 +427,7 @@ public StepInstanceActionResponse terminate(

if (blocking) {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < ACTION_TIMEOUT) {
while (System.currentTimeMillis() - startTime < actionTimeout) {
StepRuntimeState state =
stepInstanceDao.getStepInstanceRuntimeState(
stepInstance.getWorkflowId(),
Expand All @@ -435,7 +438,7 @@ public StepInstanceActionResponse terminate(
if (!state.getStatus().shouldWakeup()) {
return createActionResponseFrom(stepInstance, state, stepAction.toTimelineEvent());
}
TimeUtils.sleep(CHECK_INTERVAL);
TimeUtils.sleep(checkInterval);
}

throw new MaestroTimeoutException(
Expand Down Expand Up @@ -539,7 +542,7 @@ private Optional<StepAction> getAction(WorkflowSummary summary, String stepId) {
if (action.getAction() != null && action.getAction().isUsingUpstream()) {
stepAction = action;
break;
} else if (System.currentTimeMillis() - action.getCreateTime() < ACTION_TIMEOUT
} else if (System.currentTimeMillis() - action.getCreateTime() < actionTimeout
&& action.getWorkflowId().equals(self.getWorkflowId())
&& action.getWorkflowInstanceId() == self.getInstanceId()
&& action.getWorkflowRunId() == self.getRunId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.netflix.maestro.engine.execution.StepRuntimeSummary;
import com.netflix.maestro.engine.utils.StepHelper;
import com.netflix.maestro.exceptions.MaestroInternalError;
import com.netflix.maestro.exceptions.MaestroInvalidExpressionException;
import com.netflix.maestro.exceptions.MaestroRuntimeException;
import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException;
import com.netflix.maestro.exceptions.MaestroValidationException;
Expand Down Expand Up @@ -157,7 +158,8 @@ private Map<String, Parameter> getReferencedParams(
} else if (refParamName.contains(STEP_PARAM_SEPARATOR)) {
// here it might be from signal triggers
Map.Entry<String, String> pair = parseReferenceName(refParamName, Collections.emptyMap());
refParams.put(refParamName, getReferenceSignalParam(pair.getKey(), pair.getValue()));
refParams.put(
refParamName, getReferenceSignalParam(refParamName, pair.getKey(), pair.getValue()));
} else {
throw new MaestroValidationException(
"Param [%s] referenced a non-existing param [%s] in workflow [%s]",
Expand Down Expand Up @@ -408,14 +410,17 @@ private Parameter getReferenceParam(

// here it might be from signal triggers or signal dependencies (not supported yet)
if (!allStepOutputData.containsKey(refStepId)) {
return getReferenceSignalParam(refStepId, refParamName);
return getReferenceSignalParam(refParam, refStepId, refParamName);
}

Map<String, Object> refStepData =
Checks.notNull(
allStepOutputData.get(refStepId),
"Error: reference a non-existing step [%s] in the expression.",
refStepId);
"Error: param [%s] in step [%s] referenced a non-existing step [%s] in the expression [%s]",
paramName,
stepId,
refStepId,
refParam);
StepRuntimeSummary refRuntimeSummary =
StepHelper.retrieveRuntimeSummary(objectMapper, refStepData);
Parameter refStepParam =
Expand All @@ -439,15 +444,26 @@ private Parameter getReferenceParam(
* Extract signal param value from the signal and wrap it into a Parameter. Currently, it only
* returns a StringParameter.
*/
private Parameter getReferenceSignalParam(String signalName, String paramName) {
String expr = String.format(SIGNAL_EXPRESSION_TEMPLATE, signalName, paramName);
Object val = exprEvaluator.eval(expr, Collections.emptyMap());
Parameter param =
ParamHelper.deriveTypedParameter(
paramName, expr, val, null, ParamMode.IMMUTABLE, Collections.emptyMap());
param.setEvaluatedResult(val);
param.setEvaluatedTime(System.currentTimeMillis());
return param;
@SuppressWarnings({"PMD.PreserveStackTrace"})
private Parameter getReferenceSignalParam(String refParam, String signalName, String paramName) {
  try {
    // Build the signal lookup expression and evaluate it without any extra bindings.
    final String signalExpr = String.format(SIGNAL_EXPRESSION_TEMPLATE, signalName, paramName);
    final Object evaluated = exprEvaluator.eval(signalExpr, Collections.emptyMap());

    // Wrap the evaluated value into an immutable typed parameter and stamp the evaluation.
    final Parameter signalParam =
        ParamHelper.deriveTypedParameter(
            paramName, signalExpr, evaluated, null, ParamMode.IMMUTABLE, Collections.emptyMap());
    signalParam.setEvaluatedResult(evaluated);
    signalParam.setEvaluatedTime(System.currentTimeMillis());
    return signalParam;
  } catch (MaestroRuntimeException e) {
    // The original failure is logged here; the rethrown exception carries only a
    // message built from the reference as written in the definition (refParam).
    LOG.warn(
        "Failed to evaluate [{}] as a param within signal triggers or dependencies due to",
        refParam,
        e);
    throw new MaestroInvalidExpressionException(
        "Failed to evaluate the param with a definition: [%s]. Please check if there is a typo in the expression.",
        refParam);
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netflix.maestro.engine.properties;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

/**
 * Step Action properties. Please check {@link
 * com.netflix.maestro.engine.dao.MaestroStepInstanceActionDao} about how they are used.
 *
 * <p>Both values are read once by the dao constructor through the Lombok-generated getters
 * ({@code getActionTimeout()} and {@code getCheckInterval()}).
 */
@Getter
@AllArgsConstructor
@ToString
@Builder
public class StepActionProperties {
/**
 * Upper bound on how long a step action is waited for. The dao compares it against elapsed
 * {@code System.currentTimeMillis()} differences, so the value is in milliseconds.
 */
private final long actionTimeout;
/**
 * Pause between two consecutive status checks while waiting; the dao passes it to {@code
 * TimeUtils.sleep}. NOTE(review): presumably milliseconds as well (the constant it replaces was
 * 1000L for one second) - confirm against TimeUtils.
 */
private final long checkInterval;
}
Loading

0 comments on commit 6080d88

Please sign in to comment.