Skip to content

Commit

Permalink
Merge pull request #1720 from treasure-data/backport-api-workflows
Browse files Browse the repository at this point in the history
Backport  enhancements on /api/workflows
  • Loading branch information
yoyama authored Apr 15, 2022
2 parents a1d842c + 039b90e commit 9687645
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ public StoredRevision getRevisionOfWorkflowDefinition(long wfId)
"revision of workflow definition id=%s", wfId);
}

private static String makeLastIdCond(Optional<Long> lastId, boolean ascending)
{
String signIneq = ascending ? "\\>" : "\\<";
Long lastIdValue = lastId.or( () -> ascending ? 0L : Long.MAX_VALUE);
return String.format("%s %d", signIneq, lastIdValue);
}



private class DatabaseProjectStore
implements ProjectStore
{
Expand Down Expand Up @@ -294,15 +303,21 @@ public StoredWorkflowDefinitionWithProject getLatestWorkflowDefinitionByName(int
public List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
int pageSize,
Optional<Long> lastId,
boolean ascending,
Optional<String> namePattern,
boolean searchProjectName,
AccessController.ListFilter acFilter)
throws ResourceNotFoundException
{
String projectNamePattern = searchProjectName ? generatePartialMatchPattern(namePattern) : "";
String ascDesc = ascending ? "asc" : "desc";
return autoCommit((handle, dao) -> dao.getLatestActiveWorkflowDefinitions(
siteId,
pageSize,
lastId.or(0L),
makeLastIdCond(lastId, ascending),
generatePartialMatchPattern(namePattern),
projectNamePattern,
ascDesc,
acFilter.getSql())
);
}
Expand Down Expand Up @@ -617,21 +632,23 @@ List<StoredProjectWithRevision> getProjectsWithLatestRevision(
" join revisions rev on a.revision_id = rev.id" +
" join projects proj on a.project_id = proj.id" +
" join workflow_configs wc on wc.id = wd.config_id" +
" where wd.id \\> :lastId" +
" where wd.id <lastIdCond>" +
// `workflow_definitions` table has a composite index
// for `revision_id` and `name` (`workflow_definitions_on_revision_id_and_name`).
// And the index is used for filter by `revision_id` and `name`.
// Since this query always limits the records by `revision_id` (the latest revision's one),
// partial matching of `name` (e.g. '%test%') can be accepted.
" and wd.name like :namePattern" +
" and ( wd.name like :namePattern or proj.name like :projectNamePattern )" +
" and <acFilter>" +
" order by wd.id" +
" order by wd.id <orderDirection>" +
" limit :limit")
List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") long lastId,
@Define("lastIdCond") String lastIdCond,
@Bind("namePattern") String namePattern,
@Bind("projectNamePattern") String projectNamePattern,
@Define("orderDirection") String orderDirection,
@Define("acFilter") String acFilter);
}

Expand Down Expand Up @@ -688,26 +705,28 @@ List<StoredProjectWithRevision> getProjectsWithLatestRevision(
" and p.deleted_at is null" +
" group by r.project_id" +
" )) " +
" and wf.id \\> :lastId" +
" and wf.id <lastIdCond>" +
// `workflow_definitions` table has a composite index
// for `revision_id` and `name` (`workflow_definitions_on_revision_id_and_name`).
// And the index is used for filter by `revision_id` and `name`.
// Since this query always limits the records by `revision_id` (the latest revision's one),
// partial matching of `name` (e.g. '%test%') can be accepted.
" and wf.name like :namePattern" +
" and ( wf.name like :namePattern or proj.name like :projectNamePattern )" +
" and <acFilter>" +
" order by wf.id" +
" order by wf.id <orderDirection>" +
" limit :limit" +
") wd" +
" join revisions r on r.id = wd.revision_id" +
" join projects p on p.id = r.project_id" +
" join workflow_configs wc on wc.id = wd.config_id" +
" order by wd.id")
" order by wd.id <orderDirection>")
List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") long lastId,
@Define("lastIdCond") String lastIdCond,
@Bind("namePattern") String namePattern,
@Bind("projectNamePattern") String projectNamePattern,
@Define("orderDirection") String orderDirection,
@Define("acFilter") String acFilter);
}

Expand Down Expand Up @@ -820,11 +839,27 @@ List<StoredProjectWithRevision> getProjectsWithLatestRevision(
" limit 1")
StoredWorkflowDefinitionWithProject getLatestWorkflowDefinitionByName(@Bind("siteId") int siteId, @Bind("projId") int projId, @Bind("name") String name);

List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int siteId, int limit, long lastId, String namePattern, String acFilter);
default List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int siteId, int limit, long lastId, String namePattern, String acFilter)
{
// projects.name must be non-empty or null(for deleted projects). So empty string "" will never match.
return getLatestActiveWorkflowDefinitions(siteId, limit, makeLastIdCond(Optional.of(lastId), true), namePattern, "", "asc", acFilter);
}

