Skip to content

Commit

Permalink
Implement Lifecycle for MySqlConnection (#171)
Browse files Browse the repository at this point in the history
Motivation:

Implement `Lifecycle` for `MySqlConnection`. See also #64 

Modification:

The `MySqlConnection`.

Result:

The `MySqlConnection` implements `Lifecycle` and will rollback in
`preRelease`.
  • Loading branch information
mirromutth authored Dec 21, 2023
1 parent 9073c9d commit d4a9fe0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Lifecycle;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.jetbrains.annotations.Nullable;
Expand All @@ -52,7 +53,7 @@
/**
* An implementation of {@link Connection} for connecting to the MySQL database.
*/
public final class MySqlConnection implements Connection, ConnectionState {
public final class MySqlConnection implements Connection, Lifecycle, ConnectionState {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(MySqlConnection.class);

Expand Down Expand Up @@ -278,6 +279,17 @@ public MySqlStatement createStatement(String sql) {
return new PrepareParametrizedStatement(client, codecs, query, context, prepareCache);
}

@Override
public Mono<Void> postAllocate() {
return Mono.empty();
}

@Override
public Mono<Void> preRelease() {
// Rollback if the connection is in transaction.
return rollbackTransaction();
}

@Override
public Mono<Void> releaseSavepoint(String name) {
requireValidName(name, "Savepoint name must not be empty and not contain backticks");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;

import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
import static io.r2dbc.spi.IsolationLevel.READ_UNCOMMITTED;
Expand Down Expand Up @@ -52,6 +53,54 @@ void isInTransaction() {
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse()));
}

@Test
void autoRollbackPreRelease() {
// Mock pool allocate/release.
complete(conn -> conn.postAllocate()
.thenMany(conn.createStatement("CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY)")
.execute())
.flatMap(MySqlResult::getRowsUpdated)
.then(conn.beginTransaction())
.thenMany(conn.createStatement("INSERT INTO test VALUES (1)")
.execute())
.flatMap(MySqlResult::getRowsUpdated)
.single()
.doOnNext(it -> assertThat(it).isEqualTo(1))
.doOnSuccess(ignored -> assertThat(conn.isInTransaction()).isTrue())
.then(conn.preRelease())
.doOnSuccess(ignored -> assertThat(conn.isInTransaction()).isFalse())
.then(conn.postAllocate())
.thenMany(conn.createStatement("SELECT * FROM test")
.execute())
.flatMap(it -> it.map((row, metadata) -> row.get(0, Integer.class)))
.count()
.doOnNext(it -> assertThat(it).isZero()));
}

@Test
void shouldNotRollbackCommittedPreRelease() {
// Mock pool allocate/release.
complete(conn -> conn.postAllocate()
.thenMany(conn.createStatement("CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY)")
.execute())
.flatMap(MySqlResult::getRowsUpdated)
.then(conn.beginTransaction())
.thenMany(conn.createStatement("INSERT INTO test VALUES (1)")
.execute())
.flatMap(MySqlResult::getRowsUpdated)
.single()
.doOnNext(it -> assertThat(it).isEqualTo(1))
.then(conn.commitTransaction())
.then(conn.preRelease())
.doOnSuccess(ignored -> assertThat(conn.isInTransaction()).isFalse())
.then(conn.postAllocate())
.thenMany(conn.createStatement("SELECT * FROM test")
.execute())
.flatMap(it -> it.map((row, metadata) -> row.get(0, Integer.class)))
.collectList()
.doOnNext(it -> assertThat(it).isEqualTo(Collections.singletonList(1))));
}

@Test
void transactionDefinitionLockWaitTimeout() {
complete(connection -> connection.beginTransaction(MySqlTransactionDefinition.builder()
Expand Down

0 comments on commit d4a9fe0

Please sign in to comment.