Skip to content

Commit

Permalink
ansible: ansible runner client refactoring
Browse files Browse the repository at this point in the history
eliminate wrong use of singleton's class properties - they get
overwritten by each run. Whole state is now passed through
AnsibleReturnValue object consistently.
  • Loading branch information
michalskrivanek committed Aug 1, 2022
1 parent cc47990 commit 244a04d
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.ovirt.engine.core.bll.tasks.interfaces.CommandCallback;
import org.ovirt.engine.core.common.AuditLogType;
import org.ovirt.engine.core.common.action.AnsibleCommandParameters;
import org.ovirt.engine.core.common.utils.ansible.AnsibleReturnCode;
import org.ovirt.engine.core.common.utils.ansible.AnsibleReturnValue;
import org.ovirt.engine.core.common.utils.ansible.AnsibleRunnerClient;
import org.ovirt.engine.core.common.utils.ansible.AnsibleRunnerLogger;
import org.ovirt.engine.core.compat.CommandStatus;
Expand Down Expand Up @@ -42,8 +44,10 @@ public class AnsibleCallback implements CommandCallback {
public void doPolling(Guid cmdId, List<Guid> childCmdIds) {
CommandBase<AnsibleCommandParameters> command = getCommand(cmdId);
String playUuid = command.getParameters().getPlayUuid();
AnsibleReturnValue ret = new AnsibleReturnValue(AnsibleReturnCode.ERROR);
ret.setPlayUuid(playUuid);
ret.setLastEventId(command.getParameters().getLastEventId());
StringBuilder stdout = command.getParameters().getStringBuilder();
runnerClient.setLogger(new AnsibleRunnerLogger(command.getParameters().getLogFile()));
BiConsumer<String, String> fn = (String taskName, String eventUrl) -> {
AuditLogable logable = createAuditLogable(command, taskName);
auditLogDirector.log(logable, AuditLogType.ANSIBLE_RUNNER_EVENT_NOTIFICATION);
Expand All @@ -64,11 +68,11 @@ public void doPolling(Guid cmdId, List<Guid> childCmdIds) {
// Process the events if the playbook is running:
totalEvents = runnerClient.getTotalEvents(playUuid);

log.debug("LastEventId: {} totalEvents: {} playbookStatus: {}", command.getParameters().getLastEventId(), totalEvents, playbookStatus);
log.debug("LastEventId: {} totalEvents: {} playbookStatus: {}", ret.getLastEventId(), totalEvents, playbookStatus);
if (msg.equalsIgnoreCase("running") || msg.equalsIgnoreCase("successful")
&& command.getParameters().getLastEventId() < totalEvents) {
command.getParameters().setLastEventId(runnerClient.processEvents(
playUuid, command.getParameters().getLastEventId(), fn));
runnerClient.processEvents(ret , fn, new AnsibleRunnerLogger(command.getParameters().getLogFile()));
command.setParameters().setLastEventId(ret.getLastEventId());
return;
} else if (msg.equalsIgnoreCase("successful")) {
log.info("Playbook (Play uuid = {}, command = {}) has completed!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,8 @@
@Singleton
public class AnsibleClientFactory {

@Inject
private AnsibleCommandLogFileFactory ansibleCommandLogFileFactory;

public AnsibleRunnerClient create(AnsibleCommandConfig command) {
AnsibleRunnerLogger runnerLogger = ansibleCommandLogFileFactory.create(command);
AnsibleRunnerClient client = new AnsibleRunnerClient();
client.setLogger(runnerLogger);
return client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* Copyright oVirt Authors
* SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -40,6 +41,9 @@ public class AnsibleExecutor {
@Inject
private AnsibleClientFactory ansibleClientFactory;

@Inject
private AnsibleCommandLogFileFactory ansibleCommandLogFileFactory;

/**
* Executes ansible-playbook command. Default timeout is specified by ANSIBLE_PLAYBOOK_EXEC_DEFAULT_TIMEOUT variable
* in engine.conf.
Expand Down Expand Up @@ -150,10 +154,12 @@ public AnsibleReturnValue runCommand(AnsibleCommandConfig commandConfig, int tim

String playUuid = null;
AnsibleRunnerClient runnerClient = null;
AnsibleRunnerLogger runnerLogger = null;
try {
runnerClient = ansibleClientFactory.create(commandConfig);
runnerLogger = ansibleCommandLogFileFactory.create(commandConfig);
ret.setLogFile(runnerLogger.getLogFile());
playUuid = commandConfig.getUuid().toString();
ret.setLogFile(runnerClient.getLogger().getLogFile());
ret.setPlayUuid(playUuid);
ret.setStdout(String.format("%1$s/%2$s/artifacts/%2$s/stdout", AnsibleConstants.ANSIBLE_RUNNER_PATH, playUuid));
ret.setLastEventId(0);
Expand All @@ -167,7 +173,7 @@ public AnsibleReturnValue runCommand(AnsibleCommandConfig commandConfig, int tim
return ret;
}

ret = runnerClient.artifactHandler(commandConfig.getUuid(), ret.getLastEventId(), timeout, fn);
runnerClient.artifactHandler(ret, timeout, fn, runnerLogger);
} catch (InventoryException ex) {
String message = ex.getMessage();
log.error("Error executing playbook: {}", message);
Expand All @@ -180,8 +186,8 @@ public AnsibleReturnValue runCommand(AnsibleCommandConfig commandConfig, int tim
ret.setStderr(ex.getMessage());
} finally {
// Make sure all events are proccessed even in case of failure:
if (playUuid != null && runnerClient != null && !async) {
runnerClient.processEvents(playUuid, ret.getLastEventId(), fn);
if (playUuid != null && runnerClient != null && runnerLogger != null && !async) {
runnerClient.processEvents(ret, fn, runnerLogger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public AnsibleReturnValue(AnsibleReturnCode ansibleReturnCode) {
public AnsibleReturnValue(AnsibleReturnCode ansibleReturnCode, String stdout) {
this.ansibleReturnCode = ansibleReturnCode;
this.stdout = stdout;
this.lastEventId = 0;
}

public AnsibleReturnCode getAnsibleReturnCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
Expand All @@ -38,17 +37,14 @@
public class AnsibleRunnerClient {
private static Logger log = LoggerFactory.getLogger(AnsibleRunnerClient.class);
private ObjectMapper mapper;
private AnsibleRunnerLogger runnerLogger;
private static final int POLL_INTERVAL = 3000;
private AnsibleReturnValue returnValue;

public AnsibleRunnerClient() {
this.mapper = JsonMapper
.builder()
.findAndAddModules()
.build()
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
this.returnValue = new AnsibleReturnValue(AnsibleReturnCode.ERROR);
}

public Boolean playHasEnded(String uuid, int lastEventId) {
Expand All @@ -58,32 +54,26 @@ public Boolean playHasEnded(String uuid, int lastEventId) {
return !msg.equalsIgnoreCase("running") && !(lastEventId < getTotalEvents(uuid));
}

public AnsibleReturnValue artifactHandler(UUID uuid, int lastEventID, int timeout, BiConsumer<String, String> fn)
throws Exception {
public void artifactHandler(AnsibleReturnValue returnValue,
int timeout,
BiConsumer<String, String> fn,
AnsibleRunnerLogger runnerLogger) throws Exception {
int executionTime = 0;
setReturnValue(uuid);
while (!playHasEnded(uuid.toString(), lastEventID)) {
lastEventID = processEvents(uuid.toString(), lastEventID, fn);
if (lastEventID == -1) {
return returnValue;
while (!playHasEnded(returnValue.getPlayUuid(), returnValue.getLastEventId())) {
processEvents(returnValue, fn, runnerLogger);
if (returnValue.getLastEventId() == -1) {
return;
}
executionTime += POLL_INTERVAL / 1000;
if (executionTime > timeout * 60) {
// Cancel playbook, and raise exception in case timeout occur:
cancelPlaybook(uuid, timeout);
cancelPlaybook(returnValue.getPlayUuid(), timeout);
throw new TimeoutException(
"Play execution has reached timeout");
}
Thread.sleep(POLL_INTERVAL);
}
returnValue.setAnsibleReturnCode(AnsibleReturnCode.OK);
return returnValue;
}

public void setReturnValue(UUID uuid) {
returnValue.setPlayUuid(uuid.toString());
returnValue.setLogFile(runnerLogger.getLogFile());
returnValue.setStdout(Paths.get(this.getJobEventsDir(uuid.toString()), "../stdout").toString());
}

public String getEventFileName(String playUuid, int eventId) {
Expand All @@ -105,13 +95,13 @@ public String getJobEventsDir(String playUuid) {
return String.format("%1$s/%2$s/artifacts/%2$s/job_events/", AnsibleConstants.ANSIBLE_RUNNER_PATH, playUuid);
}

public int processEvents(String playUuid,
int lastEventId,
BiConsumer<String, String> fn) {
String jobEvents = getJobEventsDir(playUuid);
public void processEvents(AnsibleReturnValue returnValue,
BiConsumer<String, String> fn,
AnsibleRunnerLogger runnerLogger) {
String jobEvents = getJobEventsDir(returnValue.getPlayUuid());
while(true){
// get next event
String event = getEventFileName(playUuid, lastEventId + 1);
String event = getEventFileName(returnValue.getPlayUuid(), returnValue.getLastEventId() + 1);
if (event == null) {
break;
}
Expand All @@ -122,7 +112,8 @@ public int processEvents(String playUuid,
if (RunnerJsonNode.isEventUnreachable(currentNode)) {
runnerLogger.log(currentNode);
returnValue.setAnsibleReturnCode(AnsibleReturnCode.UNREACHABLE);
return -1;
returnValue.setLastEventId(-1);
break;
}

// might need special attention
Expand All @@ -132,7 +123,7 @@ public int processEvents(String playUuid,
}
}

log.debug("Current node event: {} lastEventId: {} ", currentNode.get("event").textValue(), lastEventId);
log.debug("Current node event: {} lastEventId: {} ", currentNode.get("event").textValue(), returnValue.getLastEventId());
// want to log only these kind of events:
if (RunnerJsonNode.isEventStart(currentNode) || RunnerJsonNode.isEventOk(currentNode)
|| RunnerJsonNode.playbookStats(currentNode) || RunnerJsonNode.isEventFailed(currentNode)) {
Expand Down Expand Up @@ -175,10 +166,8 @@ public int processEvents(String playUuid,
}
}
}
lastEventId = Integer.valueOf(event.split("-")[0]);
returnValue.setLastEventId(lastEventId);
returnValue.setLastEventId(Integer.valueOf(event.split("-")[0]));
}
return lastEventId;
}

private Boolean jsonIsValid(String content) {
Expand All @@ -191,7 +180,7 @@ private Boolean jsonIsValid(String content) {
}
}

public void cancelPlaybook(UUID uuid, int timeout) throws Exception {
public void cancelPlaybook(String uuid, int timeout) throws Exception {
File privateDataDir = new File(String.format("%1$s/%2$s/", AnsibleConstants.ANSIBLE_RUNNER_PATH, uuid));
File output = new File(String.format("%1$s/output.log", privateDataDir));
String command = String.format("ansible-runner stop %1$s", privateDataDir);
Expand All @@ -203,7 +192,7 @@ public void cancelPlaybook(UUID uuid, int timeout) throws Exception {
} catch (IOException e) {
log.error(String.format("Failed to execute call to cancel playbook. %1$s, %2$s../stdout ",
output.toString(),
Paths.get(this.getJobEventsDir(uuid.toString()))));
Paths.get(this.getJobEventsDir(uuid))));
log.debug("Exception: ", e);
return;
}
Expand Down Expand Up @@ -352,14 +341,6 @@ public String getCommandStdout(String eventUrl) {
return RunnerJsonNode.getStdout(taskNode);
}

public void setLogger(AnsibleRunnerLogger runnerLogger) {
this.runnerLogger = runnerLogger;
}

public AnsibleRunnerLogger getLogger() {
return this.runnerLogger;
}

public static class PlaybookStatus {
private String status;
private String msg;
Expand Down

0 comments on commit 244a04d

Please sign in to comment.