/**
*
* @param siteId Target site_id
* @param limit Number of workflows to be returned
* @param lastIdCond Pagination based on workflow id. Must {@literal "> n" or "< n}"
* @param namePattern Search by workflow name with partial match. namePattern and projectNamePattern is "OR" search.
* @param projectNamePattern Search by project name with partial match. namePattern and projectNamePattern is "OR" search.
* @param orderDirection Order based on workflow id. "asc" or "desc". The parameter must be validated before calling to avoid SQL injection.
* @param acFilter AccessControl filter clause. The parameter must be validated before calling to avoid SQL injection.
* @return
*/
List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int siteId, int limit, String lastIdCond, String namePattern, String projectNamePattern, String orderDirection, String acFilter);

// getWorkflowDetailsById is same with getWorkflowDetailsByIdInternal
// excepting site_id check

@SqlQuery("select wd.*, wc.config, wc.timezone," +
" proj.id as proj_id, proj.name as proj_name, proj.deleted_name as proj_deleted_name, proj.deleted_at as proj_deleted_at, proj.site_id, proj.created_at as proj_created_at," +
" rev.name as rev_name, rev.default_params as rev_default_params" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,18 @@ StoredWorkflowDefinitionWithProject getWorkflowDefinitionById(long wfId)
StoredWorkflowDefinitionWithProject getLatestWorkflowDefinitionByName(int projId, String name)
throws ResourceNotFoundException;

List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int pageSize, Optional<Long> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter)
// For backward compatibility
default List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
int pageSize, Optional<Long> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter)
throws ResourceNotFoundException
{
// ascending is true and searchProjectName is disabled.
return getLatestActiveWorkflowDefinitions(pageSize, lastId, true, namePattern, false, acFilter);
}

List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
int pageSize, Optional<Long> lastId, boolean ascending, Optional<String> namePattern,
boolean searchProjectName, AccessController.ListFilter acFilter)
throws ResourceNotFoundException;

TimeZoneMap getWorkflowTimeZonesByIdList(List<Long> defIdList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ public void testGetActiveWorkflows()
store.getLatestActiveWorkflowDefinitions(100, Optional.absent(), Optional.fromNullable("%_"), () -> "true"));
assertEquals(ImmutableList.of(),
store.getLatestActiveWorkflowDefinitions(100, Optional.absent(), Optional.fromNullable("*"), () -> "true"));
// Check ascending parameter.
assertEquals(ImmutableList.of(wfDetails6, wfDetails5, wfDetails4, wfDetails3, wfDetails2, wfDetails1),
store.getLatestActiveWorkflowDefinitions(100, Optional.absent(), false, Optional.absent(), false, () -> "true"));
// Check searchProjectName parameter.
assertEquals(ImmutableList.of(wfDetails6, wfDetails5, wfDetails4, wfDetails3, wfDetails2, wfDetails1),
store.getLatestActiveWorkflowDefinitions(100, Optional.absent(), false, Optional.fromNullable("proj1"), true, () -> "true"));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

