Skip to content

Commit

Permalink
fix issue with long failover time (#242)
Browse files Browse the repository at this point in the history
* fix issue with long failover time
* minor improvements for exception logging
* fix issue with checking test preconditions
  • Loading branch information
sergiyvamz committed Jul 7, 2022
1 parent f4aea65 commit 9593383
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 28 deletions.
4 changes: 3 additions & 1 deletion src/main/core-api/java/com/mysql/cj/log/StandardLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ public void logWarn(Object message, Throwable exception) {
protected String logInternal(int level, Object msg, Throwable exception) {
StringBuilder msgBuf = new StringBuilder();
msgBuf.append(new Date().toString());
msgBuf.append(" ");
msgBuf.append(" - [");
msgBuf.append(Thread.currentThread().getId());
msgBuf.append("] - ");

switch (level) {
case FATAL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,8 @@ ClusterAwareWriterFailoverHandler.11=[ClusterAwareWriterFailoverHandler] [TaskB]
ClusterAwareWriterFailoverHandler.12=[ClusterAwareWriterFailoverHandler] [TaskB] Failed to connect to any reader.
ClusterAwareWriterFailoverHandler.13=[ClusterAwareWriterFailoverHandler] [TaskB] Topology obtained: {0}
ClusterAwareWriterFailoverHandler.14=[ClusterAwareWriterFailoverHandler] [TaskB] Trying to connect to a new writer ''{0}''
ClusterAwareWriterFailoverHandler.15=[ClusterAwareWriterFailoverHandler] [TaskB] encountered an exception: {0}
ClusterAwareWriterFailoverHandler.15=[ClusterAwareWriterFailoverHandler] [TaskB] encountered an exception:
ClusterAwareWriterFailoverHandler.16=[ClusterAwareWriterFailoverHandler] [TaskA] encountered an exception:

ClusterAwareReaderFailoverHandler.1=Thread was interrupted.
ClusterAwareReaderFailoverHandler.2=[ClusterAwareReaderFailoverHandler] Connected to reader [{0,number,#}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ synchronized void abortConnection() {
} catch (SQLException sqlEx) {
// ignore
this.log.logTrace(String.format(
"Exception during aborting connection: %s",
"[MonitorConnectionContext] Exception during aborting connection: %s",
sqlEx.getMessage()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.mysql.cj.Messages;
import com.mysql.cj.conf.HostInfo;
import com.mysql.cj.exceptions.CJCommunicationsException;
import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.ha.ConnectionUtils;
import com.mysql.cj.jdbc.ha.plugins.IConnectionProvider;
Expand Down Expand Up @@ -248,33 +249,53 @@ public WriterFailoverResult call() {
Messages.getString(
"ClusterAwareWriterFailoverHandler.6",
new Object[] {this.originalWriterHost.getHostPortPair()}));

JdbcConnection conn = null;
List<HostInfo> latestTopology = null;
boolean success = false;

try {
while (true) {
while (latestTopology == null || Util.isNullOrEmpty(latestTopology)) {
try {
JdbcConnection conn = connectionProvider.connect(this.originalWriterHost);

List<HostInfo> latestTopology = topologyService.getTopology(conn, true);
if (!Util.isNullOrEmpty(latestTopology)
&& isCurrentHostWriter(latestTopology)) {
topologyService.removeFromDownHostList(this.originalWriterHost);
return new WriterFailoverResult(true, false, latestTopology, conn, "TaskA");
if (conn != null && !conn.isClosed()) {
conn.close();
}

conn = connectionProvider.connect(this.originalWriterHost);
latestTopology = topologyService.getTopology(conn, true);

} catch (CJCommunicationsException exception) {
// do nothing
} catch (SQLException exception) {
// Propagate exceptions that are not caused by network errors.
if (!ConnectionUtils.isNetworkException(exception)) {
log.logTrace(Messages.getString("ClusterAwareWriterFailoverHandler.16"), exception);
return new WriterFailoverResult(false, false, null, null, "TaskA", exception);
}
}

TimeUnit.MILLISECONDS.sleep(reconnectWriterIntervalMs);
}

success = isCurrentHostWriter(latestTopology);
topologyService.removeFromDownHostList(this.originalWriterHost);
return new WriterFailoverResult(success, false, latestTopology, success ? conn : null, "TaskA");

} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
return new WriterFailoverResult(false, false, null, null, "TaskA");
return new WriterFailoverResult(success, false, latestTopology, success ? conn : null, "TaskA");
} catch (Exception ex) {
log.logError(ex);
throw ex;
return new WriterFailoverResult(false, false, null, null, "TaskA");
} finally {
try {
if (conn != null && !success && !conn.isClosed()) {
conn.close();
}
} catch (Exception ex) {
// ignore
}
log.logTrace(Messages.getString("ClusterAwareWriterFailoverHandler.8"));
}
}
Expand Down Expand Up @@ -334,12 +355,11 @@ public WriterFailoverResult call() {
Thread.currentThread().interrupt();
return new WriterFailoverResult(false, false, null, null, "TaskB");
} catch (Exception ex) {
log.logError(Messages.getString(
"ClusterAwareWriterFailoverHandler.15",
new Object[] {ex.getMessage()}));
throw ex;
log.logTrace(Messages.getString("ClusterAwareWriterFailoverHandler.15"), ex);
return new WriterFailoverResult(false, false, null, null, "TaskB");
} finally {
performFinalCleanup();
log.logTrace(Messages.getString("ClusterAwareWriterFailoverHandler.10"));
}
}

Expand Down Expand Up @@ -400,8 +420,9 @@ private boolean refreshTopologyAndConnectToNewWriter() throws InterruptedExcepti
}
}
}
} catch (SQLException e) {
// ignore
} catch (CJCommunicationsException | SQLException ex) {
log.logTrace(Messages.getString("ClusterAwareWriterFailoverHandler.15"), ex);
return false;
}

TimeUnit.MILLISECONDS.sleep(readTopologyIntervalMs);
Expand Down Expand Up @@ -473,7 +494,6 @@ private void performFinalCleanup() {
// ignore
}
}
log.logTrace(Messages.getString("ClusterAwareWriterFailoverHandler.10"));
}

private void logTopology() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,10 @@ protected void initializeTopology() throws SQLException {
validateConnection();

if (this.currentHostIndex != NO_CONNECTION_INDEX
&& this.currentHostIndex != WRITER_CONNECTION_INDEX
&& !Util.isNullOrEmpty(this.hosts)) {
HostInfo currentHost = this.hosts.get(this.currentHostIndex);
if (isExplicitlyReadOnly()) {
topologyService.setLastUsedReaderHost(currentHost);
}
topologyService.setLastUsedReaderHost(currentHost);
}

final JdbcConnection currentConnection = this.currentConnectionProvider.getCurrentConnection();
Expand Down Expand Up @@ -539,7 +538,9 @@ protected void failoverReader(int failedHostIdx) throws SQLException {
result.getConnectionIndex());
updateTopologyAndConnectIfNeeded(true);

if (this.currentHostIndex != NO_CONNECTION_INDEX && !Util.isNullOrEmpty(this.hosts)) {
if (this.currentHostIndex != NO_CONNECTION_INDEX
&& this.currentHostIndex != WRITER_CONNECTION_INDEX
&& !Util.isNullOrEmpty(this.hosts)) {
HostInfo currentHost = this.hosts.get(this.currentHostIndex);
topologyService.setLastUsedReaderHost(currentHost);
this.logger.logDebug(
Expand Down Expand Up @@ -625,7 +626,8 @@ protected synchronized void pickNewConnection() throws SQLException {

try {
connectTo(WRITER_CONNECTION_INDEX);
if (isExplicitlyReadOnly() && this.currentHostIndex != NO_CONNECTION_INDEX) {
if (this.currentHostIndex != NO_CONNECTION_INDEX
&& this.currentHostIndex != WRITER_CONNECTION_INDEX) {
topologyService.setLastUsedReaderHost(this.hosts.get(this.currentHostIndex));
}
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -385,7 +385,8 @@ protected void makeSureInstancesUp(String[] instances) throws InterruptedExcepti

protected void makeSureInstancesUp(String[] instances, boolean finalCheck) throws InterruptedException {
final ExecutorService executorService = Executors.newFixedThreadPool(instances.length);
final HashSet<String> remainingInstances = new HashSet<>(Arrays.asList(instances));
final ConcurrentHashMap<String, Boolean> remainingInstances = new ConcurrentHashMap<String, Boolean>();
Arrays.asList(instances).forEach((k) -> remainingInstances.put(k, true));

for (final String id : instances) {
executorService.submit(() -> {
Expand All @@ -396,8 +397,11 @@ protected void makeSureInstancesUp(String[] instances, boolean finalCheck) throw
break;
} catch (final SQLException ex) {
// Continue waiting until instance is up.
} catch (final Exception ex) {
System.out.println("Exception: " + ex);
break;
}
TimeUnit.MILLISECONDS.sleep(500);
TimeUnit.MILLISECONDS.sleep(1000);
}
return null;
});
Expand All @@ -407,7 +411,7 @@ protected void makeSureInstancesUp(String[] instances, boolean finalCheck) throw

if (finalCheck) {
assertTrue(remainingInstances.isEmpty(),
"The following instances are still down: \n" + String.join("\n", remainingInstances));
"The following instances are still down: \n" + String.join("\n", remainingInstances.keySet()));
}

}
Expand Down

0 comments on commit 9593383

Please sign in to comment.