Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ansible runner client refactoring #565

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.ovirt.engine.core.bll.tasks.interfaces.CommandCallback;
import org.ovirt.engine.core.common.AuditLogType;
import org.ovirt.engine.core.common.action.AnsibleCommandParameters;
import org.ovirt.engine.core.common.utils.ansible.AnsibleReturnCode;
import org.ovirt.engine.core.common.utils.ansible.AnsibleReturnValue;
import org.ovirt.engine.core.common.utils.ansible.AnsibleRunnerClient;
import org.ovirt.engine.core.common.utils.ansible.AnsibleRunnerLogger;
import org.ovirt.engine.core.compat.CommandStatus;
Expand Down Expand Up @@ -42,8 +44,10 @@ public class AnsibleCallback implements CommandCallback {
public void doPolling(Guid cmdId, List<Guid> childCmdIds) {
CommandBase<AnsibleCommandParameters> command = getCommand(cmdId);
String playUuid = command.getParameters().getPlayUuid();
AnsibleReturnValue ret = new AnsibleReturnValue(AnsibleReturnCode.ERROR);
ret.setPlayUuid(playUuid);
ret.setLastEventId(command.getParameters().getLastEventId());
StringBuilder stdout = command.getParameters().getStringBuilder();
runnerClient.setLogger(new AnsibleRunnerLogger(command.getParameters().getLogFile()));
BiConsumer<String, String> fn = (String taskName, String eventUrl) -> {
AuditLogable logable = createAuditLogable(command, taskName);
auditLogDirector.log(logable, AuditLogType.ANSIBLE_RUNNER_EVENT_NOTIFICATION);
Expand All @@ -64,11 +68,11 @@ public void doPolling(Guid cmdId, List<Guid> childCmdIds) {
// Process the events if the playbook is running:
totalEvents = runnerClient.getTotalEvents(playUuid);

log.debug("LastEventId: {} totalEvents: {} playbookStatus: {}", command.getParameters().getLastEventId(), totalEvents, playbookStatus);
log.debug("LastEventId: {} totalEvents: {} playbookStatus: {}", ret.getLastEventId(), totalEvents, playbookStatus);
if (msg.equalsIgnoreCase("running") || msg.equalsIgnoreCase("successful")
&& command.getParameters().getLastEventId() < totalEvents) {
command.getParameters().setLastEventId(runnerClient.processEvents(
playUuid, command.getParameters().getLastEventId(), fn));
runnerClient.processEvents(ret , fn, new AnsibleRunnerLogger(command.getParameters().getLogFile()));
command.getParameters().setLastEventId(ret.getLastEventId());
return;
} else if (msg.equalsIgnoreCase("successful")) {
log.info("Playbook (Play uuid = {}, command = {}) has completed!",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package org.ovirt.engine.core.common.utils.ansible;

import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public class AnsibleClientFactory {

@Inject
private AnsibleCommandLogFileFactory ansibleCommandLogFileFactory;

public AnsibleRunnerClient create(AnsibleCommandConfig command) {
AnsibleRunnerLogger runnerLogger = ansibleCommandLogFileFactory.create(command);
AnsibleRunnerClient client = new AnsibleRunnerClient();
client.setLogger(runnerLogger);
return client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* Copyright oVirt Authors
* SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -40,6 +41,9 @@ public class AnsibleExecutor {
@Inject
private AnsibleClientFactory ansibleClientFactory;

@Inject
private AnsibleCommandLogFileFactory ansibleCommandLogFileFactory;

/**
* Executes ansible-playbook command. Default timeout is specified by ANSIBLE_PLAYBOOK_EXEC_DEFAULT_TIMEOUT variable
* in engine.conf.
Expand Down Expand Up @@ -150,10 +154,12 @@ public AnsibleReturnValue runCommand(AnsibleCommandConfig commandConfig, int tim

String playUuid = null;
AnsibleRunnerClient runnerClient = null;
AnsibleRunnerLogger runnerLogger = null;
try {
runnerClient = ansibleClientFactory.create(commandConfig);
runnerLogger = ansibleCommandLogFileFactory.create(commandConfig);
ret.setLogFile(runnerLogger.getLogFile());
playUuid = commandConfig.getUuid().toString();
ret.setLogFile(runnerClient.getLogger().getLogFile());
ret.setPlayUuid(playUuid);
ret.setStdout(String.format("%1$s/%2$s/artifacts/%2$s/stdout", AnsibleConstants.ANSIBLE_RUNNER_PATH, playUuid));
ret.setLastEventId(0);
Expand All @@ -167,7 +173,7 @@ public AnsibleReturnValue runCommand(AnsibleCommandConfig commandConfig, int tim
return ret;
}

ret = runnerClient.artifactHandler(commandConfig.getUuid(), ret.getLastEventId(), timeout, fn);
runnerClient.artifactHandler(ret, timeout, fn, runnerLogger);
} catch (InventoryException ex) {
String message = ex.getMessage();
log.error("Error executing playbook: {}", message);
Expand All @@ -180,8 +186,8 @@ public AnsibleReturnValue runCommand(AnsibleCommandConfig commandConfig, int tim
ret.setStderr(ex.getMessage());
} finally {
// Make sure all events are proccessed even in case of failure:
if (playUuid != null && runnerClient != null && !async) {
runnerClient.processEvents(playUuid, ret.getLastEventId(), fn);
if (playUuid != null && runnerClient != null && runnerLogger != null && !async) {
runnerClient.processEvents(ret, fn, runnerLogger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public AnsibleReturnValue(AnsibleReturnCode ansibleReturnCode) {
public AnsibleReturnValue(AnsibleReturnCode ansibleReturnCode, String stdout) {
this.ansibleReturnCode = ansibleReturnCode;
this.stdout = stdout;
this.lastEventId = 0;
}

public AnsibleReturnCode getAnsibleReturnCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
Expand All @@ -38,17 +37,14 @@
public class AnsibleRunnerClient {
private static Logger log = LoggerFactory.getLogger(AnsibleRunnerClient.class);
private ObjectMapper mapper;
private AnsibleRunnerLogger runnerLogger;
private static final int POLL_INTERVAL = 3000;
private AnsibleReturnValue returnValue;

public AnsibleRunnerClient() {
this.mapper = JsonMapper
.builder()
.findAndAddModules()
.build()
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
this.returnValue = new AnsibleReturnValue(AnsibleReturnCode.ERROR);
}

public Boolean playHasEnded(String uuid, int lastEventId) {
Expand All @@ -58,32 +54,26 @@ public Boolean playHasEnded(String uuid, int lastEventId) {
return !msg.equalsIgnoreCase("running") && !(lastEventId < getTotalEvents(uuid));
}

public AnsibleReturnValue artifactHandler(UUID uuid, int lastEventID, int timeout, BiConsumer<String, String> fn)
throws Exception {
public void artifactHandler(AnsibleReturnValue returnValue,
int timeout,
BiConsumer<String, String> fn,
AnsibleRunnerLogger runnerLogger) throws Exception {
int executionTime = 0;
setReturnValue(uuid);
while (!playHasEnded(uuid.toString(), lastEventID)) {
lastEventID = processEvents(uuid.toString(), lastEventID, fn);
if (lastEventID == -1) {
return returnValue;
while (!playHasEnded(returnValue.getPlayUuid(), returnValue.getLastEventId())) {
processEvents(returnValue, fn, runnerLogger);
if (returnValue.getLastEventId() == -1) {
return;
}
executionTime += POLL_INTERVAL / 1000;
if (executionTime > timeout * 60) {
// Cancel playbook, and raise exception in case timeout occur:
cancelPlaybook(uuid, timeout);
cancelPlaybook(returnValue.getPlayUuid(), timeout);
throw new TimeoutException(
"Play execution has reached timeout");
}
Thread.sleep(POLL_INTERVAL);
}
returnValue.setAnsibleReturnCode(AnsibleReturnCode.OK);
return returnValue;
}

public void setReturnValue(UUID uuid) {
returnValue.setPlayUuid(uuid.toString());
returnValue.setLogFile(runnerLogger.getLogFile());
returnValue.setStdout(Paths.get(this.getJobEventsDir(uuid.toString()), "../stdout").toString());
}

public String getEventFileName(String playUuid, int eventId) {
Expand All @@ -105,13 +95,13 @@ public String getJobEventsDir(String playUuid) {
return String.format("%1$s/%2$s/artifacts/%2$s/job_events/", AnsibleConstants.ANSIBLE_RUNNER_PATH, playUuid);
}

public int processEvents(String playUuid,
int lastEventId,
BiConsumer<String, String> fn) {
String jobEvents = getJobEventsDir(playUuid);
public void processEvents(AnsibleReturnValue returnValue,
BiConsumer<String, String> fn,
AnsibleRunnerLogger runnerLogger) {
String jobEvents = getJobEventsDir(returnValue.getPlayUuid());
while(true){
// get next event
String event = getEventFileName(playUuid, lastEventId + 1);
String event = getEventFileName(returnValue.getPlayUuid(), returnValue.getLastEventId() + 1);
if (event == null) {
break;
}
Expand All @@ -122,7 +112,8 @@ public int processEvents(String playUuid,
if (RunnerJsonNode.isEventUnreachable(currentNode)) {
runnerLogger.log(currentNode);
returnValue.setAnsibleReturnCode(AnsibleReturnCode.UNREACHABLE);
return -1;
returnValue.setLastEventId(-1);
break;
}

// might need special attention
Expand All @@ -132,7 +123,7 @@ public int processEvents(String playUuid,
}
}

log.debug("Current node event: {} lastEventId: {} ", currentNode.get("event").textValue(), lastEventId);
log.debug("Current node event: {} lastEventId: {} ", currentNode.get("event").textValue(), returnValue.getLastEventId());
// want to log only these kind of events:
if (RunnerJsonNode.isEventStart(currentNode) || RunnerJsonNode.isEventOk(currentNode)
|| RunnerJsonNode.playbookStats(currentNode) || RunnerJsonNode.isEventFailed(currentNode)) {
Expand Down Expand Up @@ -175,10 +166,8 @@ public int processEvents(String playUuid,
}
}
}
lastEventId = Integer.valueOf(event.split("-")[0]);
returnValue.setLastEventId(lastEventId);
returnValue.setLastEventId(Integer.valueOf(event.split("-")[0]));
}
return lastEventId;
}

private Boolean jsonIsValid(String content) {
Expand All @@ -191,7 +180,7 @@ private Boolean jsonIsValid(String content) {
}
}

public void cancelPlaybook(UUID uuid, int timeout) throws Exception {
public void cancelPlaybook(String uuid, int timeout) throws Exception {
File privateDataDir = new File(String.format("%1$s/%2$s/", AnsibleConstants.ANSIBLE_RUNNER_PATH, uuid));
File output = new File(String.format("%1$s/output.log", privateDataDir));
String command = String.format("ansible-runner stop %1$s", privateDataDir);
Expand All @@ -203,7 +192,7 @@ public void cancelPlaybook(UUID uuid, int timeout) throws Exception {
} catch (IOException e) {
log.error(String.format("Failed to execute call to cancel playbook. %1$s, %2$s../stdout ",
output.toString(),
Paths.get(this.getJobEventsDir(uuid.toString()))));
Paths.get(this.getJobEventsDir(uuid))));
log.debug("Exception: ", e);
return;
}
Expand Down Expand Up @@ -352,14 +341,6 @@ public String getCommandStdout(String eventUrl) {
return RunnerJsonNode.getStdout(taskNode);
}

public void setLogger(AnsibleRunnerLogger runnerLogger) {
this.runnerLogger = runnerLogger;
}

public AnsibleRunnerLogger getLogger() {
return this.runnerLogger;
}

public static class PlaybookStatus {
private String status;
private String msg;
Expand Down