From d04951f92836dfed54bcbcb36a5b64c779f9b546 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 24 Jan 2022 11:03:10 +0100 Subject: [PATCH] add operation column Signed-off-by: Pawel Leszczynski --- api/src/main/java/marquez/common/Utils.java | 5 ++ api/src/main/java/marquez/db/Columns.java | 11 ++++ api/src/main/java/marquez/db/DatasetDao.java | 29 +++++---- .../java/marquez/db/DatasetVersionDao.java | 17 +++--- api/src/main/java/marquez/db/LineageDao.java | 2 +- .../main/java/marquez/db/OpenLineageDao.java | 15 ++++- api/src/main/java/marquez/db/RunDao.java | 4 +- .../marquez/db/mappers/DatasetDataMapper.java | 3 +- .../marquez/db/mappers/DatasetMapper.java | 9 ++- .../marquez/db/mappers/DatasetRowMapper.java | 4 +- .../db/mappers/DatasetVersionMapper.java | 2 + .../db/mappers/DatasetVersionRowMapper.java | 2 + .../ExtendedDatasetVersionRowMapper.java | 1 + .../java/marquez/db/models/DatasetData.java | 5 ++ .../java/marquez/db/models/DatasetRow.java | 1 + .../marquez/db/models/DatasetVersionRow.java | 1 + .../db/models/ExtendedDatasetVersionRow.java | 4 +- .../java/marquez/service/models/Dataset.java | 12 +++- .../service/models/DatasetVersion.java | 7 +++ .../java/marquez/service/models/DbTable.java | 8 ++- .../service/models/DbTableVersion.java | 2 + .../marquez/service/models/LineageEvent.java | 30 +++++++++- .../java/marquez/service/models/Stream.java | 8 ++- .../marquez/service/models/StreamVersion.java | 2 + ...V40__add_operation_to_dataset_versions.sql | 2 + .../test/java/marquez/common/UtilsTest.java | 11 +++- .../common/models/CommonModelGenerator.java | 4 ++ api/src/test/java/marquez/db/ColumnsTest.java | 19 ++++++ .../test/java/marquez/db/DatasetDaoTest.java | 59 +++++++++++++++++++ .../test/java/marquez/db/LineageDaoTest.java | 33 +++++++++++ .../java/marquez/db/OpenLineageDaoTest.java | 26 ++++++++ .../marquez/db/mappers/DatasetMapperTest.java | 2 + .../service/models/ServiceModelGenerator.java | 5 +- .../mappers/full_dataset_mapper.json | 1 
+ .../components/datasets/DatasetVersions.tsx | 3 +- web/src/types/api.ts | 1 + 36 files changed, 313 insertions(+), 37 deletions(-) create mode 100644 api/src/main/resources/marquez/db/migration/V40__add_operation_to_dataset_versions.sql diff --git a/api/src/main/java/marquez/common/Utils.java b/api/src/main/java/marquez/common/Utils.java index 2def865580..6859b05517 100644 --- a/api/src/main/java/marquez/common/Utils.java +++ b/api/src/main/java/marquez/common/Utils.java @@ -204,6 +204,7 @@ public static Version newJobVersionFor( * @param sourceName The source name of the dataset. * @param physicalName The physical name of the dataset. * @param datasetName The dataset name. + * @param lifecycleStateChange The dataset change like CREATE, DROP, TRUNCATE. * @param fields The fields of the dataset. * @param runId The UUID of the run linked to the dataset. * @return A {@link Version} object based on the specified job meta. @@ -213,6 +214,7 @@ public static Version newDatasetVersionFor( String sourceName, String physicalName, String datasetName, + String lifecycleStateChange, List fields, UUID runId) { DatasetVersionData data = @@ -221,6 +223,7 @@ public static Version newDatasetVersionFor( .sourceName(sourceName) .physicalName(physicalName) .datasetName(datasetName) + .lifecycleStateChange(lifecycleStateChange) .schemaFields(fields) .runId(runId) .build(); @@ -259,6 +262,7 @@ private static Version newDatasetVersionFor(DatasetVersionData data) { data.getPhysicalName(), data.getSchemaLocation(), data.getFields().stream().map(Utils::joinField).collect(joining(VERSION_DELIM)), + data.getLifecycleStateChange(), data.getRunId()) .getBytes(UTF_8); return Version.of(UUID.nameUUIDFromBytes(bytes)); @@ -275,6 +279,7 @@ private static class DatasetVersionData { private String sourceName; private String physicalName; private String datasetName; + private String lifecycleStateChange; private String schemaLocation; private Set> fields; private UUID runId; diff --git 
a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 2be2b56f41..0126ca6aae 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -69,9 +69,11 @@ private Columns() {} public static final String TAG_UUIDS = "tag_uuids"; public static final String TAGGED_AT = "tagged_at"; public static final String LAST_MODIFIED_AT = "last_modified_at"; + public static final String IS_DELETED = "is_deleted"; /* DATASET VERSION ROW COLUMNS */ public static final String FIELD_UUIDS = "field_uuids"; + public static final String LIFECYCLE_STATE_CHANGE = "lifecycle_state_change"; /* STREAM VERSION ROW COLUMNS */ public static final String SCHEMA_LOCATION = "schema_location"; @@ -160,6 +162,15 @@ public static String stringOrThrow(final ResultSet results, final String column) return results.getString(column); } + public static boolean booleanOrDefault( + final ResultSet results, final String column, final boolean defaultValue) + throws SQLException { + if (results.getObject(column) == null) { + return defaultValue; + } + return results.getBoolean(column); + } + public static int intOrThrow(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { throw new IllegalArgumentException(); diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 9eac597321..b02c3c6734 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -71,8 +71,9 @@ void updateLastModifiedAt( + " FROM datasets d\n" + " WHERE d.namespace_name = :namespaceName\n" + " AND d.name = :datasetName\n" + + " AND d.is_deleted = FALSE\n" + "), dataset_runs AS (\n" - + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n" + + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state_change, event_time, event\n" + " FROM selected_datasets d\n" + " 
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" + " LEFT JOIN LATERAL (\n" @@ -80,7 +81,7 @@ void updateLastModifiedAt( + " WHERE run_uuid = dv.run_uuid\n" + " ) e ON e.run_uuid = dv.run_uuid\n" + " UNION\n" - + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n" + + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state_change, event_time, event\n" + " FROM selected_datasets d\n" + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" + " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n" @@ -89,7 +90,7 @@ void updateLastModifiedAt( + " WHERE run_uuid = rim.run_uuid\n" + " ) e ON e.run_uuid = rim.run_uuid\n" + ")\n" - + "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n" + + "SELECT d.*, dv.fields, dv.lifecycle_state_change, sv.schema_location, t.tags, facets\n" + "FROM selected_datasets d\n" + "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n" + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n" @@ -142,7 +143,7 @@ default void setFields(Dataset ds) { + " ORDER BY d.name\n" + " LIMIT :limit OFFSET :offset\n" + "), dataset_runs AS (\n" - + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n" + + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state_change, event_time, event\n" + " FROM selected_datasets d\n" + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" + " LEFT JOIN LATERAL (\n" @@ -150,7 +151,7 @@ default void setFields(Dataset ds) { + " WHERE run_uuid = dv.run_uuid\n" + " ) e ON e.run_uuid = dv.run_uuid\n" + " UNION\n" - + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n" + + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state_change, event_time, event\n" + " FROM selected_datasets d\n" + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" + " LEFT JOIN 
runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n" @@ -159,7 +160,7 @@ default void setFields(Dataset ds) { + " WHERE run_uuid = rim.run_uuid\n" + " ) e ON e.run_uuid = rim.run_uuid\n" + ")\n" - + "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n" + + "SELECT d.*, dv.fields, dv.lifecycle_state_change, sv.schema_location, t.tags, facets\n" + "FROM selected_datasets d\n" + "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n" + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n" @@ -205,7 +206,8 @@ default List findAllWithTags(String namespaceName, int limit, int offse + "source_name, " + "name, " + "physical_name, " - + "description " + + "description, " + + "is_deleted " + ") VALUES ( " + ":uuid, " + ":type, " @@ -217,13 +219,15 @@ default List findAllWithTags(String namespaceName, int limit, int offse + ":sourceName, " + ":name, " + ":physicalName, " - + ":description) " + + ":description, " + + ":isDeleted) " + "ON CONFLICT (namespace_uuid, name) " + "DO UPDATE SET " + "type = EXCLUDED.type, " + "updated_at = EXCLUDED.updated_at, " + "physical_name = EXCLUDED.physical_name, " - + "description = EXCLUDED.description " + + "description = EXCLUDED.description, " + + "is_deleted = EXCLUDED.is_deleted " + "RETURNING *") DatasetRow upsert( UUID uuid, @@ -235,7 +239,8 @@ DatasetRow upsert( String sourceName, String name, String physicalName, - String description); + String description, + boolean isDeleted); @SqlQuery( "INSERT INTO datasets (" @@ -308,7 +313,8 @@ default Dataset upsertDatasetMeta( sourceRow.getName(), datasetName.getValue(), datasetMeta.getPhysicalName().getValue(), - datasetMeta.getDescription().orElse(null)); + datasetMeta.getDescription().orElse(null), + false); } else { datasetRow = upsert( @@ -340,6 +346,7 @@ default Dataset upsertDatasetMeta( now, namespaceName.getValue(), datasetName.getValue(), + null, datasetMeta); return findWithTags(namespaceName.getValue(), 
datasetName.getValue()).get(); diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index ee99d4e53a..a8bfe0e769 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -48,6 +48,7 @@ default DatasetVersionRow upsertDatasetVersion( Instant now, String namespaceName, String datasetName, + String lifecycleStateChange, DatasetMeta datasetMeta) { TagDao tagDao = createTagDao(); DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); @@ -63,7 +64,8 @@ default DatasetVersionRow upsertDatasetVersion( datasetMeta.getRunId().map(RunId::getValue).orElse(null), toPgObjectFields(datasetMeta.getFields()), namespaceName, - datasetName); + datasetName, + lifecycleStateChange); updateDatasetVersionMetric( namespaceName, datasetMeta.getType().toString(), @@ -167,7 +169,7 @@ default void updateDatasetVersionMetric( + " FROM selected_dataset_version_runs dv\n" + " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n" + ")\n" - + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n" + + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change, \n" + " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n" + " t.tags, f.facets\n" + "FROM selected_dataset_versions dv\n" @@ -209,7 +211,7 @@ default void updateDatasetVersionMetric( + " FROM selected_dataset_version_runs dv\n" + " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n" + ")\n" - + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n" + + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change, \n" + " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n" + " t.tags, f.facets\n" + "FROM 
selected_dataset_versions dv\n" @@ -280,7 +282,7 @@ default Optional findByWithRun(UUID version) { + " FROM selected_dataset_version_runs dv\n" + " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n" + ")\n" - + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n" + + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change,\n" + " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n" + " t.tags, f.facets\n" + "FROM selected_dataset_versions dv\n" @@ -324,9 +326,9 @@ default List findAllWithRun( @SqlQuery( "INSERT INTO dataset_versions " - + "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name) " + + "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, lifecycle_state_change) " + "VALUES " - + "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName) " + + "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleStateChange) " + "ON CONFLICT(version) " + "DO UPDATE SET " + "run_uuid = EXCLUDED.run_uuid " @@ -339,7 +341,8 @@ DatasetVersionRow upsert( UUID runUuid, PGobject fields, String namespaceName, - String datasetName); + String datasetName, + String lifecycleStateChange); @SqlUpdate("UPDATE dataset_versions SET fields = :fields WHERE uuid = :uuid") void updateFields(UUID uuid, PGobject fields); diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 3ad077778a..b9e26c6788 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -67,7 +67,7 @@ public interface LineageDao { Optional getJobUuid(String jobName, String namespace); @SqlQuery( - "SELECT ds.*, dv.fields\n" + "SELECT ds.*, dv.fields, dv.lifecycle_state_change\n" + "FROM datasets ds\n" + "LEFT JOIN 
dataset_versions dv on dv.uuid = ds.current_version_uuid\n" + "WHERE ds.uuid IN ();") diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 3d28226faa..ea09da5d2b 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -41,6 +41,7 @@ import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.Job; +import marquez.service.models.LineageEvent.LifecycleStateChangeFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; import org.jdbi.v3.sqlobject.statement.SqlUpdate; @@ -350,6 +351,12 @@ default DatasetRecord upsertLineageDataset( formatNamespaceName(ds.getNamespace()), DEFAULT_NAMESPACE_OWNER); + String dsLifecycleStateChange = + Optional.ofNullable(ds.getFacets()) + .map(DatasetFacets::getLifecycleStateChange) + .map(LifecycleStateChangeFacet::getLifeCycleStateChange) + .orElse(""); + DatasetRow datasetRow = datasetDao.upsert( UUID.randomUUID(), @@ -361,7 +368,8 @@ default DatasetRecord upsertLineageDataset( source.getName(), formatDatasetName(ds.getName()), ds.getName(), - dsDescription); + dsDescription, + dsLifecycleStateChange.equalsIgnoreCase("DROP")); List fields = Optional.ofNullable(ds.getFacets()) @@ -385,6 +393,7 @@ default DatasetRecord upsertLineageDataset( source.getName(), dsRow.getPhysicalName(), dsRow.getName(), + dsLifecycleStateChange, fields, runUuid) .getValue(); @@ -397,8 +406,8 @@ default DatasetRecord upsertLineageDataset( isInput ? 
null : runUuid, datasetVersionDao.toPgObjectSchemaFields(fields), dsNamespace.getName(), - ds.getName()); - + ds.getName(), + dsLifecycleStateChange); return row; }); List datasetFieldMappings = new ArrayList<>(); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 6dc2e9c548..1a3e0a5639 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -277,6 +277,7 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet runOu d.getSourceName().getValue(), d.getPhysicalName().getValue(), d.getName().getValue(), + null, toSchemaFields(d.getFields()), runUuid) .getValue(); @@ -288,7 +289,8 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet runOu runUuid, datasetVersionDao.toPgObjectFields(d.getFields()), d.getNamespace().getValue(), - d.getName().getValue()); + d.getName().getValue(), + null); }); } } diff --git a/api/src/main/java/marquez/db/mappers/DatasetDataMapper.java b/api/src/main/java/marquez/db/mappers/DatasetDataMapper.java index 2a21de7802..ea432a01f8 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetDataMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetDataMapper.java @@ -52,7 +52,8 @@ public DatasetData map(@NonNull ResultSet results, @NonNull StatementContext con toFields(results, "fields"), ImmutableSet.of(), timestampOrNull(results, Columns.LAST_MODIFIED_AT), - stringOrNull(results, Columns.DESCRIPTION)); + stringOrNull(results, Columns.DESCRIPTION), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE)); } public static ImmutableList toFields(ResultSet results, String column) diff --git a/api/src/main/java/marquez/db/mappers/DatasetMapper.java b/api/src/main/java/marquez/db/mappers/DatasetMapper.java index 2fef22e829..1cb90db861 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetMapper.java @@ -2,6 +2,7 @@ package marquez.db.mappers; +import static 
marquez.db.Columns.booleanOrDefault; import static marquez.db.Columns.stringArrayOrThrow; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; @@ -62,9 +63,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context toFields(results, "fields"), toTags(results, "tags"), timestampOrNull(results, Columns.LAST_MODIFIED_AT), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE), stringOrNull(results, Columns.DESCRIPTION), uuidOrNull(results, Columns.CURRENT_VERSION_UUID), - toFacetsOrNull(results, Columns.FACETS)); + toFacetsOrNull(results, Columns.FACETS), + booleanOrDefault(results, Columns.IS_DELETED, false)); } else { return new Stream( new DatasetId( @@ -79,9 +82,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context toFields(results, "fields"), toTags(results, "tags"), timestampOrNull(results, Columns.LAST_MODIFIED_AT), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE), stringOrNull(results, Columns.DESCRIPTION), uuidOrNull(results, Columns.CURRENT_VERSION_UUID), - toFacetsOrNull(results, Columns.FACETS)); + toFacetsOrNull(results, Columns.FACETS), + booleanOrDefault(results, Columns.IS_DELETED, false)); } } diff --git a/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java b/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java index 9a6a8461f5..71969f2775 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java @@ -2,6 +2,7 @@ package marquez.db.mappers; +import static marquez.db.Columns.booleanOrDefault; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; import static marquez.db.Columns.timestampOrNull; @@ -32,6 +33,7 @@ public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext cont stringOrThrow(results, Columns.PHYSICAL_NAME), timestampOrNull(results, Columns.LAST_MODIFIED_AT), stringOrNull(results, 
Columns.DESCRIPTION), - uuidOrNull(results, Columns.CURRENT_VERSION_UUID)); + uuidOrNull(results, Columns.CURRENT_VERSION_UUID), + booleanOrDefault(results, Columns.IS_DELETED, false)); } } diff --git a/api/src/main/java/marquez/db/mappers/DatasetVersionMapper.java b/api/src/main/java/marquez/db/mappers/DatasetVersionMapper.java index 8042a33596..5084835f62 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetVersionMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetVersionMapper.java @@ -54,6 +54,7 @@ public DatasetVersion map(@NonNull ResultSet results, @NonNull StatementContext toFields(results, "fields"), columnNames.contains("tags") ? toTags(results, "tags") : null, stringOrNull(results, Columns.DESCRIPTION), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE), null, toFacetsOrNull(results, Columns.FACETS)); } else { @@ -71,6 +72,7 @@ public DatasetVersion map(@NonNull ResultSet results, @NonNull StatementContext toFields(results, "fields"), columnNames.contains("tags") ? 
toTags(results, "tags") : null, stringOrNull(results, Columns.DESCRIPTION), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE), null, toFacetsOrNull(results, Columns.FACETS)); } diff --git a/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java b/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java index 3854498984..c204abd7ea 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java @@ -2,6 +2,7 @@ package marquez.db.mappers; +import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.timestampOrThrow; import static marquez.db.Columns.uuidOrNull; import static marquez.db.Columns.uuidOrThrow; @@ -23,6 +24,7 @@ public DatasetVersionRow map(@NonNull ResultSet results, @NonNull StatementConte timestampOrThrow(results, Columns.CREATED_AT), uuidOrThrow(results, Columns.DATASET_UUID), uuidOrThrow(results, Columns.VERSION), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE), uuidOrNull(results, Columns.RUN_UUID)); } } diff --git a/api/src/main/java/marquez/db/mappers/ExtendedDatasetVersionRowMapper.java b/api/src/main/java/marquez/db/mappers/ExtendedDatasetVersionRowMapper.java index 11143e6c21..09eedfc4da 100644 --- a/api/src/main/java/marquez/db/mappers/ExtendedDatasetVersionRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/ExtendedDatasetVersionRowMapper.java @@ -24,6 +24,7 @@ public ExtendedDatasetVersionRow map( timestampOrThrow(results, Columns.CREATED_AT), uuidOrThrow(results, Columns.DATASET_UUID), uuidOrThrow(results, Columns.VERSION), + stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE), uuidOrNull(results, Columns.RUN_UUID), stringOrNull(results, Columns.NAMESPACE_NAME), stringOrNull(results, Columns.DATASET_NAME)); diff --git a/api/src/main/java/marquez/db/models/DatasetData.java b/api/src/main/java/marquez/db/models/DatasetData.java index a435a664d0..f83ae32fe3 100644 --- 
a/api/src/main/java/marquez/db/models/DatasetData.java +++ b/api/src/main/java/marquez/db/models/DatasetData.java @@ -36,6 +36,7 @@ public class DatasetData implements NodeData { @NonNull ImmutableSet tags; @Nullable Instant lastModifiedAt; @Nullable String description; + @Nullable String lastLifecycleStateChange; public Optional getLastModifiedAt() { return Optional.ofNullable(lastModifiedAt); @@ -45,6 +46,10 @@ public Optional getDescription() { return Optional.ofNullable(description); } + public Optional getLastLifecycleStateChange() { + return Optional.ofNullable(lastLifecycleStateChange); + } + @JsonIgnore public UUID getUuid() { return uuid; diff --git a/api/src/main/java/marquez/db/models/DatasetRow.java b/api/src/main/java/marquez/db/models/DatasetRow.java index 1f695c16ab..0e57f6bb03 100644 --- a/api/src/main/java/marquez/db/models/DatasetRow.java +++ b/api/src/main/java/marquez/db/models/DatasetRow.java @@ -28,6 +28,7 @@ public class DatasetRow { @Nullable private final Instant lastModifiedAt; @Nullable private final String description; @With @Nullable private final UUID currentVersionUuid; + @Getter private final boolean isDeleted; public Optional getLastModifiedAt() { return Optional.ofNullable(lastModifiedAt); diff --git a/api/src/main/java/marquez/db/models/DatasetVersionRow.java b/api/src/main/java/marquez/db/models/DatasetVersionRow.java index fafd17470c..008e27df7c 100644 --- a/api/src/main/java/marquez/db/models/DatasetVersionRow.java +++ b/api/src/main/java/marquez/db/models/DatasetVersionRow.java @@ -20,6 +20,7 @@ public class DatasetVersionRow { @Getter @NonNull private final Instant createdAt; @Getter @NonNull private final UUID datasetUuid; @Getter @NonNull private final UUID version; + @Getter @Nullable private final String lifecycleStateChange; @Nullable private final UUID runUuid; public Optional getRunUuid() { diff --git a/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java 
b/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java index b1084737d3..7fb3b55f36 100644 --- a/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java +++ b/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java @@ -4,6 +4,7 @@ import java.time.Instant; import java.util.UUID; +import javax.annotation.Nullable; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -20,10 +21,11 @@ public ExtendedDatasetVersionRow( @NonNull Instant createdAt, @NonNull UUID datasetUuid, @NonNull UUID version, + @Nullable String lifecycleStateChange, UUID runUuid, @NonNull final String namespaceName, @NonNull final String datasetName) { - super(uuid, createdAt, datasetUuid, version, runUuid); + super(uuid, createdAt, datasetUuid, version, lifecycleStateChange, runUuid); this.namespaceName = namespaceName; this.datasetName = datasetName; } diff --git a/api/src/main/java/marquez/service/models/Dataset.java b/api/src/main/java/marquez/service/models/Dataset.java index c72128835f..080ad88fc0 100644 --- a/api/src/main/java/marquez/service/models/Dataset.java +++ b/api/src/main/java/marquez/service/models/Dataset.java @@ -47,9 +47,11 @@ public abstract class Dataset { @Getter @Setter private List fields; @Getter private final ImmutableSet tags; @Nullable private final Instant lastModifiedAt; + @Nullable private final String lastLifecycleStateChange; @Nullable private final String description; @Nullable private final UUID currentVersion; @Getter ImmutableMap facets; + @Getter private final boolean isDeleted; public Dataset( @NonNull final DatasetId id, @@ -62,9 +64,11 @@ public Dataset( @Nullable final ImmutableList fields, @Nullable final ImmutableSet tags, @Nullable final Instant lastModifiedAt, + @Nullable final String lastLifecycleStateChange, @Nullable final String description, @Nullable final UUID currentVersion, - @Nullable final ImmutableMap facets) { + @Nullable final ImmutableMap facets, + boolean isDeleted) { this.id 
= id; this.type = type; this.name = name; @@ -76,9 +80,11 @@ public Dataset( this.fields = (fields == null) ? ImmutableList.of() : fields; this.tags = (tags == null) ? ImmutableSet.of() : tags; this.lastModifiedAt = lastModifiedAt; + this.lastLifecycleStateChange = lastLifecycleStateChange; this.description = description; this.currentVersion = currentVersion; this.facets = (facets == null) ? ImmutableMap.of() : facets; + this.isDeleted = isDeleted; } public Optional getLastModifiedAt() { @@ -89,6 +95,10 @@ public Optional getDescription() { return Optional.ofNullable(description); } + public Optional getLastLifecycleStateChange() { + return Optional.ofNullable(lastLifecycleStateChange); + } + public Optional getCurrentVersion() { return Optional.ofNullable(currentVersion); } diff --git a/api/src/main/java/marquez/service/models/DatasetVersion.java b/api/src/main/java/marquez/service/models/DatasetVersion.java index 5f616eed6d..641ddee430 100644 --- a/api/src/main/java/marquez/service/models/DatasetVersion.java +++ b/api/src/main/java/marquez/service/models/DatasetVersion.java @@ -47,6 +47,7 @@ public abstract class DatasetVersion { @Getter private final SourceName sourceName; @Getter @Setter private ImmutableList fields; @Getter @Setter private ImmutableSet tags; + @Nullable private final String lifecycleStateChange; @Nullable private final String description; @Nullable @Setter private Run createdByRun; @Nullable @Setter private UUID createdByRunUuid; @@ -62,6 +63,7 @@ public DatasetVersion( @NonNull final SourceName sourceName, @Nullable final ImmutableList fields, @Nullable final ImmutableSet tags, + @Nullable final String lifecycleStateChange, @Nullable final String description, @Nullable final Run createdByRun, @Nullable final ImmutableMap facets) { @@ -75,6 +77,7 @@ public DatasetVersion( this.sourceName = sourceName; this.fields = (fields == null) ? ImmutableList.of() : fields; this.tags = (tags == null) ? 
ImmutableSet.of() : tags; + this.lifecycleStateChange = lifecycleStateChange; this.description = description; this.createdByRun = createdByRun; this.facets = (facets == null) ? ImmutableMap.of() : facets; @@ -88,6 +91,10 @@ public Optional getCreatedByRun() { return Optional.ofNullable(createdByRun); } + public Optional getLifecycleStateChange() { + return Optional.ofNullable(lifecycleStateChange); + } + @JsonIgnore public UUID getCreatedByRunUuid() { return createdByRunUuid; diff --git a/api/src/main/java/marquez/service/models/DbTable.java b/api/src/main/java/marquez/service/models/DbTable.java index 283985583c..fb217513d7 100644 --- a/api/src/main/java/marquez/service/models/DbTable.java +++ b/api/src/main/java/marquez/service/models/DbTable.java @@ -31,9 +31,11 @@ public DbTable( @Nullable final ImmutableList fields, @Nullable final ImmutableSet tags, @Nullable final Instant lastModifiedAt, + @Nullable final String lastLifecycleStateChange, @Nullable final String description, @Nullable final UUID currentVersion, - @Nullable final ImmutableMap facets) { + @Nullable final ImmutableMap facets, + final boolean isDeleted) { super( id, DB_TABLE, @@ -45,8 +47,10 @@ public DbTable( fields, tags, lastModifiedAt, + lastLifecycleStateChange, description, currentVersion, - facets); + facets, + isDeleted); } } diff --git a/api/src/main/java/marquez/service/models/DbTableVersion.java b/api/src/main/java/marquez/service/models/DbTableVersion.java index 81d6bdbb24..3f4ea9be4a 100644 --- a/api/src/main/java/marquez/service/models/DbTableVersion.java +++ b/api/src/main/java/marquez/service/models/DbTableVersion.java @@ -31,6 +31,7 @@ public DbTableVersion( @Nullable final ImmutableList fields, @Nullable final ImmutableSet tags, @Nullable final String description, + @Nullable final String lifecycleStateChange, @Nullable final Run createdByRun, @Nullable final ImmutableMap facets) { super( @@ -43,6 +44,7 @@ public DbTableVersion( sourceName, fields, tags, + lifecycleStateChange, 
description, createdByRun, facets); diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index f8d29ebb32..714394a890 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -309,11 +309,18 @@ public static class Dataset extends BaseJsonModel { @Setter @Valid @ToString - @JsonPropertyOrder({"documentation", "schema", "dataSource", "description"}) + @JsonPropertyOrder({ + "documentation", + "schema", + "dataSource", + "description", + "lifecycleStateChange" + }) public static class DatasetFacets { @Valid private DocumentationDatasetFacet documentation; @Valid private SchemaDatasetFacet schema; + @Valid private LifecycleStateChangeFacet lifecycleStateChangeFacet; @Valid private DatasourceDatasetFacet dataSource; private String description; @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); @@ -336,6 +343,10 @@ public SchemaDatasetFacet getSchema() { return schema; } + public LifecycleStateChangeFacet getLifecycleStateChange() { + return lifecycleStateChangeFacet; + } + public DatasourceDatasetFacet getDataSource() { return dataSource; } @@ -411,4 +422,21 @@ public DatasourceDatasetFacet( this.uri = uri; } } + + @NoArgsConstructor + @Getter + @Setter + @Valid + @ToString + public static class LifecycleStateChangeFacet extends BaseFacet { + + private String lifeCycleStateChange; + + @Builder + public LifecycleStateChangeFacet( + @NotNull URI _producer, @NotNull URI _schemaURL, String stateChange) { + super(_producer, _schemaURL); + this.lifeCycleStateChange = stateChange; + } + } } diff --git a/api/src/main/java/marquez/service/models/Stream.java b/api/src/main/java/marquez/service/models/Stream.java index 29dc4135ec..b97755ca9d 100644 --- a/api/src/main/java/marquez/service/models/Stream.java +++ b/api/src/main/java/marquez/service/models/Stream.java @@ -37,9 +37,11 @@ public 
Stream( @Nullable final ImmutableList fields, @Nullable final ImmutableSet tags, @Nullable final Instant lastModifiedAt, + @Nullable final String lastLifecycleStateChange, @Nullable final String description, @Nullable final UUID currentVersion, - @Nullable final ImmutableMap facets) { + @Nullable final ImmutableMap facets, + final boolean isDeleted) { super( id, STREAM, @@ -51,9 +53,11 @@ public Stream( fields, tags, lastModifiedAt, + lastLifecycleStateChange, description, currentVersion, - facets); + facets, + isDeleted); this.schemaLocation = schemaLocation; } } diff --git a/api/src/main/java/marquez/service/models/StreamVersion.java b/api/src/main/java/marquez/service/models/StreamVersion.java index 3bace63c22..dc3af3656e 100644 --- a/api/src/main/java/marquez/service/models/StreamVersion.java +++ b/api/src/main/java/marquez/service/models/StreamVersion.java @@ -37,6 +37,7 @@ public StreamVersion( @Nullable final ImmutableList fields, @Nullable final ImmutableSet tags, @Nullable final String description, + @Nullable final String lifecycleStateChange, @Nullable final Run createdByRun, @Nullable final ImmutableMap facets) { super( @@ -49,6 +50,7 @@ public StreamVersion( sourceName, fields, tags, + lifecycleStateChange, description, createdByRun, facets); diff --git a/api/src/main/resources/marquez/db/migration/V40__add_operation_to_dataset_versions.sql b/api/src/main/resources/marquez/db/migration/V40__add_operation_to_dataset_versions.sql new file mode 100644 index 0000000000..822b0c98e6 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V40__add_operation_to_dataset_versions.sql @@ -0,0 +1,2 @@ +alter table dataset_versions add column lifecycle_state_change VARCHAR(63); +alter table datasets add column is_deleted BOOLEAN DEFAULT FALSE; \ No newline at end of file diff --git a/api/src/test/java/marquez/common/UtilsTest.java b/api/src/test/java/marquez/common/UtilsTest.java index f7d53ad097..552421fffa 100644 --- 
a/api/src/test/java/marquez/common/UtilsTest.java +++ b/api/src/test/java/marquez/common/UtilsTest.java @@ -5,6 +5,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static marquez.common.models.CommonModelGenerator.newDatasetName; import static marquez.common.models.CommonModelGenerator.newJobName; +import static marquez.common.models.CommonModelGenerator.newLifecycleStateChange; import static marquez.common.models.CommonModelGenerator.newNamespaceName; import static marquez.common.models.CommonModelGenerator.newRunId; import static marquez.common.models.CommonModelGenerator.newSchemaFields; @@ -259,6 +260,7 @@ public void testDatasetVersionEqualOnSameData() { DatasetName datasetName = newDatasetName(); DatasetName physicalName = newDatasetName(); SourceName sourceName = newSourceName(); + String lifecycleStateChange = newLifecycleStateChange(); List schemaFields = newSchemaFields(2); RunId runId = newRunId(); @@ -268,6 +270,7 @@ public void testDatasetVersionEqualOnSameData() { sourceName.getValue(), physicalName.getValue(), datasetName.getValue(), + lifecycleStateChange, schemaFields, runId.getValue()); Version second = @@ -276,6 +279,7 @@ public void testDatasetVersionEqualOnSameData() { sourceName.getValue(), physicalName.getValue(), datasetName.getValue(), + lifecycleStateChange, schemaFields, runId.getValue()); @@ -320,6 +324,7 @@ public void testDatasetVersionIsNotEqualOnDifferentData() { newSourceName().getValue(), newDatasetName().getValue(), newDatasetName().getValue(), + newLifecycleStateChange(), schemaFields, newRunId().getValue()); @@ -329,6 +334,7 @@ public void testDatasetVersionIsNotEqualOnDifferentData() { newSourceName().getValue(), newDatasetName().getValue(), newDatasetName().getValue(), + newLifecycleStateChange(), schemaFields, newRunId().getValue()); @@ -337,7 +343,7 @@ public void testDatasetVersionIsNotEqualOnDifferentData() { @Test public void testDatasetVersionWithNullFields() { - Version version = 
Utils.newDatasetVersionFor(null, null, null, null, null, null); + Version version = Utils.newDatasetVersionFor(null, null, null, null, null, null, null); assertThat(version.getValue()).isNotNull(); } @@ -355,6 +361,7 @@ public void testNewDatasetVersionFor_equalOnUnsortedSchemaFields() { DatasetName datasetName = newDatasetName(); DatasetName physicalName = newDatasetName(); SourceName sourceName = newSourceName(); + String lifecycleStateChange = newLifecycleStateChange(); List schemaFields = newSchemaFields(2); RunId runId = newRunId(); @@ -364,6 +371,7 @@ public void testNewDatasetVersionFor_equalOnUnsortedSchemaFields() { sourceName.getValue(), physicalName.getValue(), datasetName.getValue(), + lifecycleStateChange, schemaFields, runId.getValue()); @@ -375,6 +383,7 @@ public void testNewDatasetVersionFor_equalOnUnsortedSchemaFields() { sourceName.getValue(), physicalName.getValue(), datasetName.getValue(), + lifecycleStateChange, shuffleSchemaFields, runId.getValue()); diff --git a/api/src/test/java/marquez/common/models/CommonModelGenerator.java b/api/src/test/java/marquez/common/models/CommonModelGenerator.java index 0d67367ae0..dd33b3229c 100644 --- a/api/src/test/java/marquez/common/models/CommonModelGenerator.java +++ b/api/src/test/java/marquez/common/models/CommonModelGenerator.java @@ -157,6 +157,10 @@ public static RunId newRunId() { return RunId.of(UUID.randomUUID()); } + public static String newLifecycleStateChange() { + return "TRUNCATE"; + } + public static Version newVersion() { return Version.of(UUID.randomUUID()); } diff --git a/api/src/test/java/marquez/db/ColumnsTest.java b/api/src/test/java/marquez/db/ColumnsTest.java index 80f9c68522..f5645e2c64 100644 --- a/api/src/test/java/marquez/db/ColumnsTest.java +++ b/api/src/test/java/marquez/db/ColumnsTest.java @@ -285,4 +285,23 @@ public void testBlankUri() throws SQLException { final URI actual = Columns.uriOrNull(results, column); assertThat(actual).isNull(); } + + @Test + public void 
testBooleanOrDefault() throws SQLException { + final String column = "is_deleted"; + when(results.getObject(column)).thenReturn(true); + when(results.getBoolean(column)).thenReturn(true); + + final boolean actual = Columns.booleanOrDefault(results, column, false); + assertThat(actual).isTrue(); + } + + @Test + public void testBooleanOrDefaultWhenNoValue() throws SQLException { + final String column = "is_deleted"; + when(results.getObject(column)).thenReturn(null); + + final boolean actual = Columns.booleanOrDefault(results, column, true); + assertThat(actual).isTrue(); + } } diff --git a/api/src/test/java/marquez/db/DatasetDaoTest.java b/api/src/test/java/marquez/db/DatasetDaoTest.java index ae92c38bab..274741675e 100644 --- a/api/src/test/java/marquez/db/DatasetDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetDaoTest.java @@ -19,6 +19,7 @@ import java.util.Optional; import lombok.Getter; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.SchemaField; @@ -131,6 +132,64 @@ public void testGetDataset() { "anotherInputFacet"); } + @Test + public void testGetDatasetWithLifecycleStateChangePresent() { + Dataset dataset = + new Dataset( + NAMESPACE, + DATASET, + LineageEvent.DatasetFacets.builder() + .lifecycleStateChangeFacet( + new LineageEvent.LifecycleStateChangeFacet(PRODUCER_URL, SCHEMA_URL, "CREATE")) + .build()); + + createLineageRow( + openLineageDao, + "aWriteJob", + "COMPLETE", + jobFacet, + Collections.emptyList(), + Collections.singletonList(dataset)); + + Optional datasetByName = + datasetDao.findDatasetByName(NAMESPACE, DATASET); + assertThat(datasetByName.get().getLastLifecycleStateChange().get()).isEqualTo("CREATE"); + } + + @Test + public void testGetDatasetWithDatasetMarkedDeleted() { + // create dataset + createLineageRow( + openLineageDao, + 
"aWriteJob", + "COMPLETE", + jobFacet, + Collections.emptyList(), + Collections.singletonList( + new Dataset(NAMESPACE, DATASET, LineageEvent.DatasetFacets.builder().build()))); + + // mark it deleted + createLineageRow( + openLineageDao, + "aWriteJob", + "COMPLETE", + jobFacet, + Collections.emptyList(), + Collections.singletonList( + new Dataset( + NAMESPACE, + DATASET, + LineageEvent.DatasetFacets.builder() + .lifecycleStateChangeFacet( + new LineageEvent.LifecycleStateChangeFacet( + PRODUCER_URL, SCHEMA_URL, "DROP")) + .build()))); + + // make sure it's not returned by DAO + assertThat(datasetDao.findDatasetByName(NAMESPACE, DATASET)).isEmpty(); + assertThat(datasetDao.findWithTags(NAMESPACE, DATASET)).isEmpty(); + } + @Test public void testGetDatasetWithMultipleVersions() { createLineageRow( diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 97ab8cb6a7..b9f2e9b02e 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -2,7 +2,10 @@ package marquez.db; +import static marquez.db.DatasetDaoTest.DATASET; import static marquez.db.LineageTestUtils.NAMESPACE; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static marquez.db.LineageTestUtils.newDatasetFacet; import static marquez.db.LineageTestUtils.writeDownstreamLineage; import static org.assertj.core.api.Assertions.assertThat; @@ -25,6 +28,7 @@ import marquez.db.models.JobData; import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.SchemaField; @@ -534,6 +538,35 @@ public void testGetDatasetData() { .allMatch(str -> str.contains("outputData2")); } + @Test + public void 
testGetDatasetDataLifecycleStateChangeReturned() { + Dataset dataset = + new Dataset( + NAMESPACE, + DATASET, + LineageEvent.DatasetFacets.builder() + .lifecycleStateChangeFacet( + new LineageEvent.LifecycleStateChangeFacet(PRODUCER_URL, SCHEMA_URL, "CREATE")) + .build()); + + UpdateLineageRow row = + LineageTestUtils.createLineageRow( + openLineageDao, + "writeJob", + "COMPLETE", + jobFacet, + Arrays.asList(), + Arrays.asList(dataset)); + + Set datasetData = + lineageDao.getDatasetData( + Collections.singleton(row.getOutputs().get().get(0).getDatasetRow().getUuid())); + + assertThat(datasetData) + .extracting(ds -> ds.getLastLifecycleStateChange().orElse("")) + .anyMatch(str -> str.contains("CREATE")); + } + @Test public void testGetCurrentRuns() { diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index 375f8f26eb..9229d8ccc0 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -2,12 +2,15 @@ package marquez.db; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.JobFacet; @@ -65,6 +68,28 @@ void testUpdateMarquezModel() { .isEqualTo(writeJob.getOutputs().get().get(0).getDatasetVersionRow()); } + @Test + void testUpdateMarquezModelLifecycleStateChangeFacet() { + Dataset dataset = + new Dataset( + LineageTestUtils.NAMESPACE, + DATASET_NAME, + LineageEvent.DatasetFacets.builder() + .lifecycleStateChangeFacet( + new 
LineageEvent.LifecycleStateChangeFacet( + PRODUCER_URL, SCHEMA_URL, "TRUNCATE")) + .build()); + + JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + UpdateLineageRow writeJob = + LineageTestUtils.createLineageRow( + dao, WRITE_JOB_NAME, "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList(dataset)); + + assertThat(writeJob.getOutputs()).isPresent().get().asList().size().isEqualTo(1); + assertThat(writeJob.getOutputs().get().get(0).getDatasetVersionRow().getLifecycleStateChange()) + .isEqualTo("TRUNCATE"); + } + /** * When reading a new dataset, a version is created and the dataset's current version is updated * immediately. @@ -115,6 +140,7 @@ void testUpdateMarquezModelWithNonMatchingReadSchema() { new SchemaField("name", "STRING", "my name"), new SchemaField("age", "INT", "my age"), new SchemaField("eyeColor", "STRING", "my eye color"))), + this.datasetFacets.getLifecycleStateChange(), this.datasetFacets.getDataSource(), this.datasetFacets.getDescription(), this.datasetFacets.getAdditionalFacets()); diff --git a/api/src/test/java/marquez/db/mappers/DatasetMapperTest.java b/api/src/test/java/marquez/db/mappers/DatasetMapperTest.java index 28eb07d048..b97a50ebb9 100644 --- a/api/src/test/java/marquez/db/mappers/DatasetMapperTest.java +++ b/api/src/test/java/marquez/db/mappers/DatasetMapperTest.java @@ -42,6 +42,8 @@ public static void setUp() throws SQLException, MalformedURLException { when(resultSet.getObject(Columns.PHYSICAL_NAME)).thenReturn("PHYSICAL_NAME"); when(resultSet.getString(Columns.TYPE)).thenReturn("DB_TABLE"); when(resultSet.getObject(Columns.TYPE)).thenReturn("DB_TABLE"); + when(resultSet.getString(Columns.LIFECYCLE_STATE_CHANGE)).thenReturn("TRUNCATE"); + when(resultSet.getObject(Columns.LIFECYCLE_STATE_CHANGE)).thenReturn("TRUNCATE"); when(resultSet.getString(Columns.DESCRIPTION)).thenReturn("DESCRIPTION"); when(resultSet.getObject(Columns.DESCRIPTION)).thenReturn("DESCRIPTION"); 
when(resultSet.getString(Columns.SOURCE_NAME)).thenReturn("POSTGRES"); diff --git a/api/src/test/java/marquez/service/models/ServiceModelGenerator.java b/api/src/test/java/marquez/service/models/ServiceModelGenerator.java index 6ab61d6281..a63c28f078 100644 --- a/api/src/test/java/marquez/service/models/ServiceModelGenerator.java +++ b/api/src/test/java/marquez/service/models/ServiceModelGenerator.java @@ -11,6 +11,7 @@ import static marquez.common.models.CommonModelGenerator.newDescription; import static marquez.common.models.CommonModelGenerator.newFields; import static marquez.common.models.CommonModelGenerator.newJobType; +import static marquez.common.models.CommonModelGenerator.newLifecycleStateChange; import static marquez.common.models.CommonModelGenerator.newLocation; import static marquez.common.models.CommonModelGenerator.newNamespaceName; import static marquez.common.models.CommonModelGenerator.newOwnerName; @@ -59,9 +60,11 @@ public static DbTable newDbTableWith(final DatasetId dbTableId) { newFields(4), newTagNames(2), null, + newLifecycleStateChange(), newDescription(), null, - null); + null, + false); } /** Returns a new {@link DbTableMeta} object. 
*/ diff --git a/api/src/test/resources/mappers/full_dataset_mapper.json b/api/src/test/resources/mappers/full_dataset_mapper.json index c6dba5a7cc..d8bd7d81ab 100644 --- a/api/src/test/resources/mappers/full_dataset_mapper.json +++ b/api/src/test/resources/mappers/full_dataset_mapper.json @@ -4,6 +4,7 @@ "name": "NAME" }, "type": "DB_TABLE", + "lastLifecycleStateChange": "TRUNCATE", "description": "DESCRIPTION", "name": "NAME", "physicalName": "PHYSICAL_NAME", diff --git a/web/src/components/datasets/DatasetVersions.tsx b/web/src/components/datasets/DatasetVersions.tsx index 775ab2967c..758d545d63 100644 --- a/web/src/components/datasets/DatasetVersions.tsx +++ b/web/src/components/datasets/DatasetVersions.tsx @@ -25,7 +25,7 @@ const styles = (theme: ITheme) => { }) } -const DATASET_VERSIONS_COLUMNS = ['Version', 'Created At', 'Field Count'] +const DATASET_VERSIONS_COLUMNS = ['Version', 'Created At', 'Field Count', 'State Change'] interface DatasetVersionsProps { versions: DatasetVersion[] @@ -83,6 +83,7 @@ const DatasetVersions: FunctionComponent< {version.version} {formatUpdatedAt(version.createdAt)} {version.fields.length} + {version.lifecycleStateChange} ) })} diff --git a/web/src/types/api.ts b/web/src/types/api.ts index 7b35e8d726..1e145b8203 100644 --- a/web/src/types/api.ts +++ b/web/src/types/api.ts @@ -60,6 +60,7 @@ export interface DatasetVersion { tags: string[] lastModifiedAt: string description: string + lifecycleStateChange: string facets: object }