Skip to content

Commit

Permalink
[proxima-direct-core] simplify transactional write logic
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Oct 23, 2023
1 parent 2d56720 commit be085bf
Showing 1 changed file with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.core.transform.DirectElementWiseTransform;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.collect.Iterables;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -247,9 +246,8 @@ public void commitWrite(List<StreamElement> outputs, CommitCallback callback)
} catch (TransactionRejectedRuntimeException ex) {
throw (TransactionRejectedException) ex.getCause();
}
StreamElement toWrite = getSingleOrCommit(transformed);
OnlineAttributeWriter writer =
transformed.size() == 1 && !isGlobalTransaction ? delegate : commitDelegate;
StreamElement toWrite = getCommit(transformed);
OnlineAttributeWriter writer = commitDelegate;
List<KeyAttribute> keyAttributes =
transformed.stream().map(KeyAttributes::ofStreamElement).collect(Collectors.toList());
boolean anyDisallowed = keyAttributes.stream().anyMatch(KeyAttribute::isWildcardQuery);
Expand All @@ -275,10 +273,12 @@ public void commitWrite(List<StreamElement> outputs, CommitCallback callback)
rollback();
}
log.debug(
"Committed outputs {} (via {}) of transaction {}",
"Committed outputs {} (via {}) of transaction {}: ({}, {})",
transformed,
toWrite,
transactionId);
transactionId,
succ,
(Object) exc);
callback.commit(succ, exc);
};
state = State.Flags.COMMITTED;
Expand Down Expand Up @@ -351,10 +351,7 @@ private void applyTransform(
}
}

private StreamElement getSingleOrCommit(Collection<StreamElement> outputs) {
if (outputs.size() == 1 && !isGlobalTransaction) {
return Iterables.getOnlyElement(outputs);
}
private StreamElement getCommit(Collection<StreamElement> outputs) {
return manager
.getCommitDesc()
.upsert(transactionId, stamp, Commit.of(sequenceId, stamp, outputs));
Expand Down

0 comments on commit be085bf

Please sign in to comment.