Skip to content

Commit

Permalink
Merge pull request #3 from ZSYTY/fix_20221116
Browse files Browse the repository at this point in the history
Merge fix 20221116
  • Loading branch information
wcf2333 authored Nov 16, 2022
2 parents 4bda56c + e80d500 commit 3732052
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
70 changes: 53 additions & 17 deletions src/main/java/com/alibaba/polardbx/rpc/client/XClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.alibaba.polardbx.rpc.result.XResult;
import com.alibaba.polardbx.rpc.result.XResultUtil;
import com.google.protobuf.ByteString;
import com.mysql.cj.polarx.protobuf.PolarxConnection;
import com.mysql.cj.polarx.protobuf.PolarxNotice;
import com.mysql.cj.polarx.protobuf.PolarxSession;
import com.mysql.cj.x.protobuf.Polarx;
Expand Down Expand Up @@ -113,6 +114,7 @@ public enum ClientState {
private volatile ClientState state = ClientState.Initializing;

private final AtomicInteger probeFailTimes = new AtomicInteger(0);
private final AtomicBoolean probeEvent = new AtomicBoolean(false);

public XClient(NIOWorker nioWorker, XClientPool pool, BiFunction<XClient, XPacket, Boolean> filter,
long connectTimeoutNanos) {
Expand Down Expand Up @@ -157,6 +159,13 @@ public XClient(NIOWorker nioWorker, XClientPool pool, BiFunction<XClient, XPacke

// Dealing with auth.
switch (msg.getType()) {
case Polarx.ServerMessages.Type.CONN_CAPABILITIES_VALUE:
synchronized (probeEvent) {
probeEvent.set(true);
probeEvent.notify();
}
break;

case Polarx.ServerMessages.Type.SESS_AUTHENTICATE_CONTINUE_VALUE:
scramble(sessionId, ((PolarxSession.AuthenticateContinue) msg.getPacket()).getAuthData());
break;
Expand Down Expand Up @@ -749,26 +758,53 @@ public boolean needProb() {
&& System.nanoTime() - lastPacketNanos.get() > XConfig.DEFAULT_PROBE_IDLE_NANOS;
}

private void capabilitiesGet(long sessionId) {
final PolarxConnection.CapabilitiesGet.Builder builder = PolarxConnection.CapabilitiesGet.newBuilder();
final XPacket packet =
new XPacket(sessionId, Polarx.ClientMessages.Type.CON_CAPABILITIES_GET_VALUE, builder.build());
send(packet, true);
}

public boolean probe(AtomicLong sessionIdGenerator, long timeoutNanos) {
try (XConnection connection = newXConnection(sessionIdGenerator,
XConnectionManager.getInstance().isEnableAutoCommitOptimize())) {
connection.init();
connection.setNetworkTimeoutNanos(timeoutNanos);
XResult result = connection.execQuery("/*X probe*/ select 1");
long count = 0;
while (result.next() != null) {
Number res = (Number) XResultUtil.resultToObject(
result.getMetaData().get(0), result.current().getRow().get(0), true,
TimeZone.getDefault()).getKey();
count += res.longValue();
if (XConfig.GALAXY_X_PROTOCOL) {
try (XConnection connection = newXConnection(sessionIdGenerator,
XConnectionManager.getInstance().isEnableAutoCommitOptimize())) {
connection.init();
connection.setNetworkTimeoutNanos(timeoutNanos);
XResult result = connection.execQuery("/*X probe*/ select 1");
long count = 0;
while (result.next() != null) {
Number res = (Number) XResultUtil.resultToObject(
result.getMetaData().get(0), result.current().getRow().get(0), true,
TimeZone.getDefault()).getKey();
count += res.longValue();
}
if (1 == count) {
probeFailTimes.set(0);
return true;
}
return probeFailTimes.addAndGet(1) < XConfig.DEFAULT_PROBE_RETRY_TIMES;
} catch (Throwable t) {
XLog.XLogLogger.error(t); // Just log and ignore.
}
if (1 == count) {
probeFailTimes.set(0);
return true;
} else {
// use capabilities to check TCP
try {
probeEvent.set(false);
capabilitiesGet(0);
synchronized (probeEvent) {
if (!probeEvent.get()) {
probeEvent.wait(timeoutNanos / 1000000L);
}
}
if (probeEvent.get()) {
// good
probeFailTimes.set(0);
return true;
}
} catch (Throwable t) {
XLog.XLogLogger.error(t); // Just log and ignore.
}
return probeFailTimes.addAndGet(1) < XConfig.DEFAULT_PROBE_RETRY_TIMES;
} catch (Throwable t) {
XLog.XLogLogger.error(t); // Just log and ignore.
}
return probeFailTimes.addAndGet(1) < XConfig.DEFAULT_PROBE_RETRY_TIMES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public long executeUpdateX(String sql) throws SQLException {
return connection.execUpdate(sql);
}

public long executeUpdateX(BytesSql sql, byte[] hint) throws SQLException {
return connection.execUpdate(sql, hint, null);
}

/**
* Compatible for JDBC statement.
*/
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/alibaba/polardbx/rpc/net/NIOClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ public void write(XPacket packet, boolean flush) throws IOException {

boolean done = false;
if (lastWrite != null) {
final int position = lastWrite.position();
final int limit = lastWrite.limit();
if (limit + fullSize <= lastWrite.capacity()) {
// Capacity enough and append to tail.
Expand All @@ -576,6 +577,7 @@ public void write(XPacket packet, boolean flush) throws IOException {
msg.writeTo(CodedOutputStream.newInstance(lastWrite));
lastWrite.position(lastWrite.position() + size);
lastWrite.flip();
lastWrite.position(position); // Important to restore the last send state.
done = true;
// Skip to next if not enough.
if (lastWrite.limit() + headerSize > lastWrite.capacity()) {
Expand Down

0 comments on commit 3732052

Please sign in to comment.