Skip to content

Commit

Permalink
rename ColumnLevelLineage -> ColumnLineage
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Sep 28, 2022
1 parent 1aefca2 commit 21dac22
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 49 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ public interface BaseDao extends SqlObject {
OpenLineageDao createOpenLineageDao();

@CreateSqlObject
ColumnLevelLineageDao createColumnLevelLineageDao();
ColumnLineageDao createColumnLineageDao();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLevelLineageRowMapper;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.mappers.ColumnLineageRowMapper;
import marquez.db.models.ColumnLineageRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLevelLineageRowMapper.class)
public interface ColumnLevelLineageDao extends BaseDao {
@RegisterRowMapper(ColumnLineageRowMapper.class)
public interface ColumnLineageDao extends BaseDao {

default List<ColumnLevelLineageRow> upsertColumnLevelLineageRow(
default List<ColumnLineageRow> upsertColumnLineageRow(
UUID outputDatasetVersionUuid,
UUID outputDatasetFieldUuid,
List<Pair<UUID, UUID>> inputs,
Expand All @@ -33,11 +33,11 @@ default List<ColumnLevelLineageRow> upsertColumnLevelLineageRow(
return Collections.emptyList();
}

List<ColumnLevelLineageRow> rows =
doUpsertColumnLineageRow(
inputs.stream()
.map(
input ->
new ColumnLevelLineageRow(
new ColumnLineageRow(
outputDatasetVersionUuid,
outputDatasetFieldUuid,
input.getLeft(), // input_dataset_version_uuid
Expand All @@ -46,20 +46,19 @@ default List<ColumnLevelLineageRow> upsertColumnLevelLineageRow(
transformationType,
now,
now))
.collect(Collectors.toList());
doUpsertColumnLevelLineageRow(rows.toArray(new ColumnLevelLineageRow[0]));
return findColumnLevelLineageByDatasetVersionColumnAndOutputDatasetField(
.collect(Collectors.toList()));
return findColumnLineageByDatasetVersionColumnAndOutputDatasetField(
outputDatasetVersionUuid, outputDatasetFieldUuid);
}

@SqlQuery(
"SELECT * FROM column_level_lineage WHERE output_dataset_version_uuid = :datasetVersionUuid AND output_dataset_field_uuid = :outputDatasetFieldUuid")
List<ColumnLevelLineageRow> findColumnLevelLineageByDatasetVersionColumnAndOutputDatasetField(
"SELECT * FROM column_lineage WHERE output_dataset_version_uuid = :datasetVersionUuid AND output_dataset_field_uuid = :outputDatasetFieldUuid")
List<ColumnLineageRow> findColumnLineageByDatasetVersionColumnAndOutputDatasetField(
UUID datasetVersionUuid, UUID outputDatasetFieldUuid);

@SqlUpdate(
"""
INSERT INTO column_level_lineage (
INSERT INTO column_lineage (
output_dataset_version_uuid,
output_dataset_field_uuid,
input_dataset_version_uuid,
Expand All @@ -74,9 +73,8 @@ ON CONFLICT (output_dataset_version_uuid, output_dataset_field_uuid, input_datas
transformation_description = EXCLUDED.transformation_description,
transformation_type = EXCLUDED.transformation_type,
updated_at = EXCLUDED.updated_at
RETURNING *
""")
void doUpsertColumnLevelLineageRow(
void doUpsertColumnLineageRow(
@BindBeanList(
propertyNames = {
"outputDatasetVersionUuid",
Expand All @@ -89,5 +87,5 @@ void doUpsertColumnLevelLineageRow(
"updatedAt"
},
value = "values")
ColumnLevelLineageRow... rows);
List<ColumnLineageRow> rows);
}
22 changes: 11 additions & 11 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.models.ColumnLineageRow;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
Expand Down Expand Up @@ -137,7 +137,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
RunDao runDao = createRunDao();
RunArgsDao runArgsDao = createRunArgsDao();
RunStateDao runStateDao = createRunStateDao();
ColumnLevelLineageDao columnLevelLineageDao = createColumnLevelLineageDao();
ColumnLineageDao columnLineageDao = createColumnLineageDao();

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

Expand Down Expand Up @@ -331,7 +331,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetVersionDao,
datasetFieldDao,
runDao,
columnLevelLineageDao);
columnLineageDao);
datasetInputs.add(record);
}
}
Expand All @@ -354,7 +354,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetVersionDao,
datasetFieldDao,
runDao,
columnLevelLineageDao);
columnLineageDao);
datasetOutputs.add(record);
}
}
Expand Down Expand Up @@ -551,7 +551,7 @@ default DatasetRecord upsertLineageDataset(
DatasetVersionDao datasetVersionDao,
DatasetFieldDao datasetFieldDao,
RunDao runDao,
ColumnLevelLineageDao columnLevelLineageDao) {
ColumnLineageDao columnLineageDao) {
NamespaceRow dsNamespace =
namespaceDao.upsertNamespaceRow(
UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);
Expand Down Expand Up @@ -702,28 +702,28 @@ default DatasetRecord upsertLineageDataset(
}
}

