diff --git a/CHANGELOG.md b/CHANGELOG.md index bb785b4c4a..7c72ae7ea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### Changed * Set default limit for listing datasets and jobs in UI from `2000` to `25` [#2018](https://github.com/MarquezProject/marquez/pull/2018) [@wslulciuc](https://github.com/wslulciuc) +* Update OpenLineage write API to be non-transactional and avoid unnecessary locks on records under heavy contention [@collado-mike](https://github.com/collado-mike) ### Fixed diff --git a/api/src/main/java/marquez/db/JobContextDao.java b/api/src/main/java/marquez/db/JobContextDao.java index 0f66c21a86..2d2cae779d 100644 --- a/api/src/main/java/marquez/db/JobContextDao.java +++ b/api/src/main/java/marquez/db/JobContextDao.java @@ -12,20 +12,26 @@ import marquez.db.models.JobContextRow; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; @RegisterRowMapper(JobContextRowMapper.class) public interface JobContextDao { @SqlQuery("SELECT * FROM job_contexts WHERE uuid = :uuid") Optional findContextByUuid(UUID uuid); - @SqlQuery( + @SqlQuery("SELECT * FROM job_contexts WHERE checksum=:checksum") + Optional findContextByChecksum(String checksum); + + default JobContextRow upsert(UUID uuid, Instant now, String context, String checksum) { + doUpsert(uuid, now, context, checksum); + return findContextByChecksum(checksum).orElseThrow(); + } + + @SqlUpdate( "INSERT INTO job_contexts " + "(uuid, created_at, context, checksum) " + "VALUES " + "(:uuid, :now, :context, :checksum) " - + "ON CONFLICT (checksum) DO " - + "UPDATE SET " - + "context = EXCLUDED.context " - + "RETURNING *") - JobContextRow upsert(UUID uuid, Instant now, String context, String checksum); + + "ON CONFLICT (checksum) DO NOTHING") + void doUpsert(UUID uuid, Instant now, String context, String checksum); } diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 6ac8025ab4..28fcc5eb18 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -36,7 +36,6 @@ import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; -import org.jdbi.v3.sqlobject.transaction.Transaction; /** The DAO for {@code JobVersion}. */ @RegisterRowMapper(ExtendedJobVersionRowMapper.class) @@ -294,7 +293,6 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { * @param transitionedAt The timestamp of the run state transition. * @return A {@link BagOfJobVersionInfo} object. */ - @Transaction default BagOfJobVersionInfo upsertJobVersionOnRunTransition( @NonNull String namespaceName, @NonNull String jobName, @@ -327,7 +325,10 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( final Version jobVersion = Utils.newJobVersionFor( NamespaceName.of(jobRow.getNamespaceName()), - JobName.of(jobRow.getName()), + JobName.of( + Optional.ofNullable(jobRow.getParentJobName()) + .map(pn -> pn + "." + jobRow.getSimpleName()) + .orElse(jobRow.getName())), toDatasetIds(jobVersionInputs), toDatasetIds(jobVersionOutputs), jobContext, diff --git a/api/src/main/java/marquez/db/NamespaceDao.java b/api/src/main/java/marquez/db/NamespaceDao.java index 6f566f84fc..a7f36e2336 100644 --- a/api/src/main/java/marquez/db/NamespaceDao.java +++ b/api/src/main/java/marquez/db/NamespaceDao.java @@ -78,7 +78,27 @@ default Namespace upsertNamespaceMeta( @SqlQuery("SELECT * FROM namespaces ORDER BY name LIMIT :limit OFFSET :offset") List findAll(int limit, int offset); - @SqlQuery( + default NamespaceRow upsertNamespaceRow( + UUID uuid, Instant now, String name, String currentOwnerName) { + doUpsertNamespaceRow(uuid, now, name, currentOwnerName); + return findNamespaceByName(name).orElseThrow(); + } + + /** + * This query is executed by the OpenLineage write path, meaning namespaces are written to a LOT. + * Updating the record to modify the updateAt timestamp means the same namespace is often under + * heavy contention unnecessarily (it's really not being updated), causing some requests to wait + * for a lock while other requests are finishing. If a single namespace is under heavy contention, + * this can cause some requests to wait a long time - i.e., minutes. This causes unacceptable + * latency and failures in the write path. Avoid any updates in this query to avoid unnecessary + * locks. + * + * @param uuid + * @param now + * @param name + * @param currentOwnerName + */ + @SqlUpdate( "INSERT INTO namespaces ( " + "uuid, " + "created_at, " @@ -91,10 +111,8 @@ default Namespace upsertNamespaceMeta( + ":now, " + ":name, " + ":currentOwnerName) " - + "ON CONFLICT(name) DO " - + "UPDATE SET updated_at = EXCLUDED.updated_at " - + "RETURNING *") - NamespaceRow upsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName); + + "ON CONFLICT(name) DO NOTHING") + void doUpsertNamespaceRow(UUID uuid, Instant now, String name, String currentOwnerName); @SqlQuery( "INSERT INTO namespaces ( " diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 9ee3272f6a..f2b69735bb 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -55,7 +55,6 @@ import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; import org.jdbi.v3.sqlobject.statement.SqlUpdate; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.postgresql.util.PGobject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +82,6 @@ void createLineageEvent( PGobject event, String producer); - @Transaction default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) { UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper); RunState runState = getRunState(event.getEventType()); diff --git a/api/src/main/java/marquez/db/RunArgsDao.java b/api/src/main/java/marquez/db/RunArgsDao.java index 12f52c7bdc..973abec616 100644 --- a/api/src/main/java/marquez/db/RunArgsDao.java +++ b/api/src/main/java/marquez/db/RunArgsDao.java @@ -6,15 +6,25 @@ package marquez.db; import java.time.Instant; +import java.util.Optional; import java.util.UUID; import marquez.db.mappers.RunArgsRowMapper; import marquez.db.models.RunArgsRow; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; @RegisterRowMapper(RunArgsRowMapper.class) public interface RunArgsDao { - @SqlQuery( + default RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum) { + doUpsertRunArgs(uuid, now, args, checksum); + return findRunArgsByChecksum(checksum).orElseThrow(); + } + + @SqlQuery("SELECT * FROM run_args WHERE checksum=:checksum") + Optional findRunArgsByChecksum(String checksum); + + @SqlUpdate( "INSERT INTO run_args ( " + "uuid, " + "created_at, " @@ -25,9 +35,6 @@ public interface RunArgsDao { + ":now, " + ":args, " + ":checksum " - + ") ON CONFLICT(checksum) DO " - + "UPDATE SET " - + "args = :args " - + "RETURNING *") - RunArgsRow upsertRunArgs(UUID uuid, Instant now, String args, String checksum); + + ") ON CONFLICT(checksum) DO NOTHING") + void doUpsertRunArgs(UUID uuid, Instant now, String args, String checksum); }