import javax.ws.rs.DefaultValue;
import javax.ws.rs.Produces;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand Down Expand Up @@ -110,23 +111,30 @@ public RestWorkflowDefinition getWorkflowDefinition(
@Path("/api/workflows")
@ApiOperation("List workflows")
public RestWorkflowDefinitionCollection getWorkflowDefinitions(
@ApiParam(value="list workflows whose id is grater than this id for pagination", required=false)
@ApiParam(value="pagination. return workflows which id are greater than last_id with order 'asc', which id are less than the last_id with order 'desc'", required=false)
@QueryParam("last_id") Long lastId,
@ApiParam(value="number of workflows to return", required=false)
@QueryParam("count") Integer count,
@ApiParam(value="Sort order. 'asc' or 'desc'", defaultValue = "asc", required=false)
@DefaultValue("asc") @QueryParam("order") String orderDirection,
@ApiParam(value="name pattern to be partially matched", required=false)
@QueryParam("name_pattern") String namePattern
@QueryParam("name_pattern") String namePattern,
@ApiParam(value="name pattern to be partially matched", required=false)
@DefaultValue("false") @QueryParam("search_project_name") Boolean searchProjectName
)
throws ResourceNotFoundException, AccessControlException
{
final SiteTarget siteTarget = SiteTarget.of(getSiteId());
ac.checkListWorkflowsOfSite(siteTarget, getAuthenticatedUser()); // AccessControl

return tm.<RestWorkflowDefinitionCollection, ResourceNotFoundException, AccessControlException>begin(() -> {
List<StoredWorkflowDefinitionWithProject> defs =
rm.getProjectStore(getSiteId())
.getLatestActiveWorkflowDefinitions(Optional.fromNullable(count).or(100), Optional.fromNullable(lastId), // check NotFound first
.getLatestActiveWorkflowDefinitions(
Optional.fromNullable(count).or(100),
Optional.fromNullable(lastId), // check NotFound first
orderAscending(orderDirection),
Optional.fromNullable(namePattern),
searchProjectName,
ac.getListWorkflowsFilterOfSite(
SiteTarget.of(getSiteId()),
getAuthenticatedUser()));
Expand All @@ -135,6 +143,19 @@ public RestWorkflowDefinitionCollection getWorkflowDefinitions(
}, ResourceNotFoundException.class, AccessControlException.class);
}

private boolean orderAscending(String orderDirection)
{
if (orderDirection == null || orderDirection.equals("asc")) {
return true;
}
else if (orderDirection.equals("desc")) {
return false;
}
else {
throw new IllegalArgumentException("parameter 'order' must be either 'asc' or 'desc'");
}
}

@DigdagTimed(category = "api", value = "getWorkflowDefinitionById")
@GET
@Path("/api/workflows/{id}")
Expand Down
143 changes: 143 additions & 0 deletions digdag-tests/src/test/java/acceptance/ApiWorkflowsIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package acceptance;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import io.digdag.client.api.JacksonTimeModule;
import io.digdag.client.api.RestWorkflowDefinitionCollection;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import utils.CommandStatus;
import utils.TemporaryDigdagServer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static utils.TestUtils.copyResource;
import static utils.TestUtils.main;

public class ApiWorkflowsIT
{
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Rule
public TemporaryDigdagServer server = TemporaryDigdagServer.of();

private Path config;
private Path projectDir;
private ObjectMapper objectMapper;

@Before
public void setUp()
throws Exception
{
objectMapper = new ObjectMapper()
.registerModule(new GuavaModule())
.registerModule(new JacksonTimeModule())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// To deserialize Config class, need ObjectMapper.
objectMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, objectMapper));

projectDir = folder.getRoot().toPath().resolve("foobar");
Files.createDirectory(projectDir);
config = folder.newFile().toPath();

copyResource("acceptance/basic.dig", projectDir.resolve("wf1abc.dig"));
copyResource("acceptance/basic.dig", projectDir.resolve("wf2def.dig"));

// Push first project. (named: prj1foo)
{
CommandStatus pushStatus = main(
"push",
"--project", projectDir.toString(),
"prj1foo",
"-c", config.toString(),
"-e", server.endpoint());
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
}

// Push second project. (named: prj2bar)
{
CommandStatus pushStatus = main(
"push",
"--project", projectDir.toString(),
"prj2bar",
"-c", config.toString(),
"-e", server.endpoint());
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
}
}

@Test
public void showWorkflows()
throws Exception
{
{ // No params
RestWorkflowDefinitionCollection restWorkflows = callGetWorkflows("");
assertThat("Num of workflows", restWorkflows.getWorkflows().size(), is(4));
String ids = restWorkflows.getWorkflows().stream().map((n) -> {
return n.getId().get();
}).collect(Collectors.joining(","));
assertThat("List of workflow ids", ids, is("1,2,3,4"));
}
{ // order=desc count=2
RestWorkflowDefinitionCollection restWorkflows = callGetWorkflows("?order=desc&count=2");
assertThat("Num of workflows", restWorkflows.getWorkflows().size(), is(2));
String ids = restWorkflows.getWorkflows().stream().map((n) -> {
return n.getId().get();
}).collect(Collectors.joining(","));
assertThat("List of workflow ids", ids, is("4,3"));
}
{ // order=desc last_id=3
RestWorkflowDefinitionCollection restWorkflows = callGetWorkflows("?order=desc&last_id=3");
assertThat("Num of workflows", restWorkflows.getWorkflows().size(), is(2));
String ids = restWorkflows.getWorkflows().stream().map((n) -> {
return n.getId().get();
}).collect(Collectors.joining(","));
assertThat("List of workflow ids", ids, is("2,1"));
}
{ // order=desc count=2 name_pattern=f1a
RestWorkflowDefinitionCollection restWorkflows = callGetWorkflows("?order=desc&count=2&name_pattern=f1a");
assertThat("Num of workflows", restWorkflows.getWorkflows().size(), is(2));
String ids = restWorkflows.getWorkflows().stream().map((n) -> {
return n.getId().toString() + "-" + n.getName();
}).collect(Collectors.joining(","));
assertThat("List of workflows", ids, is("3-wf1abc,1-wf1abc"));
}

{ // order=desc count=2 name_pattern=f1a search_project_name=true
RestWorkflowDefinitionCollection restWorkflows = callGetWorkflows("?order=desc&count=2&name_pattern=prj2&search_project_name=true");
assertThat("Num of workflows", restWorkflows.getWorkflows().size(), is(2));
String ids = restWorkflows.getWorkflows().stream().map((n) -> {
return n.getId().toString() + "-" + n.getProject().getName() + "-" + n.getName();
}).collect(Collectors.joining(","));
assertThat("List of workflows", ids, is("4-prj2bar-wf2def,3-prj2bar-wf1abc"));
}
}

private RestWorkflowDefinitionCollection callGetWorkflows(String queryParams)
throws IOException
{
OkHttpClient client = new OkHttpClient();

Response response = client.newCall(new Request.Builder()
.url(server.endpoint() + "/api/workflows" + queryParams)
.build()).execute();
assertThat("Response failed.", response.isSuccessful(), is(true));
JsonNode json = objectMapper.readTree(response.body().string());
RestWorkflowDefinitionCollection restWorkflows = objectMapper.treeToValue(json, RestWorkflowDefinitionCollection.class);
return restWorkflows;
}
}

0 comments on commit 9687645

Please sign in to comment.