diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java index ceb18bb90..465059367 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java @@ -37,7 +37,6 @@ import cz.o2.proxima.direct.core.OnlineAttributeWriter; import cz.o2.proxima.direct.core.transform.DirectElementWiseTransform; import cz.o2.proxima.internal.com.google.common.base.Preconditions; -import cz.o2.proxima.internal.com.google.common.collect.Iterables; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -247,9 +246,8 @@ public void commitWrite(List outputs, CommitCallback callback) } catch (TransactionRejectedRuntimeException ex) { throw (TransactionRejectedException) ex.getCause(); } - StreamElement toWrite = getSingleOrCommit(transformed); - OnlineAttributeWriter writer = - transformed.size() == 1 && !isGlobalTransaction ? delegate : commitDelegate; + StreamElement toWrite = getCommit(transformed); + OnlineAttributeWriter writer = commitDelegate; List keyAttributes = transformed.stream().map(KeyAttributes::ofStreamElement).collect(Collectors.toList()); boolean anyDisallowed = keyAttributes.stream().anyMatch(KeyAttribute::isWildcardQuery); @@ -275,10 +273,12 @@ public void commitWrite(List outputs, CommitCallback callback) rollback(); } log.debug( - "Committed outputs {} (via {}) of transaction {}", + "Committed outputs {} (via {}) of transaction {}: ({}, {})", transformed, toWrite, - transactionId); + transactionId, + succ, + (Object) exc); callback.commit(succ, exc); }; state = State.Flags.COMMITTED; @@ -351,10 +351,7 @@ private void applyTransform( } } - private StreamElement getSingleOrCommit(Collection outputs) { - if (outputs.size() == 1 && !isGlobalTransaction) { - return Iterables.getOnlyElement(outputs); - } + private StreamElement getCommit(Collection outputs) { return manager .getCommitDesc() .upsert(transactionId, stamp, Commit.of(sequenceId, stamp, outputs));