Skip to content

Commit

Permalink
5.4.19-20241103_17309482
Browse files Browse the repository at this point in the history
  • Loading branch information
lgjpolardbx committed Nov 7, 2024
1 parent 7a193e0 commit 58a254a
Show file tree
Hide file tree
Showing 8 changed files with 5,127 additions and 4,737 deletions.
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,36 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.verison}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
<goal>report</goal>
</goals>
</execution>
</executions>
<configuration>
<excludes>
<exclude>com/mysql/cj/polarx/protobuf/*.*</exclude>
<exclude>com/mysql/cj/x/protobuf/*.*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
74 changes: 55 additions & 19 deletions src/main/java/com/alibaba/polardbx/rpc/client/XSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.polardbx.rpc.client;

import com.alibaba.polardbx.common.constants.IsolationLevel;
import com.alibaba.polardbx.common.constants.ServerVariables;
import com.alibaba.polardbx.common.exception.TddlRuntimeException;
import com.alibaba.polardbx.common.exception.code.ErrorCode;
Expand Down Expand Up @@ -133,6 +134,7 @@ public enum Status {
private boolean lazyMarkDistributed = false;
private long lazySnapshotSeq = -1L;
private long lazyCommitSeq = -1L;
private boolean lazyFlashbackArea = false;

// Cache control.
private boolean noCache = false;
Expand Down Expand Up @@ -498,6 +500,10 @@ private void traceCtsInfo() {
historySql.add("msg_mark_distributed=ON");
sessionSql.add("msg_mark_distributed=ON");
}
if (lazyFlashbackArea) {
historySql.add("msg_flashback_area");
sessionSql.add("msg_flashback_area");
}
}

public static String toJavaEncoding(String encoding) {
Expand Down Expand Up @@ -594,27 +600,12 @@ public synchronized int getIsolation() {
if (null == isolationString) {
isolation = Connection.TRANSACTION_READ_COMMITTED;
} else {
switch (isolationString.toUpperCase()) {
case "READ-UNCOMMITTED":
isolation = Connection.TRANSACTION_READ_UNCOMMITTED;
break;

case "READ-COMMITTED":
isolation = Connection.TRANSACTION_READ_COMMITTED;
break;

case "REPEATABLE-READ":
isolation = Connection.TRANSACTION_REPEATABLE_READ;
break;

case "SERIALIZABLE":
isolation = Connection.TRANSACTION_SERIALIZABLE;
break;

default:
final IsolationLevel level = IsolationLevel.parse(isolationString);
if (null == level) {
throw new TddlRuntimeException(ErrorCode.ERR_X_PROTOCOL_CLIENT,
this + " unknown isolation level: " + isolationString);
}
isolation = level.getCode();
}
}
}
Expand Down Expand Up @@ -1016,6 +1007,7 @@ public synchronized boolean reset() {
lazyCommitSeq = -1L;
lazyMarkDistributed = false;
lazyUseCtsTransaction = false;
lazyFlashbackArea = false;
noCache = false;
forceCache = false;
chunkResult = false;
Expand Down Expand Up @@ -1230,6 +1222,10 @@ public void setLazyMarkDistributed() {
this.lazyMarkDistributed = true;
}

public void setLazyFlashbackArea() {
this.lazyFlashbackArea = true;
}

public void setLazySnapshotSeq(long lazySnapshotSeq) {
this.lazySnapshotSeq = lazySnapshotSeq;
}
Expand All @@ -1242,6 +1238,7 @@ public void setLazyCommitSeq(long lazyCommitSeq) {
private boolean isStashed = false;
private boolean stashUseCtsTransaction = false;
private boolean stashMarkDistributed = false;
private boolean stashFlashbackArea = false;
private long stashSnapshotSeq = -1L;
private long stashCommitSeq = -1L;

Expand All @@ -1252,10 +1249,12 @@ public boolean stashTransactionSequence() {
isStashed = true;
stashUseCtsTransaction = lazyUseCtsTransaction;
stashMarkDistributed = lazyMarkDistributed;
stashFlashbackArea = lazyFlashbackArea;
stashSnapshotSeq = lazySnapshotSeq;
stashCommitSeq = lazyCommitSeq;
lazyUseCtsTransaction = false;
lazyMarkDistributed = false;
lazyFlashbackArea = false;
lazySnapshotSeq = -1;
lazyCommitSeq = -1;
return true;
Expand All @@ -1265,6 +1264,7 @@ public void stashPopTransactionSequence() {
isStashed = false;
lazyUseCtsTransaction = stashUseCtsTransaction;
lazyMarkDistributed = stashMarkDistributed;
lazyFlashbackArea = stashFlashbackArea;
lazySnapshotSeq = stashSnapshotSeq;
lazyCommitSeq = stashCommitSeq;
}
Expand Down Expand Up @@ -1428,6 +1428,15 @@ public synchronized XResult execQuery(XConnection connection, PolarxExecPlan.Exe
}
lazyMarkDistributed = false;
}
if (lazyFlashbackArea) {
execPlan.setQueryViaFlashbackArea(true);
if (null == extra) {
extra = "flashback_area;";
} else {
extra += "flashback_area;";
}
lazyFlashbackArea = false;
}
if (lazySnapshotSeq != -1) {
execPlan.setSnapshotSeq(lazySnapshotSeq);
if (null == extra) {
Expand Down Expand Up @@ -1937,7 +1946,7 @@ public synchronized XResult execUpdate(XConnection connection, BytesSql sql, byt
return result;
}

private String setLazyTrxVariables(PolarxSql.StmtExecute.Builder builder, String extra) {
protected String setLazyTrxVariables(PolarxSql.StmtExecute.Builder builder, String extra) {
if (lazyUseCtsTransaction) {
builder.setUseCtsTransaction(true);
if (null == extra) {
Expand All @@ -1956,6 +1965,15 @@ private String setLazyTrxVariables(PolarxSql.StmtExecute.Builder builder, String
}
lazyMarkDistributed = false;
}
if (lazyFlashbackArea) {
builder.setQueryViaFlashbackArea(true);
if (null == extra) {
extra = "flashback_area;";
} else {
extra += "flashback_area;";
}
lazyFlashbackArea = false;
}
if (lazySnapshotSeq != -1) {
builder.setSnapshotSeq(lazySnapshotSeq);
if (null == extra) {
Expand Down Expand Up @@ -2165,6 +2183,8 @@ public void refereshConnetionId(XConnection connection) throws SQLException {
private Boolean flagXRPC = null;
private Boolean flagMarkDistributed = null;

private Boolean flagFlashbackArea = null;

public boolean supportMessageTimestamp() {
// disabled in XConnectionManager when galaxy protocol
if (!XConnectionManager.getInstance().isEnableMessageTimestamp()) {
Expand Down Expand Up @@ -2756,6 +2776,22 @@ public boolean supportMarkDistributed() {
return flagMarkDistributed = false;
}

public boolean supportFlashbackArea() {
if (flagFlashbackArea != null) {
return flagFlashbackArea;
}

if (!client.isActive()) {
return false; // Initializing.
}

final Object markFlashbackArea = client.getSessionVariablesL().get("opt_flashback_area");
if (null != markFlashbackArea) {
return flagFlashbackArea = true;
}
return flagFlashbackArea = false;
}

/**
* Overrides.
*/
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/com/alibaba/polardbx/rpc/pool/XConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import com.alibaba.polardbx.rpc.compatible.XStatement;
import com.alibaba.polardbx.rpc.result.XResult;
import com.google.protobuf.ByteString;
import com.mysql.cj.polarx.protobuf.PolarxPhysicalBackfill;
import com.mysql.cj.polarx.protobuf.PolarxNotice;
import com.mysql.cj.polarx.protobuf.PolarxPhysicalBackfill;
import com.mysql.cj.x.protobuf.PolarxDatatypes;
import com.mysql.cj.x.protobuf.PolarxExecPlan;

