Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chaining transaction ops #6

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardTransactionRegistry;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -64,6 +65,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
Expand All @@ -79,6 +81,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private static final ShardTransactionRegistry transactionRegistry = new ShardTransactionRegistry();

@Inject
public TransportShardBulkAction(
Expand Down Expand Up @@ -186,10 +189,12 @@ public static void performOnPrimary(

@Override
protected void doRun() throws Exception {
String uid = UUIDs.base64UUID();
long transactionId = -1L;
TxID txID1 = TxID.create();
Translog.Location[] transactionId = new Translog.Location[1];
try {
transactionId = primary.startTransaction(uid);
transactionId[0] = primary.startTransaction(txID1.id());
transactionRegistry.registerTransaction(txID1, Set.of(transactionId[0].id()));

while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(
context,
Expand All @@ -207,12 +212,12 @@ protected void doRun() throws Exception {
assert context.isInitial(); // either completed and moved to next or reset
}

primary.commitTransaction(uid, transactionId);
primary.commitTransaction(transactionId);
} catch (Exception x) {
logger.warn("Encountered an error while executing bulk transaction", x);
primary.rollbackTransaction(uid, transactionId);
primary.rollbackTransaction(transactionId);
} finally {
primary.closeTransaction(uid, transactionId);
primary.closeTransaction(transactionId);
}
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
// We're done, there's no more operations to execute so we resolve the wrapped listener
Expand Down Expand Up @@ -282,7 +287,7 @@ static boolean executeBulkItemRequest(
mappingUpdater,
waitForMappingUpdate,
itemDoneListener,
IndexShard.NO_TRANSACTION_ID
new Translog.Location[] {IndexShard.NO_TRANSACTION_ID}
);
}

Expand All @@ -298,7 +303,7 @@ static boolean executeBulkItemRequest(
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener,
long transactionId
Translog.Location[] transactionId
) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();

Expand Down Expand Up @@ -344,7 +349,7 @@ static boolean executeBulkItemRequest(
request.versionType(),
request.ifSeqNo(),
request.ifPrimaryTerm(),
transactionId
transactionId[0]
);
} else {
final IndexRequest request = context.getRequestToExecute();
Expand All @@ -363,9 +368,14 @@ static boolean executeBulkItemRequest(
request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(),
request.isRetry(),
transactionId
transactionId[0]
);
}

if (result.getTranslogLocation() != null) {
transactionId[0] = result.getTranslogLocation();
}

if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
Expand Down Expand Up @@ -564,7 +574,8 @@ protected int replicaOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, long transactionId) throws Exception {
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, Translog.Location transactionId)
throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
Expand Down Expand Up @@ -604,7 +615,7 @@ private static Engine.Result performOpOnReplica(
DocWriteResponse primaryResponse,
DocWriteRequest<?> docWriteRequest,
IndexShard replica,
long transactionId
Translog.Location transactionId
) throws Exception {
final Engine.Result result;
switch (docWriteRequest.opType()) {
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/action/bulk/TxID.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ public int hashCode() {
public String toString() {
return "[tx=" + id + "]";
}

public String id() {
return id;
}
}
25 changes: 13 additions & 12 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,13 @@ public Condition newCondition() {
}
}

public abstract long startTransaction(String id) throws IOException;
public abstract Translog.Location startTransaction(String id) throws IOException;

public abstract boolean commitTransaction(String id, long transactionId) throws IOException;
public abstract Translog.Location commitTransaction(Translog.Location prevId) throws IOException;

public abstract boolean rollbackTransaction(String id, long transactionId) throws IOException;
public abstract Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException;

public abstract boolean closeTransaction(String id, long transactionId) throws IOException;
public abstract Translog.Location closeTransaction(Translog.Location prevId) throws IOException;

/**
* Perform document index operation on the engine
Expand Down Expand Up @@ -1372,7 +1372,7 @@ public static class Index extends Operation {
private final boolean isRetry;
private final long ifSeqNo;
private final long ifPrimaryTerm;
private final long transactionId;
private final Translog.Location transactionId;

public Index(
Term uid,
Expand Down Expand Up @@ -1401,7 +1401,7 @@ public Index(
isRetry,
ifSeqNo,
ifPrimaryTerm,
-1L
new Translog.Location(0, 0, 0)
);
}

Expand All @@ -1418,7 +1418,7 @@ public Index(
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm,
long transactionId
Translog.Location transactionId
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
Expand Down Expand Up @@ -1511,7 +1511,7 @@ public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}

public long getTransactionId() {
public Translog.Location getTransactionId() {
return transactionId;
}
}
Expand All @@ -1521,7 +1521,7 @@ public static class Delete extends Operation {
private final String id;
private final long ifSeqNo;
private final long ifPrimaryTerm;
private final long transactionId;
private final Translog.Location transactionId;

public Delete(
String id,
Expand All @@ -1535,7 +1535,8 @@ public Delete(
long ifSeqNo,
long ifPrimaryTerm
) {
this(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, -1L);
this(id, uid, seqNo, primaryTerm, version, versionType, origin,
startTime, ifSeqNo, ifPrimaryTerm, new Translog.Location(0, 0, 0));
}

public Delete(
Expand All @@ -1549,7 +1550,7 @@ public Delete(
long startTime,
long ifSeqNo,
long ifPrimaryTerm,
long transactionId
Translog.Location transactionId
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
Expand Down Expand Up @@ -1616,7 +1617,7 @@ public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}

public long getTransactionId() {
public Translog.Location getTransactionId() {
return transactionId;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,8 @@ public IndexResult index(Index index) throws IOException {
index.getAutoGeneratedIdTimestamp(),
index.isRetry(),
index.getIfSeqNo(),
index.getIfPrimaryTerm()
index.getIfPrimaryTerm(),
index.getTransactionId()
);

final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
Expand Down Expand Up @@ -2078,27 +2079,48 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E
}

@Override
public long startTransaction(String id) throws IOException {
Translog.Location location = translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
return location.translogLocation;
public Translog.Location startTransaction(String id) throws IOException {
return translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
}

@Override
public boolean commitTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxCommit(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
public Translog.Location commitTransaction(Translog.Location prevId) throws IOException {
Translog.Location loc = prevId;

while (loc != null) {
Translog.Operation op = translog.readOperation(loc);
if (op == null) {
logger.error("Couldn't read translog location " + loc);
break;
}

logger.info("Committing op " + op);

if (op instanceof Translog.TransactionMember) {
loc = ((Translog.TransactionMember)op).getTransactionId();
} else if (op instanceof Translog.TxStart) {
break;
} else {
logger.error("Found op that doesn't have transaction loc?");
break;
}
}

return translog.add(
new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));

}

@Override
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxRollback(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
public Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException {
return translog.add(
new Translog.TxRollback(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
}

@Override
public boolean closeTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxClose(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
public Translog.Location closeTransaction(Translog.Location prevId) throws IOException {
return translog.add(
new Translog.TxClose(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
}

private void pruneDeletedTombstones() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,23 +528,23 @@ public void skipTranslogRecovery() {}
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {}

@Override
public long startTransaction(String id) throws IOException {
return 0;
public Translog.Location startTransaction(String id) throws IOException {
return null;
}

@Override
public boolean commitTransaction(String id, long transactionId) throws IOException {
return false;
public Translog.Location commitTransaction(Translog.Location transactionId) throws IOException {
return null;
}

@Override
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
return false;
public Translog.Location rollbackTransaction(Translog.Location transactionId) throws IOException {
return null;
}

@Override
public boolean closeTransaction(String id, long transactionId) throws IOException {
return false;
public Translog.Location closeTransaction(Translog.Location transactionId) throws IOException {
return null;
}

@Override
Expand Down
Loading