Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport minor enhancements #1751

Merged
merged 3 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ subprojects {
testCompile 'org.hamcrest:hamcrest-library:1.3'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile 'pl.pragmatists:JUnitParams:1.0.5'
testImplementation 'org.powermock:powermock-api-mockito:1.6.6'
testImplementation 'org.powermock:powermock-module-junit4:1.6.6'
}

ext {
Expand Down
2 changes: 1 addition & 1 deletion digdag-cli/src/main/java/io/digdag/cli/Run.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ public void run(TaskRequest request)
{
String fullName = request.getTaskName();
TaskResult result = cmd.skipTaskReports.apply(fullName);
String origThreadName = String.format("[%d:%s]%s", request.getSiteId(), request.getProjectName().or("----"), request.getTaskName());
String origThreadName = String.format("[%d:%s:%d:%d]%s", request.getSiteId(), request.getProjectName().or("----"), request.getSessionId(), request.getAttemptId(), request.getTaskName());
if (result != null) {
try (SetThreadName threadName = new SetThreadName(origThreadName)) {
logger.warn("Skipped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void shutdown()
public void run(TaskRequest request)
{
long taskId = request.getTaskId();
String origThreadName = String.format("[%d:%s]%s", request.getSiteId(), request.getProjectName().or("----"), request.getTaskName());
String origThreadName = String.format("[%d:%s:%d:%d]%s", request.getSiteId(), request.getProjectName().or("----"), request.getSessionId(), request.getAttemptId(), request.getTaskName());

// set task name to thread name so that logger shows it
try (SetThreadName threadName = new SetThreadName(origThreadName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import io.digdag.client.config.Config;
import io.digdag.spi.OperatorContext;
import io.digdag.spi.TaskExecutionException;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.standards.operator.DurationInterval;
import io.digdag.standards.operator.state.TaskState;
import io.digdag.standards.operator.td.TDOperator.JobState;
import io.digdag.standards.operator.td.TDOperator.SystemDefaultConfig;
import io.digdag.util.BaseOperator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -59,6 +61,19 @@ public TaskResult runTask()
}
}

@Override
public void cleanup(TaskRequest request)
{
JobState job = state.params().get("job", JobState.class, JobState.empty());;
Optional<String> jobId = job.jobId();
if (jobId.isPresent()) {
logger.debug("cleanup is called: attempt_id={}, task_id={}, job_id={}", request.getAttemptId(), request.getTaskId(), jobId.get());
try (TDOperator op = TDOperator.fromConfig(clientFactory, systemDefaultConfig, env, params, context.getSecrets().getSecrets("td"))) { // TDClientException
op.client.killJob(jobId.get());
}
}
}

public TaskResult runTask(TDOperator op)
{
Optional<String> doneJobId = state.params().getOptional(DONE_JOB_ID, String.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.digdag.standards.operator.td;

import com.google.common.base.Optional;
import com.treasuredata.client.TDClient;
import com.treasuredata.client.model.TDJobRequest;
import io.digdag.client.config.Config;
import io.digdag.spi.TaskRequest;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.nio.file.Path;
import java.nio.file.Paths;

import static io.digdag.client.config.ConfigUtils.newConfig;
import static io.digdag.core.workflow.OperatorTestingUtils.newContext;
import static io.digdag.core.workflow.OperatorTestingUtils.newTaskRequest;
import static io.digdag.standards.operator.td.TdOperatorTestingUtils.newOperatorFactory;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.times;
import static org.powermock.api.support.membermodification.MemberMatcher.method;
import static org.powermock.api.support.membermodification.MemberModifier.stub;

@RunWith(PowerMockRunner.class)
@PrepareForTest(TDOperator.class)
public class TdBaseTdJobOperatorTest
{
@Mock
TDClient client;

@Mock
TDOperator op;

@Mock
TaskRequest taskRequest;

@Rule
public final ExpectedException exception = ExpectedException.none();

@Before
public void setUp()
{
op.client = client;
when(taskRequest.getProjectId()).thenReturn(2);
when(taskRequest.getProjectName()).thenReturn(Optional.absent());
when(taskRequest.getSessionId()).thenReturn((long) 5);
when(taskRequest.getAttemptId()).thenReturn((long) 4);
when(taskRequest.getWorkflowName()).thenReturn("wf");
when(taskRequest.getTaskName()).thenReturn("t");
}

@Test
public void testCleanup() {
Path projectPath = Paths.get("").normalize().toAbsolutePath();
String stmt = "select 1";
Config config = newConfig()
.set("database", "testdb")
.set("query", stmt)
.set("engine", "presto");

stub(method(TDOperator.class, "fromConfig", BaseTDClientFactory.class, TDOperator.SystemDefaultConfig.class, java.util.Map.class, Config.class, io.digdag.spi.SecretProvider.class )).toReturn(op);
BaseTdJobOperator jobOp = (BaseTdJobOperator) newOperatorFactory(TdOperatorFactory.class)
.newOperator(newContext(projectPath, newTaskRequest().withConfig(config)));
jobOp.state.params().set("job", TDOperator.JobState.empty().withJobId("1"));

jobOp.cleanup(newTaskRequest().withConfig(newConfig().set("job", testTDJobRequestParams(projectPath, config))));

verify(client, times(1)).killJob("1");
}


private TDJobRequest testTDJobRequestParams(Path projectPath, Config config)
{
when(op.submitNewJobWithRetry(any(TDJobRequest.class))).thenReturn("");
ArgumentCaptor<TDJobRequest> captor = ArgumentCaptor.forClass(TDJobRequest.class);
BaseTdJobOperator operator =
(BaseTdJobOperator)newOperatorFactory(TdOperatorFactory.class)
.newOperator(newContext(projectPath, newTaskRequest().withConfig(config)));

operator.startJob(op, "");

Mockito.verify(op).submitNewJobWithRetry(captor.capture());
TDJobRequest jobRequest = captor.getValue();
return jobRequest;

}
}
95 changes: 95 additions & 0 deletions digdag-tests/src/test/java/acceptance/LogIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package acceptance;

import io.digdag.client.DigdagClient;
import io.digdag.client.api.Id;
import io.digdag.client.api.RestSessionAttempt;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import utils.CommandStatus;
import utils.TemporaryDigdagServer;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertTrue;
import static utils.TestUtils.*;


public class LogIT {
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Rule
public TemporaryDigdagServer server = TemporaryDigdagServer.of();

private Path config;
private Path projectDir;
private DigdagClient client;

@Before
public void setUp()
throws Exception {
projectDir = folder.getRoot().toPath().resolve("foobar");
Files.createDirectories(projectDir);
config = folder.newFile().toPath();
client = DigdagClient.builder()
.host(server.host())
.port(server.port())
.build();
}

@Test
public void verifyLogWithAttemptIdAndSessionId()
throws Exception {
// Create new project
CommandStatus initStatus = main("init",
"-c", config.toString(),
projectDir.toString());
assertThat(initStatus.errUtf8(), initStatus.code(), is(0));

copyResource("acceptance/basic.dig", projectDir.resolve("basic.dig"));

// Push the project
CommandStatus pushStatus = main("push",
"--project", projectDir.toString(),
"foobar",
"-c", config.toString(),
"-e", server.endpoint());
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));

// Start the workflow
Id attemptId;
{
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"foobar", "foobar",
"--session", "now");
assertThat(startStatus.code(), is(0));
attemptId = getAttemptId(startStatus);
}

// Wait for the attempt to complete
{
RestSessionAttempt attempt = null;
for (int i = 0; i < 30; i++) {
attempt = client.getSessionAttempt(attemptId);
if (attempt.getDone()) {
break;
}
Thread.sleep(1000);
}
assertThat(attempt.getSuccess(), is(true));
}

final String logs = getAttemptLogs(client, attemptId);

final String regex = "\\[\\d+:\\w+:\\d+:\\d+]";
assertTrue(Pattern.compile(regex, Pattern.DOTALL).matcher(logs).find());
}
}