Expand Down Expand Up @@ -286,6 +286,16 @@ public void setLazyMarkDistributed() throws SQLException {
session.setLazyMarkDistributed();
}

public void setLazyFlashbackArea() throws SQLException {
sessionLock.readLock().lock();
try {
check();
session.setLazyFlashbackArea();
} finally {
sessionLock.readLock().unlock();
}
}

public void setLazySnapshotSeq(long lazySnapshotSeq) throws SQLException {
check();
session.setLazySnapshotSeq(lazySnapshotSeq);
Expand Down Expand Up @@ -552,6 +562,16 @@ public boolean supportMarkDistributed() throws SQLException {
}
}

public boolean supportFlashbackArea() throws SQLException {
sessionLock.readLock().lock();
try {
check();
return session.supportFlashbackArea();
} finally {
sessionLock.readLock().unlock();
}
}

public long getConnectionId() throws SQLException {
sessionLock.readLock().lock();
try {
Expand Down
49 changes: 44 additions & 5 deletions src/main/java/com/alibaba/polardbx/rpc/result/XResultUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alibaba.polardbx.common.exception.TddlRuntimeException;
import com.alibaba.polardbx.common.exception.code.ErrorCode;
import com.alibaba.polardbx.common.utils.BigDecimalUtil;
import com.alibaba.polardbx.common.utils.LongUtil;
import com.alibaba.polardbx.common.utils.Pair;
import com.alibaba.polardbx.common.utils.hash.ByteUtil;
import com.alibaba.polardbx.common.utils.time.MySQLTimeConverter;
Expand All @@ -40,7 +41,6 @@
import com.alibaba.polardbx.common.utils.time.core.TimeStorage;
import com.alibaba.polardbx.common.utils.time.parser.TimeParseStatus;
import com.alibaba.polardbx.rpc.jdbc.CharsetMapping;
import com.alibaba.polardbx.common.utils.LongUtil;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.mysql.cj.polarx.protobuf.PolarxResultset;
Expand Down Expand Up @@ -93,6 +93,27 @@ public class XResultUtil {

public static long ZERO_TIMESTAMP_LONG_VAL = 0;

public static final int DECIMAL_MAX_SCALE = 30;
public static final int DECIMAL_NOT_SPECIFIED = DECIMAL_MAX_SCALE + 1;

private static String real2string(float f) {
// Should convert float 1.0 as '1' in order to be compatible with MySQL
// Reference: https://stackoverflow.com/a/14126736
if (f == (long) f) {
return Long.toString((long) f);
} else {
return Float.toString(f);
}
}

private static String real2string(double d) {
if (d == (long) d) {
return Long.toString((long) d);
} else {
return Double.toString(d);
}
}

public static Pair<Object, byte[]> resultToObject(PolarxResultset.ColumnMetaData meta, ByteString data,
boolean legacy, TimeZone tz) throws Exception {
final byte[] rawBytes = data.toByteArray();
Expand Down Expand Up @@ -177,12 +198,20 @@ public static Pair<Object, byte[]> resultToObject(PolarxResultset.ColumnMetaData

case DOUBLE:
obj = stream.readDouble();
bytes = obj.toString().getBytes();
if (DECIMAL_NOT_SPECIFIED == meta.getFractionalDigits()) {
bytes = real2string((double) obj).getBytes();
} else {
bytes = String.format(String.format("%%.%df", meta.getFractionalDigits()), obj).getBytes();
}
break;

case FLOAT:
obj = stream.readFloat();
bytes = obj.toString().getBytes();
if (DECIMAL_NOT_SPECIFIED == meta.getFractionalDigits()) {
bytes = real2string((float) obj).getBytes();
} else {
bytes = String.format(String.format("%%.%df", meta.getFractionalDigits()), obj).getBytes();
}
break;

case BYTES:
Expand Down Expand Up @@ -490,10 +519,20 @@ public static byte[] resultToBytes(PolarxResultset.ColumnMetaData meta, ByteStri
}

case DOUBLE:
return Double.toString(stream.readDouble()).getBytes();
if (DECIMAL_NOT_SPECIFIED == meta.getFractionalDigits()) {
return real2string(stream.readDouble()).getBytes();
} else {
return String.format(String.format("%%.%df", meta.getFractionalDigits()), stream.readDouble())
.getBytes();
}

case FLOAT:
return Float.toString(stream.readFloat()).getBytes();
if (DECIMAL_NOT_SPECIFIED == meta.getFractionalDigits()) {
return real2string(stream.readFloat()).getBytes();
} else {
return String.format(String.format("%%.%df", meta.getFractionalDigits()), stream.readFloat())
.getBytes();
}

case BYTES:
switch (meta.getContentType()) {
Expand Down
Loading

0 comments on commit 58a254a

Please sign in to comment.