Skip to content

Commit

Permalink
Merge pull request #6 from grcevski/spacetime_transactions
Browse files Browse the repository at this point in the history
Chaining transaction ops
  • Loading branch information
henningandersen authored Nov 19, 2021
2 parents d8c046c + 816ab60 commit 6d9d61c
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardTransactionRegistry;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -64,6 +65,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
Expand All @@ -79,6 +81,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private static final ShardTransactionRegistry transactionRegistry = new ShardTransactionRegistry();

@Inject
public TransportShardBulkAction(
Expand Down Expand Up @@ -186,10 +189,12 @@ public static void performOnPrimary(

@Override
protected void doRun() throws Exception {
String uid = UUIDs.base64UUID();
long transactionId = -1L;
TxID txID1 = TxID.create();
Translog.Location[] transactionId = new Translog.Location[1];
try {
transactionId = primary.startTransaction(uid);
transactionId[0] = primary.startTransaction(txID1.id());
transactionRegistry.registerTransaction(txID1, Set.of(transactionId[0].id()));

while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(
context,
Expand All @@ -207,12 +212,12 @@ protected void doRun() throws Exception {
assert context.isInitial(); // either completed and moved to next or reset
}

primary.commitTransaction(uid, transactionId);
primary.commitTransaction(transactionId);
} catch (Exception x) {
logger.warn("Encountered an error while executing bulk transaction", x);
primary.rollbackTransaction(uid, transactionId);
primary.rollbackTransaction(transactionId);
} finally {
primary.closeTransaction(uid, transactionId);
primary.closeTransaction(transactionId);
}
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
// We're done, there's no more operations to execute so we resolve the wrapped listener
Expand Down Expand Up @@ -282,7 +287,7 @@ static boolean executeBulkItemRequest(
mappingUpdater,
waitForMappingUpdate,
itemDoneListener,
IndexShard.NO_TRANSACTION_ID
new Translog.Location[] {IndexShard.NO_TRANSACTION_ID}
);
}

Expand All @@ -298,7 +303,7 @@ static boolean executeBulkItemRequest(
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener,
long transactionId
Translog.Location[] transactionId
) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();

Expand Down Expand Up @@ -344,7 +349,7 @@ static boolean executeBulkItemRequest(
request.versionType(),
request.ifSeqNo(),
request.ifPrimaryTerm(),
transactionId
transactionId[0]
);
} else {
final IndexRequest request = context.getRequestToExecute();
Expand All @@ -363,9 +368,14 @@ static boolean executeBulkItemRequest(
request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(),
request.isRetry(),
transactionId
transactionId[0]
);
}

if (result.getTranslogLocation() != null) {
transactionId[0] = result.getTranslogLocation();
}

if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
Expand Down Expand Up @@ -564,7 +574,8 @@ protected int replicaOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, long transactionId) throws Exception {
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, Translog.Location transactionId)
throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
Expand Down Expand Up @@ -604,7 +615,7 @@ private static Engine.Result performOpOnReplica(
DocWriteResponse primaryResponse,
DocWriteRequest<?> docWriteRequest,
IndexShard replica,
long transactionId
Translog.Location transactionId
) throws Exception {
final Engine.Result result;
switch (docWriteRequest.opType()) {
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/action/bulk/TxID.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ public int hashCode() {
public String toString() {
return "[tx=" + id + "]";
}

public String id() {
return id;
}
}
25 changes: 13 additions & 12 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,13 @@ public Condition newCondition() {
}
}

public abstract long startTransaction(String id) throws IOException;
public abstract Translog.Location startTransaction(String id) throws IOException;

public abstract boolean commitTransaction(String id, long transactionId) throws IOException;
public abstract Translog.Location commitTransaction(Translog.Location prevId) throws IOException;

public abstract boolean rollbackTransaction(String id, long transactionId) throws IOException;
public abstract Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException;

public abstract boolean closeTransaction(String id, long transactionId) throws IOException;
public abstract Translog.Location closeTransaction(Translog.Location prevId) throws IOException;

/**
* Perform document index operation on the engine
Expand Down Expand Up @@ -1372,7 +1372,7 @@ public static class Index extends Operation {
private final boolean isRetry;
private final long ifSeqNo;
private final long ifPrimaryTerm;
private final long transactionId;
private final Translog.Location transactionId;

public Index(
Term uid,
Expand Down Expand Up @@ -1401,7 +1401,7 @@ public Index(
isRetry,
ifSeqNo,
ifPrimaryTerm,
-1L
new Translog.Location(0, 0, 0)
);
}

Expand All @@ -1418,7 +1418,7 @@ public Index(
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm,
long transactionId
Translog.Location transactionId
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
Expand Down Expand Up @@ -1511,7 +1511,7 @@ public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}

public long getTransactionId() {
public Translog.Location getTransactionId() {
return transactionId;
}
}
Expand All @@ -1521,7 +1521,7 @@ public static class Delete extends Operation {
private final String id;
private final long ifSeqNo;
private final long ifPrimaryTerm;
private final long transactionId;
private final Translog.Location transactionId;

public Delete(
String id,
Expand All @@ -1535,7 +1535,8 @@ public Delete(
long ifSeqNo,
long ifPrimaryTerm
) {
this(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, -1L);
this(id, uid, seqNo, primaryTerm, version, versionType, origin,
startTime, ifSeqNo, ifPrimaryTerm, new Translog.Location(0, 0, 0));
}

public Delete(
Expand All @@ -1549,7 +1550,7 @@ public Delete(
long startTime,
long ifSeqNo,
long ifPrimaryTerm,
long transactionId
Translog.Location transactionId
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
Expand Down Expand Up @@ -1616,7 +1617,7 @@ public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}

public long getTransactionId() {
public Translog.Location getTransactionId() {
return transactionId;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,8 @@ public IndexResult index(Index index) throws IOException {
index.getAutoGeneratedIdTimestamp(),
index.isRetry(),
index.getIfSeqNo(),
index.getIfPrimaryTerm()
index.getIfPrimaryTerm(),
index.getTransactionId()
);

final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
Expand Down Expand Up @@ -2078,27 +2079,48 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E
}

@Override
public long startTransaction(String id) throws IOException {
Translog.Location location = translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
return location.translogLocation;
public Translog.Location startTransaction(String id) throws IOException {
return translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
}

@Override
public boolean commitTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxCommit(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
public Translog.Location commitTransaction(Translog.Location prevId) throws IOException {
Translog.Location loc = prevId;

while (loc != null) {
Translog.Operation op = translog.readOperation(loc);
if (op == null) {
logger.error("Couldn't read translog location " + loc);
break;
}

logger.info("Committing op " + op);

if (op instanceof Translog.TransactionMember) {
loc = ((Translog.TransactionMember)op).getTransactionId();
} else if (op instanceof Translog.TxStart) {
break;
} else {
logger.error("Found op that doesn't have transaction loc?");
break;
}
}

return translog.add(
new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));

}

@Override
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxRollback(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
public Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException {
return translog.add(
new Translog.TxRollback(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
}

@Override
public boolean closeTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxClose(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
public Translog.Location closeTransaction(Translog.Location prevId) throws IOException {
return translog.add(
new Translog.TxClose(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
}

private void pruneDeletedTombstones() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,23 +528,23 @@ public void skipTranslogRecovery() {}
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {}

@Override
public long startTransaction(String id) throws IOException {
return 0;
public Translog.Location startTransaction(String id) throws IOException {
return null;
}

@Override
public boolean commitTransaction(String id, long transactionId) throws IOException {
return false;
public Translog.Location commitTransaction(Translog.Location transactionId) throws IOException {
return null;
}

@Override
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
return false;
public Translog.Location rollbackTransaction(Translog.Location transactionId) throws IOException {
return null;
}

@Override
public boolean closeTransaction(String id, long transactionId) throws IOException {
return false;
public Translog.Location closeTransaction(Translog.Location transactionId) throws IOException {
return null;
}

@Override
Expand Down
Loading

0 comments on commit 6d9d61c

Please sign in to comment.