List<ColumnLevelLineageRow> columnLineageRows = Collections.emptyList();
List<ColumnLineageRow> columnLineageRows = Collections.emptyList();
if (!isInput) {
columnLineageRows =
upsertColumnLineage(
runUuid,
ds,
now,
datasetFields,
columnLevelLineageDao,
columnLineageDao,
datasetFieldDao,
datasetVersionRow);
}

return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows);
}

private List<ColumnLevelLineageRow> upsertColumnLineage(
private List<ColumnLineageRow> upsertColumnLineage(
UUID runUuid,
Dataset ds,
Instant now,
List<DatasetFieldRow> datasetFields,
ColumnLevelLineageDao columnLevelLineageDao,
ColumnLineageDao columnLineageDao,
DatasetFieldDao datasetFieldDao,
DatasetVersionRow datasetVersionRow) {
// get all the fields related to this particular run
Expand Down Expand Up @@ -770,8 +770,8 @@ private List<ColumnLevelLineageRow> upsertColumnLineage(
fieldData.getDatasetFieldUuid()))
.collect(Collectors.toList());

return columnLevelLineageDao
.upsertColumnLevelLineageRow(
return columnLineageDao
.upsertColumnLineageRow(
datasetVersionRow.getUuid(),
outputField.get().getUuid(),
inputFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.models.ColumnLineageRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class ColumnLevelLineageRowMapper implements RowMapper<ColumnLevelLineageRow> {
public class ColumnLineageRowMapper implements RowMapper<ColumnLineageRow> {

@Override
public ColumnLevelLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context)
public ColumnLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new ColumnLevelLineageRow(
return new ColumnLineageRow(
uuidOrThrow(results, Columns.OUTPUT_DATASET_VERSION_UUID),
uuidOrThrow(results, Columns.OUTPUT_DATASET_FIELD_UUID),
uuidOrThrow(results, Columns.INPUT_DATASET_VERSION_UUID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class ColumnLevelLineageRow {
public class ColumnLineageRow {
@Getter @NonNull private final UUID outputDatasetVersionUuid;
@Getter @NonNull private final UUID outputDatasetFieldUuid;
@Getter @NonNull private final UUID inputDatasetVersionUuid;
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/models/UpdateLineageRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public static class DatasetRecord {
DatasetRow datasetRow;
DatasetVersionRow datasetVersionRow;
NamespaceRow namespaceRow;
List<ColumnLevelLineageRow> columnLineageRows;
List<ColumnLineageRow> columnLineageRows;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* SPDX-License-Identifier: Apache-2.0 */

CREATE TABLE column_level_lineage (
CREATE TABLE column_lineage (
output_dataset_version_uuid uuid REFERENCES dataset_versions(uuid), -- allows join to run_id
output_dataset_field_uuid uuid REFERENCES dataset_fields(uuid),
input_dataset_version_uuid uuid REFERENCES dataset_versions(uuid), -- speed up graph column lineage graph traversal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.List;
import java.util.UUID;
import marquez.common.models.DatasetType;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.models.ColumnLineageRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.NamespaceRow;
Expand All @@ -30,9 +30,9 @@
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class ColumnLevelLineageDaoTest {
public class ColumnLineageDaoTest {

private static ColumnLevelLineageDao dao;
private static ColumnLineageDao dao;
private static DatasetFieldDao fieldDao;
private static DatasetDao datasetDao;
private static NamespaceDao namespaceDao;
Expand All @@ -50,7 +50,7 @@ public class ColumnLevelLineageDaoTest {

@BeforeAll
public static void setUpOnce(Jdbi jdbi) {
dao = jdbi.onDemand(ColumnLevelLineageDao.class);
dao = jdbi.onDemand(ColumnLineageDao.class);
fieldDao = jdbi.onDemand(DatasetFieldDao.class);
datasetDao = jdbi.onDemand(DatasetDao.class);
namespaceDao = jdbi.onDemand(NamespaceDao.class);
Expand All @@ -73,7 +73,7 @@ public void setup() {
"",
sourceRow.getUuid(),
"",
"",
"inputDataset",
"",
"",
false);
Expand All @@ -86,7 +86,7 @@ public void setup() {
"",
sourceRow.getUuid(),
"",
"",
"outputDataset",
"",
"",
false);
Expand Down Expand Up @@ -135,7 +135,7 @@ public void setup() {
public void tearDown(Jdbi jdbi) {
jdbi.inTransaction(
handle -> {
handle.execute("DELETE FROM column_level_lineage");
handle.execute("DELETE FROM column_lineage");
handle.execute("DELETE FROM dataset_versions");
handle.execute("DELETE FROM dataset_fields");
handle.execute("DELETE FROM datasets");
Expand All @@ -154,8 +154,8 @@ void testUpsertMultipleColumns() {
fieldDao.upsert(inputFieldUuid1, now, "a", "string", "desc", inputDatasetRow.getUuid());
fieldDao.upsert(inputFieldUuid2, now, "b", "string", "desc", inputDatasetRow.getUuid());

List<ColumnLevelLineageRow> rows =
dao.upsertColumnLevelLineageRow(
List<ColumnLineageRow> rows =
dao.upsertColumnLineageRow(
outputDatasetVersionRow.getUuid(),
outputDatasetFieldUuid,
Arrays.asList(
Expand All @@ -180,8 +180,8 @@ void testUpsertMultipleColumns() {

@Test
void testUpsertEmptyList() {
List<ColumnLevelLineageRow> rows =
dao.upsertColumnLevelLineageRow(
List<ColumnLineageRow> rows =
dao.upsertColumnLineageRow(
UUID.randomUUID(),
outputDatasetFieldUuid,
Collections.emptyList(), // provide empty list
Expand All @@ -198,15 +198,15 @@ void testUpsertOnUpdatePreventsDuplicates() {
UUID inputFieldUuid = UUID.randomUUID();
fieldDao.upsert(inputFieldUuid, now, "a", "string", "desc", inputDatasetRow.getUuid());

dao.upsertColumnLevelLineageRow(
dao.upsertColumnLineageRow(
inputDatasetVersionRow.getUuid(),
outputDatasetFieldUuid,
Arrays.asList(Pair.of(inputDatasetVersionRow.getUuid(), inputFieldUuid)),
transformationDescription,
transformationType,
now);
List<ColumnLevelLineageRow> rows =
dao.upsertColumnLevelLineageRow(
List<ColumnLineageRow> rows =
dao.upsertColumnLineageRow(
inputDatasetVersionRow.getUuid(),
outputDatasetFieldUuid,
Arrays.asList(Pair.of(inputDatasetVersionRow.getUuid(), inputFieldUuid)),
Expand Down

0 comments on commit 21dac22

Please sign in to comment.