Skip to content

Commit

Permalink
add operation column
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Mar 9, 2022
1 parent 6c312d9 commit d04951f
Show file tree
Hide file tree
Showing 36 changed files with 313 additions and 37 deletions.
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public static Version newJobVersionFor(
* @param sourceName The source name of the dataset.
* @param physicalName The physical name of the dataset.
* @param datasetName The dataset name.
* @param lifecycleStateChange The dataset change like CREATE, DROP, TRUNCATE.
* @param fields The fields of the dataset.
* @param runId The UUID of the run linked to the dataset.
* @return A {@link Version} object based on the specified job meta.
Expand All @@ -213,6 +214,7 @@ public static Version newDatasetVersionFor(
String sourceName,
String physicalName,
String datasetName,
String lifecycleStateChange,
List<LineageEvent.SchemaField> fields,
UUID runId) {
DatasetVersionData data =
Expand All @@ -221,6 +223,7 @@ public static Version newDatasetVersionFor(
.sourceName(sourceName)
.physicalName(physicalName)
.datasetName(datasetName)
.lifecycleStateChange(lifecycleStateChange)
.schemaFields(fields)
.runId(runId)
.build();
Expand Down Expand Up @@ -259,6 +262,7 @@ private static Version newDatasetVersionFor(DatasetVersionData data) {
data.getPhysicalName(),
data.getSchemaLocation(),
data.getFields().stream().map(Utils::joinField).collect(joining(VERSION_DELIM)),
data.getLifecycleStateChange(),
data.getRunId())
.getBytes(UTF_8);
return Version.of(UUID.nameUUIDFromBytes(bytes));
Expand All @@ -275,6 +279,7 @@ private static class DatasetVersionData {
private String sourceName;
private String physicalName;
private String datasetName;
private String lifecycleStateChange;
private String schemaLocation;
private Set<Triple<String, String, String>> fields;
private UUID runId;
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ private Columns() {}
public static final String TAG_UUIDS = "tag_uuids";
public static final String TAGGED_AT = "tagged_at";
public static final String LAST_MODIFIED_AT = "last_modified_at";
public static final String IS_DELETED = "is_deleted";

/* DATASET VERSION ROW COLUMNS */
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE_CHANGE = "lifecycle_state_change";

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";
Expand Down Expand Up @@ -160,6 +162,15 @@ public static String stringOrThrow(final ResultSet results, final String column)
return results.getString(column);
}

public static boolean booleanOrDefault(
final ResultSet results, final String column, final boolean defaultValue)
throws SQLException {
if (results.getObject(column) == null) {
return defaultValue;
}
return results.getBoolean(column);
}

public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
Expand Down
29 changes: 18 additions & 11 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,17 @@ void updateLastModifiedAt(
+ " FROM datasets d\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " AND d.name = :datasetName\n"
+ " AND d.is_deleted = FALSE\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state_change, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state_change, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
Expand All @@ -89,7 +90,7 @@ void updateLastModifiedAt(
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state_change, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
Expand Down Expand Up @@ -142,15 +143,15 @@ default void setFields(Dataset ds) {
+ " ORDER BY d.name\n"
+ " LIMIT :limit OFFSET :offset\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state_change, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state_change, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
Expand All @@ -159,7 +160,7 @@ default void setFields(Dataset ds) {
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state_change, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
Expand Down Expand Up @@ -205,7 +206,8 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
+ "source_name, "
+ "name, "
+ "physical_name, "
+ "description "
+ "description, "
+ "is_deleted "
+ ") VALUES ( "
+ ":uuid, "
+ ":type, "
Expand All @@ -217,13 +219,15 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
+ ":sourceName, "
+ ":name, "
+ ":physicalName, "
+ ":description) "
+ ":description, "
+ ":isDeleted) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
+ "physical_name = EXCLUDED.physical_name, "
+ "description = EXCLUDED.description "
+ "description = EXCLUDED.description, "
+ "is_deleted = EXCLUDED.is_deleted "
+ "RETURNING *")
DatasetRow upsert(
UUID uuid,
Expand All @@ -235,7 +239,8 @@ DatasetRow upsert(
String sourceName,
String name,
String physicalName,
String description);
String description,
boolean isDeleted);

@SqlQuery(
"INSERT INTO datasets ("
Expand Down Expand Up @@ -308,7 +313,8 @@ default Dataset upsertDatasetMeta(
sourceRow.getName(),
datasetName.getValue(),
datasetMeta.getPhysicalName().getValue(),
datasetMeta.getDescription().orElse(null));
datasetMeta.getDescription().orElse(null),
false);
} else {
datasetRow =
upsert(
Expand Down Expand Up @@ -340,6 +346,7 @@ default Dataset upsertDatasetMeta(
now,
namespaceName.getValue(),
datasetName.getValue(),
null,
datasetMeta);

return findWithTags(namespaceName.getValue(), datasetName.getValue()).get();
Expand Down
17 changes: 10 additions & 7 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ default DatasetVersionRow upsertDatasetVersion(
Instant now,
String namespaceName,
String datasetName,
String lifecycleStateChange,
DatasetMeta datasetMeta) {
TagDao tagDao = createTagDao();
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
Expand All @@ -63,7 +64,8 @@ default DatasetVersionRow upsertDatasetVersion(
datasetMeta.getRunId().map(RunId::getValue).orElse(null),
toPgObjectFields(datasetMeta.getFields()),
namespaceName,
datasetName);
datasetName,
lifecycleStateChange);
updateDatasetVersionMetric(
namespaceName,
datasetMeta.getType().toString(),
Expand Down Expand Up @@ -167,7 +169,7 @@ default void updateDatasetVersionMetric(
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change, \n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
Expand Down Expand Up @@ -209,7 +211,7 @@ default void updateDatasetVersionMetric(
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change, \n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
Expand Down Expand Up @@ -280,7 +282,7 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change,\n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
Expand Down Expand Up @@ -324,9 +326,9 @@ default List<DatasetVersion> findAllWithRun(

@SqlQuery(
"INSERT INTO dataset_versions "
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name) "
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, lifecycle_state_change) "
+ "VALUES "
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName) "
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleStateChange) "
+ "ON CONFLICT(version) "
+ "DO UPDATE SET "
+ "run_uuid = EXCLUDED.run_uuid "
Expand All @@ -339,7 +341,8 @@ DatasetVersionRow upsert(
UUID runUuid,
PGobject fields,
String namespaceName,
String datasetName);
String datasetName,
String lifecycleStateChange);

@SqlUpdate("UPDATE dataset_versions SET fields = :fields WHERE uuid = :uuid")
void updateFields(UUID uuid, PGobject fields);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface LineageDao {
Optional<UUID> getJobUuid(String jobName, String namespace);

@SqlQuery(
"SELECT ds.*, dv.fields\n"
"SELECT ds.*, dv.fields, dv.lifecycle_state_change\n"
+ "FROM datasets ds\n"
+ "LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid\n"
+ "WHERE ds.uuid IN (<dsUuids>);")
Expand Down
15 changes: 12 additions & 3 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
import marquez.service.models.LineageEvent.Job;
import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
Expand Down Expand Up @@ -350,6 +351,12 @@ default DatasetRecord upsertLineageDataset(
formatNamespaceName(ds.getNamespace()),
DEFAULT_NAMESPACE_OWNER);

String dsLifecycleStateChange =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getLifecycleStateChange)
.map(LifecycleStateChangeFacet::getLifeCycleStateChange)
.orElse("");

DatasetRow datasetRow =
datasetDao.upsert(
UUID.randomUUID(),
Expand All @@ -361,7 +368,8 @@ default DatasetRecord upsertLineageDataset(
source.getName(),
formatDatasetName(ds.getName()),
ds.getName(),
dsDescription);
dsDescription,
dsLifecycleStateChange.equalsIgnoreCase("DROP"));

List<SchemaField> fields =
Optional.ofNullable(ds.getFacets())
Expand All @@ -385,6 +393,7 @@ default DatasetRecord upsertLineageDataset(
source.getName(),
dsRow.getPhysicalName(),
dsRow.getName(),
dsLifecycleStateChange,
fields,
runUuid)
.getValue();
Expand All @@ -397,8 +406,8 @@ default DatasetRecord upsertLineageDataset(
isInput ? null : runUuid,
datasetVersionDao.toPgObjectSchemaFields(fields),
dsNamespace.getName(),
ds.getName());

ds.getName(),
dsLifecycleStateChange);
return row;
});
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
d.getSourceName().getValue(),
d.getPhysicalName().getValue(),
d.getName().getValue(),
null,
toSchemaFields(d.getFields()),
runUuid)
.getValue();
Expand All @@ -288,7 +289,8 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
runUuid,
datasetVersionDao.toPgObjectFields(d.getFields()),
d.getNamespace().getValue(),
d.getName().getValue());
d.getName().getValue(),
null);
});
}
}
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/db/mappers/DatasetDataMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public DatasetData map(@NonNull ResultSet results, @NonNull StatementContext con
toFields(results, "fields"),
ImmutableSet.of(),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.DESCRIPTION));
stringOrNull(results, Columns.DESCRIPTION),
stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE));
}

