Skip to content

Commit

Permalink
fix(version): forUpdate needed for versioning (#11328)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Sep 10, 2024
1 parent cf49f80 commit c6eea1e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,30 @@ Map<EntityAspectIdentifier, EntityAspect> batchGet(
List<EntityAspect> getAspectsInRange(
@Nonnull Urn urn, Set<String> aspectNames, long startTimeMillis, long endTimeMillis);

/**
* @param urn urn to fetch
* @param aspectName aspect to fetch
* @param forUpdate set to true if the result is used for versioning <a
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
* @return
*/
@Nullable
default EntityAspect getLatestAspect(
@Nonnull final String urn, @Nonnull final String aspectName) {
return getLatestAspects(Map.of(urn, Set.of(aspectName)))
@Nonnull final String urn, @Nonnull final String aspectName, boolean forUpdate) {
return getLatestAspects(Map.of(urn, Set.of(aspectName)), forUpdate)
.getOrDefault(urn, Map.of())
.getOrDefault(aspectName, null);
}

/**
* @param urnAspects urn/aspects to fetch
* @param forUpdate set to true if the result is used for versioning <a
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
* @return the data
*/
@Nonnull
Map<String, Map<String, EntityAspect>> getLatestAspects(Map<String, Set<String>> urnAspects);
Map<String, Map<String, EntityAspect>> getLatestAspects(
Map<String, Set<String>> urnAspects, boolean forUpdate);

void saveAspect(
@Nullable Transaction tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
final Map<String, Map<String, SystemAspect>> latestAspects =
EntityUtils.toSystemAspects(
opContext.getRetrieverContext().get(),
aspectDao.getLatestAspects(urnAspects));
aspectDao.getLatestAspects(urnAspects, true));
// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
Expand All @@ -866,7 +866,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
Map<String, Map<String, SystemAspect>> newLatestAspects =
EntityUtils.toSystemAspects(
opContext.getRetrieverContext().get(),
aspectDao.getLatestAspects(updatedItems.getFirst()));
aspectDao.getLatestAspects(updatedItems.getFirst(), true));
// merge
updatedLatestAspects = AspectsBatch.merge(latestAspects, newLatestAspects);

Expand Down Expand Up @@ -2064,7 +2064,7 @@ public RollbackRunResult deleteUrn(@Nonnull OperationContext opContext, Urn urn)

EntityAspect latestKey = null;
try {
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName);
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName, false);
} catch (EntityNotFoundException e) {
log.warn("Entity to delete does not exist. {}", urn.toString());
}
Expand Down Expand Up @@ -2217,7 +2217,7 @@ private RollbackResult deleteAspectWithoutMCL(
(EntityAspect.EntitySystemAspect)
EntityUtils.toSystemAspect(
opContext.getRetrieverContext().get(),
aspectDao.getLatestAspect(urn, aspectName))
aspectDao.getLatestAspect(urn, aspectName, false))
.orElse(null);

// 1.1 If no latest exists, skip this aspect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ private boolean validateConnection() {
}

@Override
public EntityAspect getLatestAspect(@Nonnull String urn, @Nonnull String aspectName) {
public EntityAspect getLatestAspect(
@Nonnull String urn, @Nonnull String aspectName, boolean forUpdate) {
validateConnection();
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
}

@Override
public Map<String, Map<String, EntityAspect>> getLatestAspects(
Map<String, Set<String>> urnAspects) {
Map<String, Set<String>> urnAspects, boolean forUpdate) {
return urnAspects.entrySet().stream()
.map(
entry ->
Expand All @@ -97,7 +98,8 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
entry.getValue().stream()
.map(
aspectName -> {
EntityAspect aspect = getLatestAspect(entry.getKey(), aspectName);
EntityAspect aspect =
getLatestAspect(entry.getKey(), aspectName, forUpdate);
return aspect != null ? Map.entry(aspectName, aspect) : null;
})
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void saveEbeanAspect(

@Override
public Map<String, Map<String, EntityAspect>> getLatestAspects(
@Nonnull Map<String, Set<String>> urnAspects) {
@Nonnull Map<String, Set<String>> urnAspects, boolean forUpdate) {
validateConnection();

List<EbeanAspectV2.PrimaryKey> keys =
Expand All @@ -256,7 +256,12 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
entry.getKey(), aspect, ASPECT_LATEST_VERSION)))
.collect(Collectors.toList());

List<EbeanAspectV2> results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
final List<EbeanAspectV2> results;
if (forUpdate) {
results = _server.find(EbeanAspectV2.class).where().idIn(keys).forUpdate().findList();
} else {
results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
}

return toUrnAspectMap(results);
}
Expand Down Expand Up @@ -814,7 +819,8 @@ public Map<String, Map<String, Long>> getNextVersions(
return result;
}

List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().findIds();
// forUpdate is required to avoid duplicate key violations
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().forUpdate().findIds();

for (EbeanAspectV2.PrimaryKey key : dbResults) {
if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) {
Expand Down

0 comments on commit c6eea1e

Please sign in to comment.