Skip to content

Commit

Permalink
Add java migrations for backfilling runs with job uuids and parents (#…
Browse files Browse the repository at this point in the history
…1980)

* Add migrations to support job parent relationship storage

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update all job and run queries to reference jobs_view and runs_view

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Remove references to simple_name as job redirects handle redirecting simple name to fqn
added unit test to verify

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Fix runs migration script

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Add java migrations for backfilling runs with job uuids and backfill Airflow runs
Signed-off-by: Michael Collado <collado.mike@gmail.com>

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike authored May 20, 2022
1 parent 0021a2b commit 9d97708
Show file tree
Hide file tree
Showing 10 changed files with 875 additions and 3 deletions.
78 changes: 78 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
Expand All @@ -37,6 +39,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -50,6 +53,7 @@
import marquez.service.models.DatasetMeta;
import marquez.service.models.DbTableMeta;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Triple;

Expand All @@ -61,6 +65,16 @@ private Utils() {}
public static final String VERSION_DELIM = ":";
public static final Joiner VERSION_JOINER = Joiner.on(VERSION_DELIM).skipNulls();

/**
* pre-defined NAMESPACE_URL defined in RFC4122. This is the namespace used by the OpenLineage
* Airflow integration for constructing some run IDs as UUIDs. We use the same namespace to
* construct the same UUIDs when absolutely necessary (e.g., backfills, backward compatibility)
*
* @see "https://datatracker.ietf.org/doc/html/rfc4122#appendix-C"
*/
public static final UUID NAMESPACE_URL_UUID =
UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8");

private static final ObjectMapper MAPPER = newObjectMapper();

private static final int UUID_LENGTH = 36;
Expand Down Expand Up @@ -141,6 +155,70 @@ public static UUID toUuid(@NonNull final String uuidString) {
return UUID.fromString(uuidString);
}

/**
* Construct a name-based {@link UUID} based on the {@link #NAMESPACE_URL_UUID} namespace. Name
* parts are separated by a dot (.) character.
*
* @see "https://datatracker.ietf.org/doc/html/rfc4122#page-13"
* @param nameParts
* @return
*/
public static UUID toNameBasedUuid(String... nameParts) {
String constructedName = String.join(".", nameParts);

final byte[] nameBytes = constructedName.getBytes(StandardCharsets.UTF_8);

ByteBuffer buffer = ByteBuffer.allocate(nameBytes.length + 16);
buffer.putLong(NAMESPACE_URL_UUID.getMostSignificantBits());
buffer.putLong(NAMESPACE_URL_UUID.getLeastSignificantBits());
buffer.put(nameBytes);

return UUID.nameUUIDFromBytes(buffer.array());
}

/**
* Construct a UUID from a {@link ParentRunFacet} - if the {@link
* marquez.service.models.LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
* Otherwise, compute a {@link UUID} from the job name and the reported runId. If the job name
* contains a dot (.), only return the portion up to the last dot in the name (this attempts to
* address airflow tasks, which always report the job name as &lt;dag_name&gt;.&lt;task_name&lt;
*
* @param parent
* @return
*/
public static UUID findParentRunUuid(ParentRunFacet parent) {
String jobName = parent.getJob().getName();
String parentRunId = parent.getRun().getRunId();
return findParentRunUuid(jobName, parentRunId);
}

public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
String dagName = parseParentJobName(parentJobName);
return toUuid(parentRunId, dagName);
}

public static String parseParentJobName(String parentJobName) {
return parentJobName.contains(".")
? parentJobName.substring(0, parentJobName.lastIndexOf('.'))
: parentJobName;
}

/**
* Compute a UUID from a RunId and a jobName
*
* @see Utils#toNameBasedUuid(String...) for details on the UUID construction.
* @param runId
* @param jobName
* @return
*/
public static UUID toUuid(@NotNull String runId, String jobName) {
try {
return UUID.fromString(runId);
} catch (IllegalArgumentException e) {
return Utils.toNameBasedUuid(jobName, runId);
}
}

public static Instant toInstant(@Nullable final String asIso) {
return (asIso == null) ? null : Instant.from(ISO_INSTANT.parse(asIso));
}
Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/db/FlywayFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public final class FlywayFactory {
private static final boolean DEFAULT_CLEAN_DISABLED = false;
private static final boolean DEFAULT_OUT_OF_ORDER = false;
private static final String DEFAULT_LOCATION = "marquez/db/migration";
private static final List<String> DEFAULT_LOCATIONS = ImmutableList.of(DEFAULT_LOCATION);
private static final String DEFAULT_MIGRATION_CLASSPATH = "classpath:marquez/db/migrations";
private static final List<String> DEFAULT_LOCATIONS =
ImmutableList.of(DEFAULT_LOCATION, DEFAULT_MIGRATION_CLASSPATH);
private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
private static final String DEFAULT_TABLE = "flyway_schema_history";
private static final boolean DEFAULT_PLACEHOLDER_REPLACEMENT = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package marquez.db.migrations;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;

/**
* This migration is dependent on the migration found in the SQL script for V43. This updates the
* runs table to include the <code>job_uuid</code> value for each record. We update the table in
* batches to avoid table-level locks so that concurrent reads and writes can continue to take
* place. Auto-commit is enabled, so it is entirely possible that this migration will fail partway
* through and some records will retain the <code>job_uuid</code> value while others will not. This
* is intentional as no harm will come from leaving these values in place in case of rollback.
*/
@Slf4j
public class V43_1__UpdateRunsWithJobUUID implements JavaMigration {

@Override
public MigrationVersion getVersion() {
return MigrationVersion.fromVersion("43.1");
}

// don't execute in a transaction so each batch can be committed immediately
@Override
public boolean canExecuteInTransaction() {
return false;
}

@Override
public void migrate(Context context) throws Exception {
Connection conn = context.getConnection();
try (PreparedStatement queryPs =
conn.prepareStatement("SELECT uuid, name, namespace_name FROM jobs");
PreparedStatement updatePs =
conn.prepareStatement(
"UPDATE runs SET job_uuid=? WHERE job_name=? AND namespace_name=?")) {

ResultSet resultSet = queryPs.executeQuery();
boolean isAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(true);
try {
while (resultSet.next()) {
String uuid = resultSet.getString("uuid");
String jobName = resultSet.getString("name");
String namespace = resultSet.getString("namespace_name");
updatePs.setObject(1, UUID.fromString(uuid));
updatePs.setString(2, jobName);
updatePs.setString(3, namespace);
if (!updatePs.execute()) {
log.error("Unable to execute update of runs for {}.{}", jobName, namespace);
}
}
} finally {
conn.setAutoCommit(isAutoCommit);
}
}
}

@Override
public String getDescription() {
return "UpdateRunsWithJobUUID";
}

@Override
public Integer getChecksum() {
return null;
}

@Override
public boolean isUndo() {
return false;
}

@Override
public boolean isBaselineMigration() {
return false;
}
}
Loading

0 comments on commit 9d97708

Please sign in to comment.