public static ImmutableList<Field> toFields(ResultSet results, String column)
Expand Down
9 changes: 7 additions & 2 deletions api/src/main/java/marquez/db/mappers/DatasetMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringArrayOrThrow;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
Expand Down Expand Up @@ -62,9 +63,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
toFields(results, "fields"),
toTags(results, "tags"),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE),
stringOrNull(results, Columns.DESCRIPTION),
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
toFacetsOrNull(results, Columns.FACETS));
toFacetsOrNull(results, Columns.FACETS),
booleanOrDefault(results, Columns.IS_DELETED, false));
} else {
return new Stream(
new DatasetId(
Expand All @@ -79,9 +82,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
toFields(results, "fields"),
toTags(results, "tags"),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE),
stringOrNull(results, Columns.DESCRIPTION),
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
toFacetsOrNull(results, Columns.FACETS));
toFacetsOrNull(results, Columns.FACETS),
booleanOrDefault(results, Columns.IS_DELETED, false));
}
}

Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/db/mappers/DatasetRowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrNull;
Expand Down Expand Up @@ -32,6 +33,7 @@ public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext cont
stringOrThrow(results, Columns.PHYSICAL_NAME),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.DESCRIPTION),
uuidOrNull(results, Columns.CURRENT_VERSION_UUID));
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
booleanOrDefault(results, Columns.IS_DELETED, false));
}
}
Loading

0 comments on commit d04951f

Please sign in to comment.