Skip to content

Commit

Permalink
fix flaky TransactionManager tests (#4897)
Browse files Browse the repository at this point in the history
Transactions can always be aborted by Cloud Spanner, and all transaction
code should therefore be surrounded by with abort safeguarding.
  • Loading branch information
olavloite authored and kolea2 committed Apr 3, 2019
1 parent ef0e167 commit 4368c24
Showing 1 changed file with 108 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,85 +63,131 @@ public static void setUpDatabase() {
client = env.getTestHelper().getDatabaseClient(db);
}

@SuppressWarnings("resource")
@Test
public void simpleInsert() {
TransactionManager manager = client.transactionManager();
TransactionContext txn = manager.begin();
assertThat(manager.getState()).isEqualTo(TransactionState.STARTED);
txn.buffer(
Mutation.newInsertBuilder("T").set("K").to("Key1").set("BoolValue").to(true).build());
manager.commit();
assertThat(manager.getState()).isEqualTo(TransactionState.COMMITTED);
Struct row = client.singleUse().readRow("T", Key.of("Key1"), Arrays.asList("K", "BoolValue"));
assertThat(row.getString(0)).isEqualTo("Key1");
assertThat(row.getBoolean(1)).isTrue();
public void simpleInsert() throws InterruptedException {
try (TransactionManager manager = client.transactionManager()) {
TransactionContext txn = manager.begin();
while (true) {
assertThat(manager.getState()).isEqualTo(TransactionState.STARTED);
txn.buffer(
Mutation.newInsertBuilder("T").set("K").to("Key1").set("BoolValue").to(true).build());
try {
manager.commit();
assertThat(manager.getState()).isEqualTo(TransactionState.COMMITTED);
Struct row =
client.singleUse().readRow("T", Key.of("Key1"), Arrays.asList("K", "BoolValue"));
assertThat(row.getString(0)).isEqualTo("Key1");
assertThat(row.getBoolean(1)).isTrue();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
txn = manager.resetForRetry();
}
}
}
}

@SuppressWarnings("resource")
@Test
public void invalidInsert() {
TransactionManager manager = client.transactionManager();
TransactionContext txn = manager.begin();
txn.buffer(
Mutation.newInsertBuilder("InvalidTable")
.set("K")
.to("Key1")
.set("BoolValue")
.to(true)
.build());
try {
manager.commit();
fail("Expected exception");
} catch (SpannerException e) {
// expected
public void invalidInsert() throws InterruptedException {
try (TransactionManager manager = client.transactionManager()) {
TransactionContext txn = manager.begin();
while (true) {
txn.buffer(
Mutation.newInsertBuilder("InvalidTable")
.set("K")
.to("Key1")
.set("BoolValue")
.to(true)
.build());
try {
manager.commit();
fail("Expected exception");
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
txn = manager.resetForRetry();
} catch (SpannerException e) {
// expected
break;
}
}
assertThat(manager.getState()).isEqualTo(TransactionState.COMMIT_FAILED);
// We cannot retry for non aborted errors.
expectedException.expect(IllegalStateException.class);
manager.resetForRetry();
}
assertThat(manager.getState()).isEqualTo(TransactionState.COMMIT_FAILED);
// We cannot retry for non aborted errors.
expectedException.expect(IllegalStateException.class);
manager.resetForRetry();
}

@SuppressWarnings("resource")
@Test
public void rollback() {
TransactionManager manager = client.transactionManager();
TransactionContext txn = manager.begin();
txn.buffer(
Mutation.newInsertBuilder("T").set("K").to("Key2").set("BoolValue").to(true).build());
manager.rollback();
assertThat(manager.getState()).isEqualTo(TransactionState.ROLLED_BACK);
// Row should not have been inserted.
assertThat(client.singleUse().readRow("T", Key.of("Key2"), Arrays.asList("K", "BoolValue")))
.isNull();
public void rollback() throws InterruptedException {
try (TransactionManager manager = client.transactionManager()) {
TransactionContext txn = manager.begin();
while (true) {
txn.buffer(
Mutation.newInsertBuilder("T").set("K").to("Key2").set("BoolValue").to(true).build());
try {
manager.rollback();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
txn = manager.resetForRetry();
}
}
assertThat(manager.getState()).isEqualTo(TransactionState.ROLLED_BACK);
// Row should not have been inserted.
assertThat(client.singleUse().readRow("T", Key.of("Key2"), Arrays.asList("K", "BoolValue")))
.isNull();
}
}

@SuppressWarnings("resource")
@Test
public void abortAndRetry() {
public void abortAndRetry() throws InterruptedException {
client.write(
Arrays.asList(
Mutation.newInsertBuilder("T").set("K").to("Key3").set("BoolValue").to(true).build()));
TransactionManager manager1 = client.transactionManager();
TransactionContext txn1 = manager1.begin();
txn1.readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue"));
TransactionManager manager2 = client.transactionManager();
TransactionContext txn2 = manager2.begin();
txn2.readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue"));
try (TransactionManager manager1 = client.transactionManager()) {
TransactionContext txn1 = manager1.begin();
TransactionManager manager2;
TransactionContext txn2;
while (true) {
try {
txn1.readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue"));
manager2 = client.transactionManager();
txn2 = manager2.begin();
txn2.readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue"));

txn1.buffer(
Mutation.newUpdateBuilder("T").set("K").to("Key3").set("BoolValue").to(false).build());
manager1.commit();
txn1.buffer(
Mutation.newUpdateBuilder("T")
.set("K")
.to("Key3")
.set("BoolValue")
.to(false)
.build());
manager1.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
txn1 = manager1.resetForRetry();
}
}

// txn2 should have been aborted.
try {
// txn2 should have been aborted.
try {
manager2.commit();
fail("Expected to abort");
} catch (AbortedException e) {
assertThat(manager2.getState()).isEqualTo(TransactionState.ABORTED);
txn2 = manager2.resetForRetry();
}
txn2.buffer(
Mutation.newUpdateBuilder("T").set("K").to("Key3").set("BoolValue").to(true).build());
manager2.commit();
fail("Expected to abort");
} catch (AbortedException e) {
assertThat(manager2.getState()).isEqualTo(TransactionState.ABORTED);
txn2 = manager2.resetForRetry();
Struct row = client.singleUse().readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue"));
assertThat(row.getString(0)).isEqualTo("Key3");
assertThat(row.getBoolean(1)).isTrue();
}
txn2.buffer(
Mutation.newUpdateBuilder("T").set("K").to("Key3").set("BoolValue").to(true).build());
manager2.commit();
Struct row = client.singleUse().readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue"));
assertThat(row.getString(0)).isEqualTo("Key3");
assertThat(row.getBoolean(1)).isTrue();
}
}

0 comments on commit 4368c24

Please sign in to comment.