diff --git a/.circleci/config.yml b/.circleci/config.yml
index cd3c922446..731e2578c7 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -18,7 +18,7 @@ jobs:
   build-api:
     working_directory: ~/marquez
     machine:
-      image: ubuntu-2004:202010-01
+      image: ubuntu-2004:current
     environment:
       TESTCONTAINERS_RYUK_DISABLED: true
     steps:
@@ -44,7 +44,8 @@ jobs:
   build-image-api:
     working_directory: ~/marquez
-    machine: true
+    machine:
+      image: ubuntu-2004:current
     steps:
       - checkout
       - run: docker build --no-cache --tag "marquezproject/marquez:${CIRCLE_SHA1}" .
@@ -54,7 +55,8 @@ jobs:
   build-image-web:
     working_directory: ~/marquez/web
-    machine: true
+    machine:
+      image: ubuntu-2004:current
     steps:
       - *checkout_project_root
       - run: docker build --no-cache --tag "marquezproject/marquez-web:${CIRCLE_SHA1}" .
@@ -85,7 +87,7 @@ jobs:
   build-client-java:
     working_directory: ~/marquez
     machine:
-      image: ubuntu-2004:202010-01
+      image: ubuntu-2004:current
     steps:
       - checkout
       - restore_cache:
@@ -142,7 +144,7 @@ jobs:
   release-java:
     working_directory: ~/marquez
     machine:
-      image: ubuntu-2004:202010-01
+      image: ubuntu-2004:current
     steps:
       - checkout
       - run: ./.circleci/get-jdk17.sh
@@ -165,7 +167,8 @@ jobs:
   release-docker:
     working_directory: ~/marquez
-    machine: true
+    machine:
+      image: ubuntu-2004:current
     steps:
       - checkout
       - run: ./docker/login.sh
diff --git a/.github/workflows/test-chart.yaml b/.github/workflows/test-chart.yaml
index 63860d7f72..36584d84ee 100644
--- a/.github/workflows/test-chart.yaml
+++ b/.github/workflows/test-chart.yaml
@@ -16,7 +16,7 @@ jobs:
           fetch-depth: 0

       - name: Setup Helm
-        uses: azure/setup-helm@v2.0
+        uses: azure/setup-helm@v2.1

       - name: Setup Python
         uses: actions/setup-python@v3
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f48f9dd8e..cf87226fa1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,11 @@
 * Add support for `LifecycleStateChangeFacet` with an ability to softly delete datasets [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
 * Enable pod specific annotations in Marquez Helm Chart via `marquez.podAnnotations` [@wslulciuc](https://github.com/wslulciuc)
+* Add support for job renaming/redirection via symlink [@collado-mike](https://github.com/collado-mike)
+
+### Changed
+
+* Upgrade Flyway to v8.5.10 [@dakshin-k](https://github.com/dakshin-k)

 ## [0.21.0](https://github.com/MarquezProject/marquez/compare/0.20.0...0.21.0) - 2022-03-03
diff --git a/RELEASING.md b/RELEASING.md
index 6ba83297b5..53c56f2352 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -18,4 +18,16 @@
 6. Before closing the project board for the _current_ release, move any open issues to the project board created in **step 5**
 7. Draft a [new release](https://github.com/MarquezProject/marquez/releases/new) using the release notes for `X.Y.Z` in **step 1** as the release description:
-   ![](./docs/assets/images/new-release.png)
\ No newline at end of file
+   ![](./docs/assets/images/new-release.png)
+
+# Voting on Releases
+
+Anyone may request a new release of the project in the #general Slack channel.
+
+Once a release has been proposed, committers have 48 hours to vote +1 or -1.
+
+A net total of three +1 votes (counting any -1s against, and excluding votes by the proposer) authorizes the release.
+
+Alternatively, if after 48 hours the release has received at least one +1 and no -1s, the release is also authorized.
+
+If the proposed release receives no +1s within 48 hours, it is not authorized, and the proposer must submit a new request to reset the clock.
\ No newline at end of file
diff --git a/api/build.gradle b/api/build.gradle
index a2f72aba60..03e8747158 100644
--- a/api/build.gradle
+++ b/api/build.gradle
@@ -23,8 +23,8 @@ plugins {
 ext {
   jdbi3Version = '3.28.0'
   prometheusVersion = '0.15.0'
-  testcontainersVersion = '1.16.3'
-  sentryVersion = '5.7.2'
+  testcontainersVersion = '1.17.1'
+  sentryVersion = '5.7.3'
 }

 dependencies {
@@ -43,9 +43,9 @@ dependencies {
   implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}"
   implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}"
   implementation 'com.google.guava:guava:31.1-jre'
-  implementation 'org.dhatim:dropwizard-sentry:2.0.28-10'
+  implementation 'org.dhatim:dropwizard-sentry:2.0.29'
   implementation "io.sentry:sentry:${sentryVersion}"
-  implementation 'org.flywaydb:flyway-core:6.5.7'
+  implementation 'org.flywaydb:flyway-core:8.5.10'
   implementation "org.postgresql:postgresql:${postgresqlVersion}"
   implementation 'com.graphql-java:graphql-java:18.0'
   implementation 'com.graphql-java-kickstart:graphql-java-servlet:12.0.0'
diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java
index 5ca39396f5..1697acfd61 100644
--- a/api/src/main/java/marquez/MarquezApp.java
+++ b/api/src/main/java/marquez/MarquezApp.java
@@ -23,6 +23,7 @@
 import javax.sql.DataSource;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import marquez.api.filter.JobRedirectFilter;
 import marquez.cli.SeedCommand;
 import marquez.common.Utils;
 import marquez.db.DbMigration;
@@ -115,9 +116,10 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
       env.jersey().register(new TracingContainerResponseFilter());
     }

-    registerResources(config, env, source);
+    MarquezContext marquezContext = buildMarquezContext(config, env, (ManagedDataSource) source);
+    registerResources(config, env, marquezContext);
     registerServlets(env);
-    registerFilters(env);
+    registerFilters(env, marquezContext);
   }

   private boolean isSentryEnabled(MarquezConfig config) {
@@ -126,11 +128,26 @@ private boolean isSentryEnabled(MarquezConfig config) {
   }

   public void registerResources(
-      @NonNull MarquezConfig config, @NonNull Environment env, @NonNull DataSource source) {
+      @NonNull MarquezConfig config, @NonNull Environment env, MarquezContext context) {
+
+    if (config.getGraphql().isEnabled()) {
+      env.servlets()
+          .addServlet("api/v1-beta/graphql", context.getGraphqlServlet())
+          .addMapping("/api/v1-beta/graphql", "/api/v1/schema.json");
+    }
+
+    log.debug("Registering resources...");
+    for (final Object resource : context.getResources()) {
+      env.jersey().register(resource);
+    }
+  }
+
+  private MarquezContext buildMarquezContext(
+      MarquezConfig config, Environment env, ManagedDataSource source) {
     final JdbiFactory factory = new JdbiFactory();
     final Jdbi jdbi =
         factory
-            .build(env, config.getDataSourceFactory(), (ManagedDataSource) source, DB_POSTGRES)
+            .build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
             .installPlugin(new SqlObjectPlugin())
             .installPlugin(new PostgresPlugin());
     SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
@@ -141,17 +158,7 @@ public void registerResources(
     final MarquezContext context =
         MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
-
-    if (config.getGraphql().isEnabled()) {
-      env.servlets()
-          .addServlet("api/v1-beta/graphql", context.getGraphqlServlet())
-          .addMapping("/api/v1-beta/graphql", "/api/v1/schema.json");
-    }
-
-    log.debug("Registering resources...");
-    for (final Object resource : context.getResources()) {
-      env.jersey().register(resource);
-    }
+    return context;
   }

   private void registerServlets(@NonNull Environment env) {
@@ -161,7 +168,10 @@ private void registerServlets(@NonNull Environment env) {
     env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
   }

-  private void registerFilters(@NonNull Environment env) {
+  private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {
     env.jersey().getResourceConfig().register(new LoggingMdcFilter());
+    env.jersey()
+        .getResourceConfig()
+        .register(new JobRedirectFilter(marquezContext.getJobService()));
   }
 }
diff --git a/api/src/main/java/marquez/api/filter/JobRedirectFilter.java b/api/src/main/java/marquez/api/filter/JobRedirectFilter.java
new file mode 100644
index 0000000000..d60614b53d
--- /dev/null
+++ b/api/src/main/java/marquez/api/filter/JobRedirectFilter.java
@@ -0,0 +1,92 @@
+package marquez.api.filter;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import lombok.extern.slf4j.Slf4j;
+import marquez.common.models.JobId;
+import marquez.db.models.JobRow;
+import marquez.service.JobService;
+import marquez.service.models.Job;
+import org.glassfish.jersey.server.ExtendedUriInfo;
+import org.glassfish.jersey.uri.UriComponent;
+import org.glassfish.jersey.uri.UriComponent.Type;
+import org.glassfish.jersey.uri.UriTemplate;
+
+/**
+ * Filters requests that reference a job that has been symlinked to another job. This filter
+ * redirects such requests to the URL with the symlink target's name using a 301 status code.
+ */
+@Slf4j
+public class JobRedirectFilter implements ContainerRequestFilter {
+
+  public static final String JOB_PATH_PARAM = "job";
+  public static final String NAMESPACE_PATH_PARAM = "namespace";
+  private final JobService jobService;
+
+  public JobRedirectFilter(JobService jobService) {
+    this.jobService = jobService;
+  }
+
+  @Override
+  public void filter(ContainerRequestContext requestContext) throws IOException {
+    MultivaluedMap<String, String> pathParams = requestContext.getUriInfo().getPathParameters();
+    if (!pathParams.containsKey(NAMESPACE_PATH_PARAM) || !pathParams.containsKey(JOB_PATH_PARAM)) {
+      return;
+    }
+    List<String> namespaceParams = pathParams.get(NAMESPACE_PATH_PARAM);
+    List<String> jobParams = pathParams.get(JOB_PATH_PARAM);
+    if (namespaceParams.isEmpty() || jobParams.isEmpty()) {
+      return;
+    }
+    Optional<Job> job = jobService.findJobByName(namespaceParams.get(0), jobParams.get(0));
+    job.ifPresent(
+        j -> {
+          if (!j.getName().getValue().equals(jobParams.get(0))) {
+            log.info(
+                "Job {}.{} has been redirected to {}.{}",
+                namespaceParams.get(0),
+                jobParams.get(0),
+                j.getNamespace().getValue(),
+                j.getName().getValue());
+            URI location = buildLocationFor(requestContext, j.getId());
+            log.debug("Redirecting to url {}", location);
+            requestContext.abortWith(Response.status(301).location(location).build());
+          }
+        });
+  }
+
+  /**
+   * Construct a URI from a Request's matched resource, replacing the {@value #JOB_PATH_PARAM} and
+   * {@value #NAMESPACE_PATH_PARAM} parameters with the fully-qualified values from the provided
+   * {@link JobId}.
+   *
+   * @param ctx the request context whose matched resource and path parameters are rewritten
+   * @param jobId the id of the symlink target job
+   * @return the URI of the symlink target's resource
+   */
+  private URI buildLocationFor(ContainerRequestContext ctx, JobId jobId) {
+    Object resource = ctx.getUriInfo().getMatchedResources().get(0);
+    MultivaluedMap<String, String> pathParameters = ctx.getUriInfo().getPathParameters();
+    MultivaluedHashMap<String, String> copy = new MultivaluedHashMap<>(pathParameters);
+    copy.putSingle(
+        JOB_PATH_PARAM, UriComponent.encode(jobId.getName().getValue(), Type.PATH_SEGMENT));
+    copy.putSingle(
+        NAMESPACE_PATH_PARAM,
+        UriComponent.encode(jobId.getNamespace().getValue(), Type.PATH_SEGMENT));
+    Map<String, String> singletonMap = new HashMap<>();
+    copy.forEach((k, v) -> singletonMap.put(k, v.get(0)));
+    UriTemplate pathTemplate = ((ExtendedUriInfo) ctx.getUriInfo()).getMatchedTemplates().get(0);
+    String newPath = pathTemplate.createURI(singletonMap);
+    return UriBuilder.fromResource(resource.getClass()).path(newPath).buildFromEncodedMap(copy);
+  }
+}
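To make the filter's contract concrete: a request for a symlinked job is aborted with a 301 whose `Location` header points at the symlink target. Below is a minimal, hypothetical client-side sketch (not part of the patch) that observes this; it assumes a Marquez server on its default port 5000 and a job `old_job` symlinked to `new_job` in namespace `my_namespace`.

```java
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
import org.glassfish.jersey.client.ClientProperties;

public class JobRedirectCheck {
  public static void main(String[] args) {
    Client client = ClientBuilder.newClient();
    // disable automatic redirect handling so the 301 itself is observable
    client.property(ClientProperties.FOLLOW_REDIRECTS, false);
    Response response =
        client
            .target("http://localhost:5000/api/v1/namespaces/my_namespace/jobs/old_job")
            .request()
            .get();
    // the filter aborts the matched request with a 301 whose Location header
    // points at the symlink target's URL (hypothetical names above)
    System.out.println(response.getStatus()); // expected: 301
    System.out.println(response.getHeaderString("Location")); // .../jobs/new_job
  }
}
```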
diff --git a/api/src/main/java/marquez/common/models/NamespaceName.java b/api/src/main/java/marquez/common/models/NamespaceName.java
index 0d720c4428..0db8429bd4 100644
--- a/api/src/main/java/marquez/common/models/NamespaceName.java
+++ b/api/src/main/java/marquez/common/models/NamespaceName.java
@@ -21,7 +21,7 @@ public final class NamespaceName {
   private static final int MIN_SIZE = 1;
   private static final int MAX_SIZE = 1024;
   private static final Pattern PATTERN =
-      Pattern.compile(String.format("^[a-zA-Z:;=/0-9_\-\.]{%d,%d}$", MIN_SIZE, MAX_SIZE));
+      Pattern.compile(String.format("^[a-zA-Z:;=/0-9_\-\.@]{%d,%d}$", MIN_SIZE, MAX_SIZE));

   @Getter private final String value;

@@ -29,7 +29,7 @@ public NamespaceName(@NonNull final String value) {
     checkArgument(
         PATTERN.matcher(value).matches(),
         "namespace '%s' must contain only letters (a-z, A-Z), numbers (0-9), "
-            + "underscores (_), dashes (-), colons (:), equals (=), semicolons (;), slashes (/) "
+            + "underscores (_), at (@), dashes (-), colons (:), equals (=), semicolons (;), slashes (/) "
             + "or dots (.) with a maximum length of %s characters.",
         value,
         MAX_SIZE);
diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java
index d9dc11f7a8..64ff9b86f0 100644
--- a/api/src/main/java/marquez/db/Columns.java
+++ b/api/src/main/java/marquez/db/Columns.java
@@ -25,6 +25,7 @@
 @Slf4j
 public final class Columns {
+
   private Columns() {}

   private static final ObjectMapper MAPPER = Utils.getMapper();
@@ -78,6 +79,9 @@ private Columns() {}
   /* STREAM VERSION ROW COLUMNS */
   public static final String SCHEMA_LOCATION = "schema_location";

+  /* JOB ROW COLUMNS */
+  public static final String SYMLINK_TARGET_UUID = "symlink_target_uuid";
+
   /* JOB VERSION I/O ROW COLUMNS */
   public static final String INPUT_UUIDS = "input_uuids";
   public static final String OUTPUT_UUIDS = "output_uuids";
diff --git a/api/src/main/java/marquez/db/DbMigration.java b/api/src/main/java/marquez/db/DbMigration.java
index 800eb3de45..a0d47d34f6 100644
--- a/api/src/main/java/marquez/db/DbMigration.java
+++ b/api/src/main/java/marquez/db/DbMigration.java
@@ -7,6 +7,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.flywaydb.core.Flyway;
 import org.flywaydb.core.api.FlywayException;
+import org.flywaydb.core.api.output.MigrateResult;

 @Slf4j
 public final class DbMigration {
@@ -31,8 +32,9 @@ public static void migrateDbOrError(
     // issues before app termination.
     try {
       log.info("Migrating database...");
-      final int migrations = flyway.migrate();
-      log.info("Successfully applied '{}' migrations to database.", migrations);
+      final MigrateResult migrateResult = flyway.migrate();
+      log.info(
+          "Successfully applied '{}' migrations to database.", migrateResult.migrationsExecuted);
     } catch (FlywayException errorOnDbMigrate) {
       log.error("Failed to apply migration to database.", errorOnDbMigrate);
       try {
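The `DbMigration` change tracks an API break in newer Flyway releases: `migrate()` now returns a `MigrateResult` rather than an `int` count, and the executed-migration count lives on the result's `migrationsExecuted` field. A minimal standalone sketch of the new API, with placeholder connection details:

```java
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.output.MigrateResult;

public class MigrateExample {
  public static void main(String[] args) {
    // placeholder JDBC URL and credentials; substitute your own
    Flyway flyway =
        Flyway.configure()
            .dataSource("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez")
            .load();
    // Flyway 7+ returns a MigrateResult instead of an int
    MigrateResult result = flyway.migrate();
    System.out.println("Applied " + result.migrationsExecuted + " migrations");
  }
}
```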
diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java
index 9e49098894..21a3272377 100644
--- a/api/src/main/java/marquez/db/JobDao.java
+++ b/api/src/main/java/marquez/db/JobDao.java
@@ -48,23 +48,34 @@ public interface JobDao extends BaseDao {
   void updateVersionFor(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid);

   @SqlQuery(
-      "SELECT j.*, jc.context, f.facets\n"
-          + "  FROM jobs AS j\n"
-          + "  LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid\n"
-          + "  LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid\n"
-          + "  LEFT OUTER JOIN (\n"
-          + "    SELECT run_uuid, JSON_AGG(e.facets) AS facets\n"
-          + "    FROM (\n"
-          + "      SELECT run_uuid, event->'job'->'facets' AS facets\n"
-          + "      FROM lineage_events AS le\n"
-          + "      INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid\n"
-          + "      INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid\n"
-          + "      WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName\n"
-          + "      ORDER BY event_time ASC\n"
-          + "    ) e\n"
-          + "    GROUP BY e.run_uuid\n"
-          + "  ) f ON f.run_uuid=jv.latest_run_uuid\n"
-          + "WHERE j.namespace_name = :namespaceName AND j.name = :jobName")
+      """
+      WITH RECURSIVE job_ids AS (
+        SELECT uuid, symlink_target_uuid
+        FROM jobs j
+        WHERE j.namespace_name=:namespaceName AND j.name=:jobName
+        UNION
+        SELECT j.uuid, j.symlink_target_uuid
+        FROM jobs j
+        INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
+      )
+      SELECT j.*, jc.context, f.facets
+      FROM jobs j
+      INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
+      LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
+      LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
+      LEFT OUTER JOIN (
+        SELECT run_uuid, JSON_AGG(e.facets) AS facets
+        FROM (
+          SELECT run_uuid, event->'job'->'facets' AS facets
+          FROM lineage_events AS le
+          INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
+          INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid
+          WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
+          ORDER BY event_time ASC
+        ) e
+        GROUP BY e.run_uuid
+      ) f ON f.run_uuid=jv.latest_run_uuid
+      """)
   Optional<Job> findJobByName(String namespaceName, String jobName);

   default Optional<Job> findWithRun(String namespaceName, String jobName) {
@@ -78,11 +89,21 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
   }

   @SqlQuery(
-      "SELECT j.*, n.name AS namespace_name FROM jobs AS j "
-          + "INNER JOIN namespaces AS n "
-          + "  ON (n.name = :namespaceName AND "
-          + "      j.namespace_uuid = n.uuid AND "
-          + "      j.name = :jobName)")
+      """
+      WITH RECURSIVE job_ids AS (
+        SELECT uuid, symlink_target_uuid
+        FROM jobs j
+        WHERE j.namespace_name=:namespaceName AND j.name=:jobName
+        UNION
+        SELECT j.uuid, j.symlink_target_uuid
+        FROM jobs j
+        INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
+      )
+      SELECT j.*, n.name AS namespace_name
+      FROM jobs AS j
+      INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
+      INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
+      """)
   Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

   @SqlQuery(
@@ -103,14 +124,17 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
           + "    GROUP BY e.run_uuid\n"
           + "  ) f ON f.run_uuid=jv.latest_run_uuid\n"
           + "WHERE j.namespace_name = :namespaceName\n"
+          + "AND j.symlink_target_uuid IS NULL\n"
           + "ORDER BY j.name "
           + "LIMIT :limit OFFSET :offset")
   List<Job> findAll(String namespaceName, int limit, int offset);

-  @SqlQuery("SELECT count(*) FROM jobs AS j")
-  int count(String namespaceName);
+  @SqlQuery("SELECT count(*) FROM jobs AS j WHERE symlink_target_uuid IS NULL")
+  int count();

-  @SqlQuery("SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName")
+  @SqlQuery(
+      "SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName\n"
+          + "AND symlink_target_uuid IS NULL")
   int countFor(String namespaceName);

   default List<Job> findAllWithRun(String namespaceName, int limit, int offset) {
@@ -147,6 +171,15 @@ default void setJobData(Run run, Job j) {

   default JobRow upsertJobMeta(
       NamespaceName namespaceName, JobName jobName, JobMeta jobMeta, ObjectMapper mapper) {
+    return upsertJobMeta(namespaceName, jobName, null, jobMeta, mapper);
+  }
+
+  default JobRow upsertJobMeta(
+      NamespaceName namespaceName,
+      JobName jobName,
+      UUID symlinkTargetUuid,
+      JobMeta jobMeta,
+      ObjectMapper mapper) {
     Instant createdAt = Instant.now();
     NamespaceRow namespace =
         createNamespaceDao()
@@ -170,6 +203,7 @@ default JobRow upsertJobMeta(
             jobMeta.getDescription().orElse(null),
             contextRow.getUuid(),
             toUrlString(jobMeta.getLocation().orElse(null)),
+            symlinkTargetUuid,
             toJson(jobMeta.getInputs(), mapper));
   }

@@ -192,39 +226,45 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {
   }

   @SqlQuery(
-      "INSERT INTO jobs ("
-          + "uuid, "
-          + "type, "
-          + "created_at, "
-          + "updated_at, "
-          + "namespace_uuid, "
-          + "namespace_name, "
-          + "name, "
-          + "description,"
-          + "current_job_context_uuid,"
-          + "current_location,"
-          + "current_inputs"
-          + ") VALUES ( "
-          + ":uuid, "
-          + ":type, "
-          + ":now, "
-          + ":now, "
-          + ":namespaceUuid, "
-          + ":namespaceName, "
-          + ":name, "
-          + ":description, "
-          + ":jobContextUuid, "
-          + ":location, "
-          + ":inputs "
-          + ") ON CONFLICT (name, namespace_uuid) DO "
-          + "UPDATE SET "
-          + "updated_at = EXCLUDED.updated_at, "
-          + "type = EXCLUDED.type, "
-          + "description = EXCLUDED.description, "
-          + "current_job_context_uuid = EXCLUDED.current_job_context_uuid, "
-          + "current_location = EXCLUDED.current_location, "
-          + "current_inputs = EXCLUDED.current_inputs "
-          + "RETURNING *")
+      """
+      INSERT INTO jobs AS j (
+        uuid,
+        type,
+        created_at,
+        updated_at,
+        namespace_uuid,
+        namespace_name,
+        name,
+        description,
+        current_job_context_uuid,
+        current_location,
+        current_inputs,
+        symlink_target_uuid
+      ) VALUES (
+        :uuid,
+        :type,
+        :now,
+        :now,
+        :namespaceUuid,
+        :namespaceName,
+        :name,
+        :description,
+        :jobContextUuid,
+        :location,
+        :inputs,
+        :symlinkTargetId
+      ) ON CONFLICT (name, namespace_uuid) DO
+      UPDATE SET
+        updated_at = EXCLUDED.updated_at,
+        type = EXCLUDED.type,
+        description = EXCLUDED.description,
+        current_job_context_uuid = EXCLUDED.current_job_context_uuid,
+        current_location = EXCLUDED.current_location,
+        current_inputs = EXCLUDED.current_inputs,
+        -- update the symlink target if not null. otherwise, keep the old value
+        symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
+      RETURNING *
+      """)
   JobRow upsertJob(
       UUID uuid,
       JobType type,
@@ -235,5 +275,6 @@ JobRow upsertJob(
       String description,
       UUID jobContextUuid,
       String location,
+      UUID symlinkTargetId,
       PGobject inputs);
 }
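Both `JobDao` queries above resolve a chain of symlinks with a recursive CTE: the seed row is the requested job, each recursion step hops to `symlink_target_uuid`, and the final join keeps only the row whose `symlink_target_uuid IS NULL` (the terminal target). For intuition only, here is a hypothetical in-memory sketch of that resolution; the map is a stand-in for the `jobs` table, not part of the patch.

```java
import java.util.Map;
import java.util.UUID;

final class SymlinkResolver {
  /**
   * Follows symlink_target_uuid pointers until reaching a job with no target,
   * which is what the CTE's final join condition (jn.symlink_target_uuid IS
   * NULL) selects. The map is a hypothetical stand-in for the jobs table:
   * job uuid -> symlink_target_uuid (null when the job is not a symlink).
   * The SQL UNION deduplicates rows and so tolerates cycles; this sketch
   * assumes the chain is acyclic.
   */
  static UUID resolve(Map<UUID, UUID> symlinkTargets, UUID start) {
    UUID current = start;
    UUID next;
    while ((next = symlinkTargets.get(current)) != null) {
      current = next; // hop one symlink closer to the terminal job
    }
    return current;
  }
}
```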
diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java
index 65e89e0194..e83d0b9ff1 100644
--- a/api/src/main/java/marquez/db/OpenLineageDao.java
+++ b/api/src/main/java/marquez/db/OpenLineageDao.java
@@ -133,6 +133,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
             description,
             jobContext.getUuid(),
             location,
+            null,
             jobDao.toJson(toDatasetId(event.getInputs()), mapper));
     bag.setJob(job);
diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java
index 1a3e0a5639..7cfb24f1e0 100644
--- a/api/src/main/java/marquez/db/RunDao.java
+++ b/api/src/main/java/marquez/db/RunDao.java
@@ -104,39 +104,50 @@ public interface RunDao extends BaseDao {
   Optional<RunRow> findRunByUuidAsRow(UUID runUuid);

   @SqlQuery(
-      "SELECT r.*, ra.args, ctx.context, f.facets,\n"
-          + "jv.namespace_name, jv.job_name, jv.version AS job_version,\n"
-          + "ri.input_versions, ro.output_versions\n"
-          + "FROM runs AS r\n"
-          + "LEFT OUTER JOIN\n"
-          + "(\n"
-          + "  SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
-          + "  FROM lineage_events le\n"
-          + "  INNER JOIN runs ON runs.uuid=le.run_uuid\n"
-          + "  WHERE runs.job_name=:jobName AND runs.namespace_name=:namespace\n"
-          + "  GROUP BY le.run_uuid\n"
-          + ") AS f ON r.uuid=f.run_uuid\n"
-          + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
-          + "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
-          + "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
-          + "LEFT OUTER JOIN (\n"
-          + "  SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
-          + "  'name', dv.dataset_name,\n"
-          + "  'version', dv.version)) AS input_versions\n"
-          + "  FROM runs_input_mapping im\n"
-          + "  INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
-          + "  GROUP BY im.run_uuid\n"
-          + ") ri ON ri.run_uuid=r.uuid\n"
-          + "LEFT OUTER JOIN (\n"
-          + "  SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
-          + "  'name', dataset_name,\n"
-          + "  'version', version)) AS output_versions\n"
-          + "  FROM dataset_versions\n"
-          + "  GROUP BY run_uuid\n"
-          + ") ro ON ro.run_uuid=r.uuid\n"
-          + "WHERE r.namespace_name = :namespace and r.job_name = :jobName\n"
-          + "ORDER BY STARTED_AT DESC NULLS LAST\n"
-          + "LIMIT :limit OFFSET :offset")
+      """
+      WITH RECURSIVE job_names AS (
+        SELECT uuid, namespace_name, name, symlink_target_uuid
+        FROM jobs j
+        WHERE j.namespace_name=:namespace AND j.name=:jobName
+        UNION
+        SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
+        FROM jobs j
+        INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
+      )
+      SELECT r.*, ra.args, ctx.context, f.facets,
+      jv.namespace_name, jv.job_name, jv.version AS job_version,
+      ri.input_versions, ro.output_versions
+      FROM runs AS r
+      INNER JOIN job_names j ON r.namespace_name=j.namespace_name AND r.job_name=j.name
+      LEFT OUTER JOIN
+      (
+        SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets
+        FROM lineage_events le
+        INNER JOIN runs ON runs.uuid=le.run_uuid
+        WHERE runs.job_name=:jobName AND runs.namespace_name=:namespace
+        GROUP BY le.run_uuid
+      ) AS f ON r.uuid=f.run_uuid
+      LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
+      LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid
+      LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
+      LEFT OUTER JOIN (
+        SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
+        'name', dv.dataset_name,
+        'version', dv.version)) AS input_versions
+        FROM runs_input_mapping im
+        INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
+        GROUP BY im.run_uuid
+      ) ri ON ri.run_uuid=r.uuid
+      LEFT OUTER JOIN (
+        SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+        'name', dataset_name,
+        'version', version)) AS output_versions
+        FROM dataset_versions
+        GROUP BY run_uuid
+      ) ro ON ro.run_uuid=r.uuid
+      ORDER BY STARTED_AT DESC NULLS LAST
+      LIMIT :limit OFFSET :offset
+      """)
   List<Run> findAll(String namespace, String jobName, int limit, int offset);

   @SqlQuery(
@@ -384,11 +395,25 @@ default RunRow upsertRunMeta(
   void updateJobVersion(UUID runUuid, UUID jobVersionUuid);

   @SqlQuery(
-      BASE_FIND_RUN_SQL
-          + "WHERE r.uuid=(\n"
-          + "  SELECT uuid FROM runs WHERE namespace_name = :namespace and job_name = :jobName\n"
-          + "  ORDER BY transitioned_at DESC\n"
-          + "  LIMIT 1\n"
-          + ")")
+      """
+      WITH RECURSIVE job_names AS (
+        SELECT uuid, namespace_name, name, symlink_target_uuid
+        FROM jobs j
+        WHERE j.namespace_name=:namespace AND j.name=:jobName
+        UNION
+        SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
+        FROM jobs j
+        INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
+      )
+      """
+          + BASE_FIND_RUN_SQL
+          + """
+          WHERE r.uuid=(
+            SELECT r.uuid FROM runs r
+            INNER JOIN job_names j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
+            ORDER BY transitioned_at DESC
+            LIMIT 1
+          )
+          """)
   Optional<Run> findByLatestJob(String namespace, String jobName);
 }
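Note that `RunDao`'s `job_names` CTE differs from `JobDao`'s: its join condition follows symlink edges in *both* directions (`j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid`), so it gathers the whole family of names a job has ever had and returns runs recorded under any of them. A hypothetical in-memory sketch of that closure, for intuition only; the map is a stand-in for the `jobs` table:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class JobFamily {
  /**
   * Collects every job connected to `start` through symlink edges followed in
   * either direction, mirroring RunDao's bidirectional recursive CTE.
   * `symlinkTargets` maps job uuid -> symlink_target_uuid (null if none).
   */
  static Set<UUID> family(Map<UUID, UUID> symlinkTargets, UUID start) {
    // reverse edges: target uuid -> jobs that symlink to it
    Map<UUID, Set<UUID>> reverse = new HashMap<>();
    symlinkTargets.forEach(
        (job, target) -> {
          if (target != null) {
            reverse.computeIfAbsent(target, t -> new HashSet<>()).add(job);
          }
        });
    Set<UUID> seen = new HashSet<>();
    Deque<UUID> queue = new ArrayDeque<>();
    queue.add(start);
    while (!queue.isEmpty()) {
      UUID current = queue.poll();
      if (!seen.add(current)) {
        continue; // already visited, like the UNION's row dedup
      }
      UUID target = symlinkTargets.get(current);
      if (target != null) {
        queue.add(target); // forward edge: this job's symlink target
      }
      queue.addAll(reverse.getOrDefault(current, Set.of())); // backward edges
    }
    return seen;
  }
}
```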
diff --git a/api/src/main/java/marquez/db/mappers/JobRowMapper.java b/api/src/main/java/marquez/db/mappers/JobRowMapper.java
index 35f07a7244..7c865b69ef 100644
--- a/api/src/main/java/marquez/db/mappers/JobRowMapper.java
+++ b/api/src/main/java/marquez/db/mappers/JobRowMapper.java
@@ -43,7 +43,8 @@ public JobRow map(@NonNull ResultSet results, @NonNull StatementContext context)
         uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
         uuidOrNull(results, "current_job_context_uuid"),
         stringOrNull(results, "current_location"),
-        getDatasetFromJsonOrNull(results, "current_inputs"));
+        getDatasetFromJsonOrNull(results, "current_inputs"),
+        uuidOrNull(results, Columns.SYMLINK_TARGET_UUID));
   }

   Set<DatasetId> getDatasetFromJsonOrNull(@NonNull ResultSet results, String column)
diff --git a/api/src/main/java/marquez/db/models/JobRow.java b/api/src/main/java/marquez/db/models/JobRow.java
index e2aed8accd..3cc45ab2ed 100644
--- a/api/src/main/java/marquez/db/models/JobRow.java
+++ b/api/src/main/java/marquez/db/models/JobRow.java
@@ -24,6 +24,7 @@ public class JobRow {
   @Nullable UUID jobContextUuid;
   @Nullable String location;
   @Nullable Set<DatasetId> inputs;
+  @Nullable UUID symlinkTargetId;

   public Optional<String> getDescription() {
     return Optional.ofNullable(description);
diff --git a/api/src/main/resources/marquez/db/migration/V42__add_job_symlink_target.sql b/api/src/main/resources/marquez/db/migration/V42__add_job_symlink_target.sql
new file mode 100644
index 0000000000..07040a07a7
--- /dev/null
+++ b/api/src/main/resources/marquez/db/migration/V42__add_job_symlink_target.sql
@@ -0,0 +1,4 @@
+ALTER TABLE jobs ADD COLUMN symlink_target_uuid uuid REFERENCES jobs (uuid);
+CREATE INDEX jobs_symlinks ON jobs (symlink_target_uuid)
+    INCLUDE (uuid, namespace_name, name)
+    WHERE symlink_target_uuid IS NOT NULL;
\ No newline at end of file
diff --git a/api/src/test/java/marquez/MarquezAppIntegrationTest.java b/api/src/test/java/marquez/MarquezAppIntegrationTest.java
index eb695cc3d9..9679aa96bb 100644
--- a/api/src/test/java/marquez/MarquezAppIntegrationTest.java
+++ b/api/src/test/java/marquez/MarquezAppIntegrationTest.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.net.URL;
+import java.sql.SQLException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -42,11 +43,20 @@
 import marquez.client.models.StreamMeta;
 import marquez.client.models.Tag;
 import marquez.common.models.DatasetName;
+import marquez.common.models.JobType;
+import marquez.db.JobDao;
+import marquez.db.NamespaceDao;
+import marquez.db.models.JobRow;
+import marquez.db.models.NamespaceRow;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.postgres.PostgresPlugin;
+import org.jdbi.v3.sqlobject.SqlObjectPlugin;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.postgresql.util.PGobject;

 @org.junit.jupiter.api.Tag("IntegrationTests")
 @ExtendWith(MarquezJdbiExternalPostgresExtension.class)
@@ -626,4 +636,68 @@ public void testApp_search() {
     assertThat(result.getType()).isEqualTo(SearchResult.ResultType.DATASET);
     assertThat(result.getName()).isEqualTo(datasetName);
   }
+
+  @Test
+  public void testApp_getJob() throws SQLException {
+    Jdbi jdbi =
+        Jdbi.create(POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword())
+            .installPlugin(new SqlObjectPlugin())
+            .installPlugin(new PostgresPlugin());
+    createNamespace(NAMESPACE_NAME);
+
+    // Create job
+    String jobName = newJobName().getValue();
+    final JobMeta jobMeta =
+        JobMeta.builder()
+            .type(JOB_TYPE)
+            .inputs(ImmutableSet.of())
+            .outputs(ImmutableSet.of())
+            .location(JOB_LOCATION)
+            .context(JOB_CONTEXT)
+            .description(JOB_DESCRIPTION)
+            .build();
+    final Job originalJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta);
+
+    String targetJobName = newJobName().getValue();
+    final JobMeta targetJobMeta =
+        JobMeta.builder()
+            .type(JOB_TYPE)
+            .inputs(ImmutableSet.of())
+            .outputs(ImmutableSet.of())
+            .location(JOB_LOCATION)
+            .context(JOB_CONTEXT)
+            .description(JOB_DESCRIPTION)
+            .build();
+    final Job targetJob = client.createJob(NAMESPACE_NAME, targetJobName, targetJobMeta);
+
+    JobDao jobDao = jdbi.onDemand(JobDao.class);
+    NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class);
+    Optional<NamespaceRow> namespaceRow = namespaceDao.findNamespaceByName(NAMESPACE_NAME);
+    Optional<JobRow> originalJobRow = jobDao.findJobByNameAsRow(NAMESPACE_NAME, jobName);
+    Optional<JobRow> targetJobRow = jobDao.findJobByNameAsRow(NAMESPACE_NAME, targetJobName);
+    PGobject inputs = new PGobject();
+    inputs.setType("json");
+    inputs.setValue("[]");
+    originalJobRow.ifPresent(
+        j -> {
+          jobDao.upsertJob(
+              j.getUuid(),
+              JobType.valueOf(JOB_TYPE.name()),
+              Instant.now(),
+              namespaceRow.get().getUuid(),
+              NAMESPACE_NAME,
+              jobName,
+              JOB_DESCRIPTION,
+              j.getJobContextUuid().orElse(null),
+              JOB_LOCATION.toString(),
+              targetJobRow.get().getUuid(),
+              inputs);
+        });
+
+    Job job = client.getJob(NAMESPACE_NAME, jobName);
+    assertThat(job)
+        .isNotNull()
+        .hasFieldOrPropertyWithValue("namespace", NAMESPACE_NAME)
+        .hasFieldOrPropertyWithValue("name", targetJobName);
+  }
 }
diff --git a/api/src/test/java/marquez/common/models/NamespaceNameTest.java b/api/src/test/java/marquez/common/models/NamespaceNameTest.java
index 03fe2c1965..4e9ea3176a 100644
--- a/api/src/test/java/marquez/common/models/NamespaceNameTest.java
+++ b/api/src/test/java/marquez/common/models/NamespaceNameTest.java
@@ -17,14 +17,16 @@ public class NamespaceNameTest {
         "s3://bucket",
         "bigquery:",
         "sqlserver://synapse-test-test001.sql.azuresynapse.net;databaseName=TESTPOOL1;",
-        "\u003D"
+        "\u003D",
+        "@",
+        "abfss://something@.something-else.core.windows.net"
       })
   void testValidNamespaceName(String name) {
     assertThat(NamespaceName.of(name).getValue()).isEqualTo(name);
   }

   @ParameterizedTest
-  @ValueSource(strings = {"@@@", "\uD83D\uDE02", "!", ""})
+  @ValueSource(strings = {"\uD83D\uDE02", "!", ""})
   void testInvalidNamespaceName(String name) {
     Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceName.of(name));
   }
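With `@` added to `NamespaceName`'s character class, Azure-style namespace URIs now validate. A quick sketch exercising the relaxed pattern through the public factory (assuming the `marquez.common.models` classes are on the classpath; the sample namespace is taken from the updated test data):

```java
import marquez.common.models.NamespaceName;

public class NamespaceNameExample {
  public static void main(String[] args) {
    // '@' is now part of the allowed character class, so Azure Data Lake
    // style namespaces validate
    NamespaceName azure =
        NamespaceName.of("abfss://something@.something-else.core.windows.net");
    System.out.println(azure.getValue());

    // characters outside the class still throw, as testInvalidNamespaceName shows
    try {
      NamespaceName.of("!");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```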
diff --git a/api/src/test/java/marquez/db/DbTestUtils.java b/api/src/test/java/marquez/db/DbTestUtils.java
index cfd706170d..8f8bf152e3 100644
--- a/api/src/test/java/marquez/db/DbTestUtils.java
+++ b/api/src/test/java/marquez/db/DbTestUtils.java
@@ -33,6 +33,7 @@
 import marquez.common.models.DatasetId;
 import marquez.common.models.DatasetName;
 import marquez.common.models.JobName;
+import marquez.common.models.JobType;
 import marquez.common.models.NamespaceName;
 import marquez.common.models.RunState;
 import marquez.db.models.DatasetRow;
@@ -128,11 +129,56 @@ static ImmutableSet<JobRow> newJobs(final Jdbi jdbi, final int limit) {
     return Stream.generate(() -> newJob(jdbi)).limit(limit).collect(toImmutableSet());
   }

+  public static JobRow createJobWithoutSymlinkTarget(
+      Jdbi jdbi, NamespaceRow namespace, String jobName, String description) {
+    return newJobWith(
+        jdbi,
+        namespace.getName(),
+        jobName,
+        new JobMeta(
+            JobType.BATCH,
+            ImmutableSet.of(),
+            ImmutableSet.of(),
+            null,
+            ImmutableMap.of(),
+            description,
+            null));
+  }
+
+  public static JobRow createJobWithSymlinkTarget(
+      Jdbi jdbi, NamespaceRow namespace, String jobName, UUID jobSymlinkId, String description) {
+    return newJobWith(
+        jdbi,
+        namespace.getName(),
+        jobName,
+        jobSymlinkId,
+        new JobMeta(
+            JobType.BATCH,
+            ImmutableSet.of(),
+            ImmutableSet.of(),
+            null,
+            ImmutableMap.of(),
+            description,
+            null));
+  }
+
   /**
    * Adds a new {@link JobRow} object to the {@code jobs} table with the provided {@link JobMeta}.
    */
   static JobRow newJobWith(
       final Jdbi jdbi, final String namespaceName, final String jobName, final JobMeta jobMeta) {
+    return newJobWith(jdbi, namespaceName, jobName, null, jobMeta);
+  }
+
+  /**
+   * Adds a new {@link JobRow} object to the {@code jobs} table with the provided {@link JobMeta}.
+   */
+  static JobRow newJobWith(
+      final Jdbi jdbi,
+      final String namespaceName,
+      final String jobName,
+      UUID symlinkTargetUuid,
+      final JobMeta jobMeta) {
     final DatasetDao datasetDao = jdbi.onDemand(DatasetDao.class);
     final JobDao jobDao = jdbi.onDemand(JobDao.class);
@@ -151,7 +197,11 @@ static JobRow newJobWith(
     }

     return jobDao.upsertJobMeta(
-        namespaceForDatasetAndJob, JobName.of(jobName), jobMeta, Utils.getMapper());
+        namespaceForDatasetAndJob,
+        JobName.of(jobName),
+        symlinkTargetUuid,
+        jobMeta,
+        Utils.getMapper());
   }

   /** Adds a new {@link JobContextRow} object to the {@code job_contexts} table. */
@@ -268,35 +318,36 @@ public static Stream<Map<String, Object>> query(Jdbi jdbi, String sql) {
             .scanResultSet(
                 (rs, ctx) -> {
                   ResultSet resultSet = rs.get();
-                  return Stream.generate(
-                          () -> {
-                            try {
-                              if (resultSet.next()) {
-                                ResultSetMetaData metaData = resultSet.getMetaData();
-                                int keys = metaData.getColumnCount();
-                                return IntStream.range(1, keys + 1)
-                                    .mapToObj(
-                                        i -> {
-                                          try {
-                                            return Map.entry(
-                                                metaData.getColumnName(i),
-                                                Optional.ofNullable(resultSet.getObject(i))
-                                                    .orElse("NULL"));
-                                          } catch (SQLException e) {
-                                            throw new RuntimeException(e);
-                                          }
-                                        })
-                                    .collect(
-                                        Collectors.toMap(
-                                            Map.Entry::getKey, Map.Entry::getValue));
-                              } else {
-                                return null;
-                              }
-                            } catch (SQLException e) {
-                              throw new RuntimeException(e);
-                            }
-                          })
-                      .takeWhile(Predicates.notNull());
+                  return streamResults(resultSet);
                 }));
   }
+
+  public static Stream<Map<String, Object>> streamResults(ResultSet resultSet) {
+    return Stream.generate(
+            () -> {
+              try {
+                if (resultSet.next()) {
+                  ResultSetMetaData metaData = resultSet.getMetaData();
+                  int keys = metaData.getColumnCount();
+                  return IntStream.range(1, keys + 1)
+                      .mapToObj(
+                          i -> {
+                            try {
+                              return Map.entry(
+                                  metaData.getColumnName(i),
+                                  Optional.ofNullable(resultSet.getObject(i)).orElse("NULL"));
+                            } catch (SQLException e) {
+                              throw new RuntimeException(e);
+                            }
+                          })
+                      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                } else {
+                  return null;
+                }
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .takeWhile(Predicates.notNull());
+  }
 }
diff --git a/api/src/test/java/marquez/db/JobDaoTest.java b/api/src/test/java/marquez/db/JobDaoTest.java
index 8a9732fd5d..81a0b0c3bc 100644
--- a/api/src/test/java/marquez/db/JobDaoTest.java
+++ b/api/src/test/java/marquez/db/JobDaoTest.java
@@ -2,6 +2,9 @@

 package marquez.db;

+import static marquez.db.DbTestUtils.createJobWithSymlinkTarget;
+import static marquez.db.DbTestUtils.createJobWithoutSymlinkTarget;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -9,8 +12,18 @@

 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import marquez.db.models.DbModelGenerator;
+import marquez.db.models.JobRow;
+import marquez.db.models.NamespaceRow;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import marquez.service.models.Job;
+import org.assertj.core.api.AbstractObjectAssert;
 import org.jdbi.v3.core.Jdbi;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -19,10 +32,26 @@ public class JobDaoTest {

   private static JobDao jobDao;
+  private static NamespaceDao namespaceDao;
+  private static NamespaceRow namespace;
+  private static Jdbi jdbi;

   @BeforeAll
   public static void setUpOnce(Jdbi jdbi) {
+    JobDaoTest.jdbi = jdbi;
     jobDao = jdbi.onDemand(JobDao.class);
+    namespaceDao = jdbi.onDemand(NamespaceDao.class);
+    namespace =
+        namespaceDao.upsertNamespaceRow(
+            UUID.randomUUID(),
+            Instant.now(),
+            JobDaoTest.class.getSimpleName(),
+            JobDaoTest.class.getName());
+  }
+
+  @AfterEach
+  public void cleanUp(Jdbi jdbi) {
+    jdbi.inTransaction(h -> h.execute("DELETE FROM jobs"));
   }

   @Test
@@ -30,6 +59,120 @@ public void emptyUrl() {
     assertNull(jobDao.toUrlString(null));
   }

+  @Test
+  public void testFindSymlinkedJobByName() {
+    JobRow targetJob =
+        createJobWithoutSymlinkTarget(jdbi, namespace, "targetJob", "the target of the symlink");
+    JobRow symlinkJob =
+        createJobWithSymlinkTarget(
+            jdbi, namespace, "symlinkJob", targetJob.getUuid(), "the symlink job");
+
+    Optional<Job> jobByName =
+        jobDao.findJobByName(symlinkJob.getNamespaceName(), symlinkJob.getName());
+
+    assertJobEquals(jobByName, targetJob.getNamespaceName(), targetJob.getName());
+  }
+
+  @Test
+  public void testFindSymlinkedJobRowByName() {
+    JobRow targetJob =
+        createJobWithoutSymlinkTarget(jdbi, namespace, "targetJob", "the target of the symlink");
+    JobRow symlinkJob =
+        createJobWithSymlinkTarget(
+            jdbi, namespace, "symlinkJob", targetJob.getUuid(), "the symlink job");
+
+    Optional<JobRow> jobByName =
+        jobDao.findJobByNameAsRow(symlinkJob.getNamespaceName(), symlinkJob.getName());
+    assertThat(jobByName)
+        .isPresent()
+        .get()
+        .hasFieldOrPropertyWithValue("name", targetJob.getName())
+        .hasFieldOrPropertyWithValue("namespaceName", targetJob.getNamespaceName());
+  }
+
+  @Test
+  public void testFindAll() {
+    JobRow targetJob =
+        createJobWithoutSymlinkTarget(jdbi, namespace, "targetJob", "the target of the symlink");
+    JobRow symlinkJob =
+        createJobWithSymlinkTarget(
+            jdbi, namespace, "symlinkJob", targetJob.getUuid(), "the symlink job");
+    JobRow anotherJobSameNamespace =
+        createJobWithoutSymlinkTarget(jdbi, namespace, "anotherJob", "a random other job");
+
+    List<Job> jobs = jobDao.findAll(namespace.getName(), 10, 0);
+
+    // the symlinked job isn't present in the response - only the symlink target and the job with
+    // no symlink
+    assertThat(jobs)
+        .hasSize(2)
+        .map(Job::getId)
+        .containsExactlyInAnyOrder(
+            DbModelGenerator.jobIdFor(namespace.getName(), targetJob.getName()),
+            DbModelGenerator.jobIdFor(namespace.getName(), anotherJobSameNamespace.getName()));
+  }
+
+  @Test
+  public void testCountFor() {
+    JobRow targetJob =
+        createJobWithoutSymlinkTarget(jdbi, namespace, "targetJob", "the target of the symlink");
+    createJobWithSymlinkTarget(
+        jdbi, namespace, "symlinkJob", targetJob.getUuid(), "the symlink job");
+    createJobWithoutSymlinkTarget(jdbi, namespace, "anotherJob", "a random other job");
+    createJobWithoutSymlinkTarget(jdbi, namespace, "aThirdJob", "a random third job");
+
+    NamespaceRow anotherNamespace =
+        namespaceDao.upsertNamespaceRow(
+            UUID.randomUUID(), Instant.now(), "anotherNamespace", getClass().getName());
+    createJobWithSymlinkTarget(
+        jdbi, anotherNamespace, "othernamespacejob", null, "job in another namespace");
+
+    assertThat(jobDao.count()).isEqualTo(4);
+
+    assertThat(jobDao.countFor(namespace.getName())).isEqualTo(3);
+  }
+
+  @Test
+  public void testUpsertJobWithNewSymlink() {
+    JobRow targetJob =
+        createJobWithoutSymlinkTarget(jdbi, namespace, "targetJob", "the target of the symlink");
+
+    String symlinkJobName = "symlinkJob";
+    JobRow symlinkJob =
+        createJobWithoutSymlinkTarget(jdbi, namespace, symlinkJobName, "the symlink job");
+
+    // the job queried is returned, since there is no symlink
+    Optional<Job> jobByName =
+        jobDao.findJobByName(symlinkJob.getNamespaceName(), symlinkJob.getName());
+    assertJobEquals(jobByName, symlinkJob.getNamespaceName(), symlinkJob.getName());
+
+    createJobWithSymlinkTarget(
+        jdbi, namespace, symlinkJobName, targetJob.getUuid(), "the symlink job");
+
+    // now the symlink target should be returned
+    assertJobEquals(
+        jobDao.findJobByName(symlinkJob.getNamespaceName(), symlinkJob.getName()),
+        targetJob.getNamespaceName(),
+        targetJob.getName());
+
+    // upsert without the symlink target - the previous value should be respected
+    createJobWithoutSymlinkTarget(jdbi, namespace, symlinkJobName, "the symlink job");
+
+    // the symlink target should still be returned
+    assertJobEquals(
+        jobDao.findJobByName(symlinkJob.getNamespaceName(), symlinkJob.getName()),
+        targetJob.getNamespaceName(),
+        targetJob.getName());
+  }
+
+  private AbstractObjectAssert<?, Job> assertJobEquals(
+      Optional<Job> jobByName, String namespaceName, String jobName) {
+    return assertThat(jobByName)
+        .isPresent()
+        .get()
+        .hasFieldOrPropertyWithValue("id", DbModelGenerator.jobIdFor(namespaceName, jobName));
+  }
+
   @Test
   public void pgObjectException() throws JsonProcessingException {
     ObjectMapper objectMapper = mock(ObjectMapper.class);
diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java
index 9a29b61738..e8e05fe63e 100644
--- a/api/src/test/java/marquez/db/RunDaoTest.java
+++ b/api/src/test/java/marquez/db/RunDaoTest.java
@@ -3,15 +3,22 @@
 package marquez.db;

 import static marquez.common.models.CommonModelGenerator.newJobName;
+import static marquez.db.DbTestUtils.createJobWithSymlinkTarget;
+import static marquez.db.DbTestUtils.createJobWithoutSymlinkTarget;
+import static marquez.db.DbTestUtils.newJobWith;
 import static marquez.service.models.ServiceModelGenerator.newJobMetaWith;
 import static org.assertj.core.api.Assertions.assertThat;

+import com.google.common.collect.ImmutableSet;
 import java.time.Instant;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import marquez.common.models.DatasetId;
 import marquez.common.models.DatasetVersionId;
 import marquez.common.models.NamespaceName;
@@ -25,6 +32,7 @@
 import marquez.service.models.Run;
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.jdbi.v3.core.Jdbi;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -79,7 +87,7 @@ public void getRun() {

     final JobMeta jobMeta = newJobMetaWith(NamespaceName.of(namespaceRow.getName()));
     final JobRow jobRow =
-        DbTestUtils.newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);
+        newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);

     final RunRow runRow = DbTestUtils.newRun(jdbi, jobRow.getNamespaceName(), jobRow.getName());
     DbTestUtils.transitionRunWithOutputs(
@@ -116,26 +124,35 @@ public void getFindAll() {

     final JobMeta jobMeta = newJobMetaWith(NamespaceName.of(namespaceRow.getName()));
     final JobRow jobRow =
-        DbTestUtils.newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);
+        newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);

     Set<RunRow> expectedRuns =
-        IntStream.range(0, 5)
-            .mapToObj(
-                i -> {
-                  final RunRow runRow =
-                      DbTestUtils.newRun(jdbi, jobRow.getNamespaceName(), jobRow.getName());
-                  DbTestUtils.transitionRunWithOutputs(
-                      jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs());
-
-                  jobVersionDao.upsertJobVersionOnRunTransition(
-                      jobRow.getNamespaceName(),
-                      jobRow.getName(),
-                      runRow.getUuid(),
-                      RunState.COMPLETED,
-                      Instant.now());
-                  return runRow;
-                })
+        createRunsForJob(jobRow, 5, jobMeta.getOutputs()).collect(Collectors.toSet());
+    List<Run> runs = runDao.findAll(jobRow.getNamespaceName(), jobRow.getName(), 10, 0);
+    assertThat(runs)
+        .hasSize(expectedRuns.size())
+        .map(Run::getId)
+        .map(RunId::getValue)
+        .containsAll(expectedRuns.stream().map(RunRow::getUuid).collect(Collectors.toSet()));
+  }
+
+  @Test
+  public void getFindAllForSymlinkedJob() {
+    final JobMeta jobMeta = newJobMetaWith(NamespaceName.of(namespaceRow.getName()));
+    final JobRow jobRow =
+        newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);
+
+    final JobRow symlinkJob =
+        createJobWithSymlinkTarget(
+            jdbi, namespaceRow, newJobName().getValue(), jobRow.getUuid(), "symlink job");
+
+    Set<RunRow> expectedRuns =
+        Stream.concat(
+                createRunsForJob(symlinkJob, 3, jobMeta.getOutputs()),
+                createRunsForJob(jobRow, 2, jobMeta.getOutputs()))
             .collect(Collectors.toSet());
+
+    // all runs should be present
     List<Run> runs = runDao.findAll(jobRow.getNamespaceName(), jobRow.getName(), 10, 0);
     assertThat(runs)
         .hasSize(expectedRuns.size())
@@ -144,13 +161,70 @@ public void getFindAll() {
         .containsAll(expectedRuns.stream().map(RunRow::getUuid).collect(Collectors.toSet()));
   }

+  @Test
+  public void testFindByLatestJob() {
+    final JobMeta jobMeta = newJobMetaWith(NamespaceName.of(namespaceRow.getName()));
+    final JobRow jobRow =
+        newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);
+    Set<RunRow> runs =
+        createRunsForJob(jobRow, 5, jobMeta.getOutputs()).collect(Collectors.toSet());
+
+    TreeSet<RunRow> sortedRuns =
+        new TreeSet<>(Comparator.comparing(RunRow::getUpdatedAt).reversed());
+    sortedRuns.addAll(runs);
+    Optional<Run> byLatestJob = runDao.findByLatestJob(jobRow.getNamespaceName(), jobRow.getName());
+    assertThat(byLatestJob)
+        .isPresent()
+        .get()
+        .hasFieldOrPropertyWithValue("id", new RunId(sortedRuns.first().getUuid()));
+
+    JobRow newTargetJob =
+        createJobWithoutSymlinkTarget(jdbi, namespaceRow, "newTargetJob", "a symlink target");
+
+    // update the old job to point to the new targets
+    createJobWithSymlinkTarget(
+        jdbi,
+        namespaceRow,
+        jobRow.getName(),
+        newTargetJob.getUuid(),
+        jobMeta.getDescription().orElse(null));
+
+    // get the latest run for the *newTargetJob*. It should be the same as the old job's latest run
+    byLatestJob = runDao.findByLatestJob(newTargetJob.getNamespaceName(), newTargetJob.getName());
+    assertThat(byLatestJob)
+        .isPresent()
+        .get()
+        .hasFieldOrPropertyWithValue("id", new RunId(sortedRuns.first().getUuid()));
+  }
+
+  @NotNull
+  private Stream<RunRow> createRunsForJob(
+      JobRow jobRow, int count, ImmutableSet<DatasetId> outputs) {
+    return IntStream.range(0, count)
+        .mapToObj(
+            i -> {
+              final RunRow runRow =
+                  DbTestUtils.newRun(jdbi, jobRow.getNamespaceName(), jobRow.getName());
+              DbTestUtils.transitionRunWithOutputs(
+                  jdbi, runRow.getUuid(), RunState.COMPLETED, outputs);
+
+              jobVersionDao.upsertJobVersionOnRunTransition(
+                  jobRow.getNamespaceName(),
+                  jobRow.getName(),
+                  runRow.getUuid(),
+                  RunState.COMPLETED,
+                  Instant.now());
+              return runRow;
+            });
+  }
+
   @Test
   public void updateRowWithNullNominalTimeDoesNotUpdateNominalTime() {
     final RunDao runDao = jdbi.onDemand(RunDao.class);
     final JobMeta jobMeta = newJobMetaWith(NamespaceName.of(namespaceRow.getName()));
     final JobRow jobRow =
-        DbTestUtils.newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);
+        newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta);

     RunRow row = DbTestUtils.newRun(jdbi, namespaceRow.getName(), jobRow.getName());
diff --git a/api/src/test/java/marquez/db/models/DbModelGenerator.java b/api/src/test/java/marquez/db/models/DbModelGenerator.java
index d3a555493d..1857c371f3 100644
--- a/api/src/test/java/marquez/db/models/DbModelGenerator.java
+++ b/api/src/test/java/marquez/db/models/DbModelGenerator.java
@@ -12,6 +12,10 @@
 import java.util.UUID;
 import java.util.stream.Stream;
 import marquez.Generator;
+import marquez.common.models.JobId;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import org.jetbrains.annotations.NotNull;

 /** Generates new instances for {@code marquez.db.models} with random values used for testing. */
 public final class DbModelGenerator extends Generator {
@@ -38,4 +42,9 @@ public static NamespaceRow newNamespaceRow() {
   public static UUID newRowUuid() {
     return UUID.randomUUID();
   }
+
+  @NotNull
+  public static JobId jobIdFor(String namespaceName, String jobName) {
+    return new JobId(new NamespaceName(namespaceName), new JobName(jobName));
+  }
 }
diff --git a/build.gradle b/build.gradle
index 8a89f87cdf..81d5afdffd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,7 +21,7 @@ buildscript {
   dependencies {
     classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0'
     classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2'
-    classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.4.2'
+    classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.5.1'
   }
 }

@@ -57,11 +57,11 @@ subprojects {
     dropwizardVersion = '2.0.29'
     jacocoVersion = '0.8.7'
     junit5Version = '5.8.2'
-    lombokVersion = '1.18.22'
-    mockitoVersion = '4.4.0'
-    openlineageVersion = '0.6.2'
+    lombokVersion = '1.18.24'
+    mockitoVersion = '4.5.1'
+    openlineageVersion = '0.8.1'
     slf4jVersion = '1.7.36'
-    postgresqlVersion = '42.3.3'
+    postgresqlVersion = '42.3.4'
     isReleaseVersion = !version.endsWith('SNAPSHOT')
   }
diff --git a/chart/Chart.lock b/chart/Chart.lock
index 602aec2dcd..2d021410f9 100644
--- a/chart/Chart.lock
+++ b/chart/Chart.lock
@@ -1,9 +1,9 @@
 dependencies:
 - name: common
   repository: https://charts.bitnami.com/bitnami
-  version: 1.13.0
+  version: 1.13.1
 - name: postgresql
   repository: https://charts.bitnami.com/bitnami
-  version: 11.1.17
-digest: sha256:92efcad91b772ca799fe293ee6c8fec421ae6b4d78c207599aa01634ee0f67df
-generated: "2022-04-06T04:49:31.588003982Z"
+  version: 11.1.26
+digest: sha256:9f64607528832d10773a573c90ebc7a008350b2527a7de275458510a3e57fdec
+generated: "2022-05-03T18:38:42.955188002Z"
diff --git a/chart/Chart.yaml b/chart/Chart.yaml
index a53c411c8d..e842ed550f 100644
--- a/chart/Chart.yaml
+++ b/chart/Chart.yaml
@@ -5,11 +5,11 @@ dependencies:
     repository: https://charts.bitnami.com/bitnami
     tags:
       - bitnami-common
-    version: 1.13.0
+    version: 1.13.1
   - condition: postgresql.enabled
     name: postgresql
     repository: https://charts.bitnami.com/bitnami
-    version: 11.1.17
+    version: 11.1.26
 description: Marquez is an open source metadata service for the collection, aggregation,
   and visualization of a data ecosystem's metadata.
 home: https://github.com/MarquezProject/marquez/tree/main/chart
 icon: https://raw.githubusercontent.com/MarquezProject/marquez/main/web/src/img/marquez-logo.png
@@ -29,4 +29,4 @@ name: marquez
 sources:
   - https://github.com/MarquezProject/marquez
   - https://marquezproject.github.io/marquez/
-version: 0.21.7
+version: 0.21.10
diff --git a/chart/templates/web/deployment.yaml b/chart/templates/web/deployment.yaml
index 31e4063f51..39e91dabfb 100644
--- a/chart/templates/web/deployment.yaml
+++ b/chart/templates/web/deployment.yaml
@@ -39,7 +39,7 @@ spec:
               port: http
           env:
             - name: MARQUEZ_HOST
-              value: {{ .Values.marquez.hostname | quote }}
+              value: {{ include "common.names.fullname" . }}
            - name: MARQUEZ_PORT
              value: {{ .Values.marquez.port | quote }}
          {{- if .Values.web.resources }}
diff --git a/docker-compose.yml b/docker-compose.yml
index 9afa4bcfc0..acb6a169fc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,12 +10,12 @@ services:
       - "${API_PORT}:${API_PORT}"
       - "${API_ADMIN_PORT}:${API_ADMIN_PORT}"
     volumes:
-      - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh
+      - utils:/opt/marquez
     links:
       - "db:postgres"
     depends_on:
       - db
-    entrypoint: ["./wait-for-it.sh", "db:5432", "--", "./entrypoint.sh"]
+    entrypoint: ["/opt/marquez/wait-for-it.sh", "db:5432", "--", "./entrypoint.sh"]

   web:
     image: "marquezproject/marquez-web:${TAG}"
@@ -42,6 +42,10 @@ services:
       - MARQUEZ_USER=marquez
       - MARQUEZ_PASSWORD=marquez
     volumes:
-      - ./docker/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh
+      - db-init:/docker-entrypoint-initdb.d
     # Enables SQL statement logging (see: https://www.postgresql.org/docs/12/runtime-config-logging.html#GUC-LOG-STATEMENT)
     # command: ["postgres", "-c", "log_statement=all"]
+
+volumes:
+  utils:
+  db-init:
\ No newline at end of file
diff --git a/docker/up.sh b/docker/up.sh
index 8d1ba22a7f..6bb62bfd13 100755
--- a/docker/up.sh
+++ b/docker/up.sh
@@ -3,6 +3,9 @@
 # SPDX-License-Identifier: Apache-2.0

 set -e
+set -x
+
+SCRIPTDIR=$(dirname $0)

 title() {
   echo -e "\033[1m${1}\033[0m"
@@ -104,4 +107,6 @@ if [[ "${SEED}" = "true" ]]; then
   compose_files+=" -f docker-compose.seed.yml"
 fi

+$SCRIPTDIR/volumes.sh marquez
+
 API_PORT=${API_PORT} API_ADMIN_PORT=${API_ADMIN_PORT} WEB_PORT=${WEB_PORT} TAG="${TAG}" docker-compose $compose_files up $args
diff --git a/docker/volumes.sh b/docker/volumes.sh
new file mode 100755
index 0000000000..5a30fe7c62
--- /dev/null
+++ b/docker/volumes.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+VOLUME_PREFIX=$1
+
+UTILS_VOLUME="${VOLUME_PREFIX}_utils"
+DB_INIT_VOLUME="${VOLUME_PREFIX}_db-init"
+
+docker volume create $UTILS_VOLUME
+docker volume create $DB_INIT_VOLUME
+docker create --name marquez-volume-helper -v $UTILS_VOLUME:/opt/marquez-utils -v $DB_INIT_VOLUME:/opt/marquez-db-init busybox
+docker cp ./docker/wait-for-it.sh marquez-volume-helper:/opt/marquez-utils/wait-for-it.sh
+docker cp ./docker/init-db.sh marquez-volume-helper:/opt/marquez-db-init/init-db.sh
+
+docker rm marquez-volume-helper
diff --git a/web/package-lock.json b/web/package-lock.json
index 52799699d2..7141aaec34 100644
--- a/web/package-lock.json
+++ b/web/package-lock.json
@@ -39,7 +39,7 @@
         "dagre": "^0.8.5",
         "http-proxy-middleware": "^0.20.0",
         "lodash": "^4.17.21",
-        "moment": "^2.29.1",
+        "moment": "^2.29.2",
         "postcss-loader": "^3.0.0",
         "postcss-modules-values": "^2.0.0",
         "react": "^16.8.0",
@@ -11889,9 +11889,9 @@
       }
     },
     "node_modules/moment": {
-      "version": "2.29.1",
-      "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
-      "integrity": "sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ==",
+      "version": "2.29.2",
+      "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.2.tgz",
+      "integrity": "sha512-UgzG4rvxYpN15jgCmVJwac49h9ly9NurikMWGPdVxm8GZD6XjkKPxDTjQQ43gtGgnV3X0cAyWDdP2Wexoquifg==",
       "engines": {
         "node": "*"
       }
@@ -25783,9 +25783,9 @@
       }
     },
     "moment": {
-      "version": "2.29.1",
-      "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
-      "integrity": "sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ=="
+      "version": "2.29.2",
+      "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.2.tgz",
+      "integrity": "sha512-UgzG4rvxYpN15jgCmVJwac49h9ly9NurikMWGPdVxm8GZD6XjkKPxDTjQQ43gtGgnV3X0cAyWDdP2Wexoquifg=="
     },
     "moo": {
       "version": "0.4.3",
diff --git a/web/package.json b/web/package.json
index d61d28806c..36816676f0 100644
--- a/web/package.json
+++ b/web/package.json
@@ -47,7 +47,7 @@
     "dagre": "^0.8.5",
     "http-proxy-middleware": "^0.20.0",
     "lodash": "^4.17.21",
-    "moment": "^2.29.1",
+    "moment": "^2.29.2",
     "postcss-loader": "^3.0.0",
     "postcss-modules-values": "^2.0.0",
     "react": "^16.8.0",
diff --git a/web/src/__tests__/reducers/jobs.test.ts b/web/src/__tests__/reducers/jobs.test.ts
index 0abb853706..24590647d4 100644
--- a/web/src/__tests__/reducers/jobs.test.ts
+++ b/web/src/__tests__/reducers/jobs.test.ts
@@ -2,6 +2,7 @@

 import * as actionTypes from '../../store/actionCreators/actionTypes'
 import jobsReducer, { initialState } from '../../store/reducers/jobs'
+import { stopWatchDuration } from "../../helpers/time";

 const jobs = require('../../../docker/db/data/jobs.json')

@@ -17,3 +18,45 @@ describe('jobs reducer', () => {
     expect(jobsReducer(initialState, action)).toStrictEqual({ isLoading: false, result: jobs, init: true })
   })
 })
+
+describe('stopWatchDuration', () => {
+  const oneMinute = 60 * 1000;
+  const oneHour = 60 * oneMinute;
+  const oneDay = 24 * oneHour;
+
+  it('more than one week', () => {
+    const value = stopWatchDuration(oneDay * 9)
+    expect("9d 0h 0m 0s").toBe(value);
+  })
+
+  it('more than one day', () => {
+    const value = stopWatchDuration(oneDay + oneHour)
+    expect("1d 1h 0m 0s").toBe(value);
+  })
+
+  it('less than one day', () => {
+    const value = stopWatchDuration(oneDay - 1000);
+    expect("23h 59m 59s").toBe(value);
+  })
+
+  it('less than one hour', () => {
+    const value = stopWatchDuration(oneHour - 1000);
+    expect("59m 59s").toBe(value);
+  })
+
+  it('less than one minute', () => {
+    const value = stopWatchDuration(oneMinute - 1000);
+    expect("0m 59s").toBe(value);
+  })
+
+  it('less than one second', () => {
+    const value = stopWatchDuration(999);
+    expect("999 ms").toBe(value);
+  })
+
+  it('no time', () => {
+    const value = stopWatchDuration(0);
+    expect("0").toBe(value);
+  })
+
+})
diff --git a/web/src/components/core/code/MqCode.tsx b/web/src/components/core/code/MqCode.tsx
index 38891d645b..74f4ec6e96 100644
--- a/web/src/components/core/code/MqCode.tsx
+++ b/web/src/components/core/code/MqCode.tsx
@@ -1,31 +1,30 @@
 // SPDX-License-Identifier: Apache-2.0
 
-import { THEME_EXTRA } from '../../../helpers/theme'
-import { Theme, alpha } from '@material-ui/core/styles'
+import { THEME_EXTRA, theme } from '../../../helpers/theme'
+import { alpha } from '@material-ui/core/styles'
+import { ocean } from 'react-syntax-highlighter/dist/cjs/styles/hljs'
 import Box from '@material-ui/core/Box'
 import MqText from '../text/MqText'
 import React from 'react'
+import SyntaxHighlighter from 'react-syntax-highlighter'
 import createStyles from '@material-ui/core/styles/createStyles'
 import withStyles, { WithStyles } from '@material-ui/core/styles/withStyles'
 
-const styles = (theme: Theme) =>
-  createStyles({
-    codeContainer: {
-      padding: `${theme.spacing(2)}px ${theme.spacing(4)}px`,
-      backgroundColor: alpha(theme.palette.common.white, 0.1),
-      borderLeft: `2px dashed ${THEME_EXTRA.typography.subdued}`,
-      whiteSpace: 'pre-wrap'
-    }
-  })
+const styles = () => createStyles({})
 
 interface OwnProps {
   code?: string
+  language?: string
   description?: string
 }
 
-const MqCode: React.FC<WithStyles<typeof styles> & OwnProps> = ({ code, description, classes }) => {
+const MqCode: React.FC<WithStyles<typeof styles> & OwnProps> = ({
+  code,
+  description,
+  language
+}) => {
   return (
     <Box>
       {description && (
@@ -33,9 +32,17 @@
         </Box>
       )}
-      <Box className={classes.codeContainer}>
-        {code ? code : 'Nothing to show here'}
-      </Box>
+      <SyntaxHighlighter
+        language={language}
+        style={ocean}
+        customStyle={{
+          backgroundColor: alpha(theme.palette.common.white, 0.1),
+          borderLeft: `2px dashed ${THEME_EXTRA.typography.subdued}`,
+          padding: theme.spacing(2)
+        }}
+      >
+        {code ? code : 'No code available'}
+      </SyntaxHighlighter>
     </Box>
   )
 }
diff --git a/web/src/components/core/code/MqJson.tsx b/web/src/components/core/code/MqJson.tsx
index 0ec94c9b28..a58015d240 100644
--- a/web/src/components/core/code/MqJson.tsx
+++ b/web/src/components/core/code/MqJson.tsx
@@ -17,7 +17,8 @@ const MqJson: React.FC = ({ code }) => {
       style={ocean}
       customStyle={{
         backgroundColor: alpha(theme.palette.common.white, 0.1),
-        borderLeft: `2px dashed ${THEME_EXTRA.typography.subdued}`
+        borderLeft: `2px dashed ${THEME_EXTRA.typography.subdued}`,
+        padding: theme.spacing(2)
       }}
     >
       {JSON.stringify(code, null, ' ')}
diff --git a/web/src/components/datasets/DatasetDetailPage.tsx b/web/src/components/datasets/DatasetDetailPage.tsx
index 4ec841490d..08a251b930 100644
--- a/web/src/components/datasets/DatasetDetailPage.tsx
+++ b/web/src/components/datasets/DatasetDetailPage.tsx
@@ -140,7 +140,13 @@ const DatasetDetailPage: FunctionComponent = props => {
       {description}
-      {tab === 0 && <DatasetInfo datasetFields={versions[0].fields} facets={versions[0].facets} />}
+      {tab === 0 && (
+        <DatasetInfo
+          datasetFields={versions[0].fields}
+          facets={versions[0].facets}
+          run={versions[0].createdByRun}
+        />
+      )}
       {tab === 1 && <DatasetVersions versions={versions} />}
     </Box>
   )
diff --git a/web/src/components/datasets/DatasetInfo.tsx b/web/src/components/datasets/DatasetInfo.tsx
index a81901911e..8c656f231c 100644
--- a/web/src/components/datasets/DatasetInfo.tsx
+++ b/web/src/components/datasets/DatasetInfo.tsx
@@ -1,21 +1,25 @@
 // SPDX-License-Identifier: Apache-2.0
 
 import { Box, Table, TableBody, TableCell, TableHead, TableRow } from '@material-ui/core'
-import { Field } from '../../types/api'
+import { Field, Run } from '../../types/api'
+import { stopWatchDuration } from '../../helpers/time'
+import MqCode from '../core/code/MqCode'
 import MqEmpty from '../core/empty/MqEmpty'
 import MqJson from '../core/code/MqJson'
 import MqText from '../core/text/MqText'
 import React, { FunctionComponent } from 'react'
+import RunStatus from '../jobs/RunStatus'
 
 const DATASET_COLUMNS = ['Field', 'Type', 'Description']
 
 interface DatasetInfoProps {
   datasetFields: Field[]
   facets?: object
+  run?: Run
 }
 
 const DatasetInfo: FunctionComponent<DatasetInfoProps> = props => {
-  const { datasetFields, facets } = props
+  const { datasetFields, facets, run } = props
 
   if (datasetFields.length === 0) {
     return
@@ -57,6 +61,24 @@
       )}
+      {run && (
+        <Box mt={2}>
+          <Box mb={1} display={'flex'} alignItems={'center'}>
+            <RunStatus run={run} />
+            <MqText subheading>Created by Run</MqText>
+          </Box>
+          <Box>
+            <MqText subdued inline>
+              Duration:&nbsp;
+            </MqText>
+            <MqText inline>
+              {stopWatchDuration(run.durationMs)}
+            </MqText>
+          </Box>
+          <Box mt={1}>
+            <MqText font={'mono'}>{run.jobVersion.name}</MqText>
+          </Box>
+        </Box>
+      )}
     </Box>
   )
 }
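Because `run` is optional on `DatasetInfoProps`, existing call sites compile unchanged; the two call sites this patch does update (`DatasetDetailPage` above, `DatasetVersions` below) both pass the version's `createdByRun`. A hedged sketch of the calling pattern, with the wrapper component invented for illustration and the `facets` field assumed to exist on `DatasetVersion`:

```typescript
// Hypothetical wrapper, for illustration only: threads a dataset version's
// creating run into DatasetInfo via the optional `run` prop added in this patch.
import React from 'react'
import DatasetInfo from './DatasetInfo'
import { DatasetVersion } from '../../types/api'

const VersionDetails: React.FC<{ version: DatasetVersion }> = ({ version }) => (
  <DatasetInfo
    datasetFields={version.fields}
    facets={version.facets} // assumed field, per the facets prop used in this diff
    run={version.createdByRun} // may be undefined; DatasetInfo then omits the run panel
  />
)

export default VersionDetails
```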
diff --git a/web/src/components/datasets/DatasetVersions.tsx b/web/src/components/datasets/DatasetVersions.tsx
index 9a85d11cea..c82a16f65d 100644
--- a/web/src/components/datasets/DatasetVersions.tsx
+++ b/web/src/components/datasets/DatasetVersions.tsx
@@ -11,6 +11,7 @@
 import DatasetInfo from './DatasetInfo'
 import IconButton from '@material-ui/core/IconButton'
 import MqText from '../text/MqText'
 import React, { FunctionComponent, SetStateAction } from 'react'
+import RunStatus from '../jobs/RunStatus'
 import transitions from '@material-ui/core/styles/transitions'
 
@@ -25,7 +26,7 @@ const styles = (theme: ITheme) => {
   })
 }
 
-const DATASET_VERSIONS_COLUMNS = ['Version', 'Created At', 'Field Count', 'Lifecycle State']
+const DATASET_VERSIONS_COLUMNS = ['Version', 'Created At', 'Field Count', 'Dataset Creator (Run)', 'Lifecycle State']
 
 interface DatasetVersionsProps {
   versions: DatasetVersion[]
 }
@@ -53,7 +54,11 @@
-          <DatasetInfo datasetFields={version.fields} facets={version.facets} />
+          <DatasetInfo
+            datasetFields={version.fields}
+            facets={version.facets}
+            run={version.createdByRun}
+          />
         )
       }
@@ -83,6 +88,18 @@
         <TableCell align="left">{version.version}</TableCell>
         <TableCell align="left">{formatUpdatedAt(version.createdAt)}</TableCell>
         <TableCell align="left">{version.fields.length}</TableCell>
+        <TableCell align="left">
+          {version.createdByRun ? (
+            <>
+              <RunStatus run={version.createdByRun} />
+              {version.createdByRun ? version.createdByRun.id : 'N/A'}
+            </>
+          ) : (
+            'N/A'
+          )}
+        </TableCell>
         <TableCell align="left">{version.lifecycleState}</TableCell>
       )
     }
diff --git a/web/src/helpers/time.ts b/web/src/helpers/time.ts
index 54f5192126..40d75a0c5f 100644
--- a/web/src/helpers/time.ts
+++ b/web/src/helpers/time.ts
@@ -14,13 +14,15 @@ export function stopWatchDuration(durationMs: number) {
   if (duration.asMilliseconds() === 0) {
     return '0'
   }
-
-  if (duration.asHours() > 1) {
-    return `${duration.hours()}h ${addLeadingZero(duration.seconds())}s`
+  if (duration.asHours() > 24) {
+    return `${duration.days()}d ${duration.hours()}h ${duration.minutes()}m ${duration.seconds()}s`
+  }
+  if (duration.asMinutes() > 60) {
+    return `${duration.hours()}h ${duration.minutes()}m ${duration.seconds()}s`
   }
   if (duration.asSeconds() > 1) {
     return `${duration.minutes()}m ${addLeadingZero(duration.seconds())}s`
   } else {
-    return `${duration.milliseconds()} ms`
+    return `${duration.asMilliseconds()} ms`
   }
 }
diff --git a/web/src/store/requests/jobs.ts b/web/src/store/requests/jobs.ts
index 2da2d35112..ed52ff8729 100644
--- a/web/src/store/requests/jobs.ts
+++ b/web/src/store/requests/jobs.ts
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 import { API_URL } from '../../globals'
-import { Job, Jobs, Run } from '../../types/api'
+import { Jobs } from '../../types/api'
 import { genericFetchWrapper } from './index'
 
 export const getJobs = async (namespace: string, limit = 2000, offset = 0) => {
diff --git a/web/src/store/requests/lineage.ts b/web/src/store/requests/lineage.ts
index 0664d25b19..cc4d10bb21 100644
--- a/web/src/store/requests/lineage.ts
+++ b/web/src/store/requests/lineage.ts
@@ -2,7 +2,6 @@
 
 import { API_URL } from '../../globals'
 import { JobOrDataset } from '../../components/lineage/types'
-import { LineageGraph } from '../../types/api'
 import { generateNodeId } from '../../helpers/nodes'
 import { genericFetchWrapper } from './index'
diff --git a/web/src/store/requests/namespaces.ts b/web/src/store/requests/namespaces.ts
index 8db2ee779b..8d2e6f39b3 100644
--- a/web/src/store/requests/namespaces.ts
+++ b/web/src/store/requests/namespaces.ts
@@ -1,7 +1,6 @@
 // SPDX-License-Identifier: Apache-2.0
 
 import { API_URL } from '../../globals'
-import { Namespaces } from '../../types/api'
 import { genericFetchWrapper } from './index'
 
 export const getNamespaces = async () => {
diff --git a/web/src/store/requests/search.ts b/web/src/store/requests/search.ts
index 3b1332304a..f7bd1f737c 100644
--- a/web/src/store/requests/search.ts
+++ b/web/src/store/requests/search.ts
@@ -1,7 +1,6 @@
 // SPDX-License-Identifier: Apache-2.0
 
 import { API_URL } from '../../globals'
-import { Search } from '../../types/api'
 import { genericFetchWrapper } from './index'
 
 export const getSearch = async (q: string, filter = 'ALL', sort = 'NAME', limit = 100) => {
diff --git a/web/src/types/api.ts b/web/src/types/api.ts
index 6d4ffd70e4..73e41551fa 100644
--- a/web/src/types/api.ts
+++ b/web/src/types/api.ts
@@ -127,6 +127,11 @@ export interface Run {
   nominalStartTime: string
   nominalEndTime: string
   state: RunState
+  jobVersion: {
+    name: string
+    namespace: string
+    version: string
+  }
   startedAt: string
   endedAt: string
   durationMs: number
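With `jobVersion` embedded in `Run`, the UI can report a dataset version's provenance without issuing a second request. A small sketch combining the new field with the reworked `stopWatchDuration`; the helper name, file location, and example output are illustrative:

```typescript
// Illustrative helper (not part of this patch): a one-line provenance summary
// built from the new Run.jobVersion field and the tiered duration formatter.
import { Run } from '../types/api'
import { stopWatchDuration } from '../helpers/time'

export function createdBySummary(run?: Run): string {
  if (!run) {
    return 'N/A' // mirrors the fallback DatasetVersions renders for missing runs
  }
  const { namespace, name } = run.jobVersion
  return `${namespace}/${name} (${stopWatchDuration(run.durationMs)})`
}

// e.g. createdBySummary(run) === 'food_delivery/example.etl_orders (1m 05s)'
```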