diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java index 646b995f87d009..401d40ec177cee 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java @@ -51,16 +51,30 @@ Map batchGet( List getAspectsInRange( @Nonnull Urn urn, Set aspectNames, long startTimeMillis, long endTimeMillis); + /** + * @param urn urn to fetch + * @param aspectName aspect to fetch + * @param forUpdate set to true if the result is used for versioning link + * @return + */ @Nullable default EntityAspect getLatestAspect( - @Nonnull final String urn, @Nonnull final String aspectName) { - return getLatestAspects(Map.of(urn, Set.of(aspectName))) + @Nonnull final String urn, @Nonnull final String aspectName, boolean forUpdate) { + return getLatestAspects(Map.of(urn, Set.of(aspectName)), forUpdate) .getOrDefault(urn, Map.of()) .getOrDefault(aspectName, null); } + /** + * @param urnAspects urn/aspects to fetch + * @param forUpdate set to true if the result is used for versioning link + * @return the data + */ @Nonnull - Map> getLatestAspects(Map> urnAspects); + Map> getLatestAspects( + Map> urnAspects, boolean forUpdate); void saveAspect( @Nullable Transaction tx, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 2e12b52194ffbb..fd6ad57c0adf52 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -849,7 +849,7 @@ private List ingestAspectsToLocalDB( final Map> latestAspects = EntityUtils.toSystemAspects( opContext.getRetrieverContext().get(), - aspectDao.getLatestAspects(urnAspects)); + aspectDao.getLatestAspects(urnAspects, true)); // read #2 (potentially) final Map> nextVersions = EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects); @@ -866,7 +866,7 @@ private List ingestAspectsToLocalDB( Map> newLatestAspects = EntityUtils.toSystemAspects( opContext.getRetrieverContext().get(), - aspectDao.getLatestAspects(updatedItems.getFirst())); + aspectDao.getLatestAspects(updatedItems.getFirst(), true)); // merge updatedLatestAspects = AspectsBatch.merge(latestAspects, newLatestAspects); @@ -2064,7 +2064,7 @@ public RollbackRunResult deleteUrn(@Nonnull OperationContext opContext, Urn urn) EntityAspect latestKey = null; try { - latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName); + latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName, false); } catch (EntityNotFoundException e) { log.warn("Entity to delete does not exist. {}", urn.toString()); } @@ -2217,7 +2217,7 @@ private RollbackResult deleteAspectWithoutMCL( (EntityAspect.EntitySystemAspect) EntityUtils.toSystemAspect( opContext.getRetrieverContext().get(), - aspectDao.getLatestAspect(urn, aspectName)) + aspectDao.getLatestAspect(urn, aspectName, false)) .orElse(null); // 1.1 If no latest exists, skip this aspect diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java index 15c37b6c0085f7..51f898d3122af3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java @@ -81,14 +81,15 @@ private boolean validateConnection() { } @Override - public EntityAspect getLatestAspect(@Nonnull String urn, @Nonnull String aspectName) { + public EntityAspect getLatestAspect( + @Nonnull String urn, @Nonnull String aspectName, boolean forUpdate) { validateConnection(); return getAspect(urn, aspectName, ASPECT_LATEST_VERSION); } @Override public Map> getLatestAspects( - Map> urnAspects) { + Map> urnAspects, boolean forUpdate) { return urnAspects.entrySet().stream() .map( entry -> @@ -97,7 +98,8 @@ public Map> getLatestAspects( entry.getValue().stream() .map( aspectName -> { - EntityAspect aspect = getLatestAspect(entry.getKey(), aspectName); + EntityAspect aspect = + getLatestAspect(entry.getKey(), aspectName, forUpdate); return aspect != null ? Map.entry(aspectName, aspect) : null; }) .filter(Objects::nonNull) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 524e947476122b..93c06b9236d501 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -242,7 +242,7 @@ private void saveEbeanAspect( @Override public Map> getLatestAspects( - @Nonnull Map> urnAspects) { + @Nonnull Map> urnAspects, boolean forUpdate) { validateConnection(); List keys = @@ -256,7 +256,12 @@ public Map> getLatestAspects( entry.getKey(), aspect, ASPECT_LATEST_VERSION))) .collect(Collectors.toList()); - List results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList(); + final List results; + if (forUpdate) { + results = _server.find(EbeanAspectV2.class).where().idIn(keys).forUpdate().findList(); + } else { + results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList(); + } return toUrnAspectMap(results); } @@ -814,7 +819,8 @@ public Map> getNextVersions( return result; } - List dbResults = exp.endOr().findIds(); + // forUpdate is required to avoid duplicate key violations + List dbResults = exp.endOr().forUpdate().findIds(); for (EbeanAspectV2.PrimaryKey key : dbResults) { if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) {