diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index 7b61901a60e..be75c1e5c4e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -25,6 +25,7 @@ import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.ResultSetStats; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.annotation.Nullable; @@ -35,6 +36,7 @@ class GrpcResultSet extends AbstractResultSet> implements ProtobufR private final DecodeMode decodeMode; private ResultSetMetadata metadata; private GrpcStruct currRow; + private List rowData; private SpannerException error; private ResultSetStats statistics; private boolean closed; @@ -85,7 +87,15 @@ public boolean next() throws SpannerException { throw SpannerExceptionFactory.newSpannerException( ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); } - currRow = new GrpcStruct(iterator.type(), new ArrayList<>(), decodeMode); + if (rowData == null) { + rowData = new ArrayList<>(metadata.getRowType().getFieldsCount()); + if (decodeMode != DecodeMode.DIRECT) { + rowData = Collections.synchronizedList(rowData); + } + } else { + rowData.clear(); + } + currRow = new GrpcStruct(iterator.type(), rowData, decodeMode); } boolean hasNext = currRow.consumeRow(iterator); if (!hasNext) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java index 852b9ed61a3..4d07a12880c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -60,7 +61,7 @@ class GrpcStruct extends Struct implements Serializable { private final List rowData; private final DecodeMode decodeMode; private final BitSet colDecoded; - private boolean rowDecoded; + private final AtomicBoolean rowDecoded; /** * Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as a @@ -224,7 +225,7 @@ private GrpcStruct( this.type = type; this.rowData = rowData; this.decodeMode = decodeMode; - this.rowDecoded = rowDecoded; + this.rowDecoded = new AtomicBoolean(rowDecoded); this.colDecoded = colDecoded; } @@ -234,29 +235,31 @@ public String toString() { } boolean consumeRow(Iterator iterator) { - rowData.clear(); - if (decodeMode == DecodeMode.LAZY_PER_ROW) { - rowDecoded = false; - } else if (decodeMode == DecodeMode.LAZY_PER_COL) { - colDecoded.clear(); - } - if (!iterator.hasNext()) { - return false; - } - for (Type.StructField fieldType : getType().getStructFields()) { + synchronized (rowData) { + rowData.clear(); + if (decodeMode == DecodeMode.LAZY_PER_ROW) { + rowDecoded.set(false); + } else if (decodeMode == DecodeMode.LAZY_PER_COL) { + colDecoded.clear(); + } if (!iterator.hasNext()) { - throw newSpannerException( - ErrorCode.INTERNAL, - "Invalid value stream: end of stream reached before row is complete"); + return false; } - com.google.protobuf.Value value = iterator.next(); - if (decodeMode == DecodeMode.DIRECT) { - rowData.add(decodeValue(fieldType.getType(), value)); - } else { - rowData.add(value); + for (Type.StructField fieldType : getType().getStructFields()) { + if (!iterator.hasNext()) { + throw newSpannerException( + ErrorCode.INTERNAL, + "Invalid value stream: end of stream reached before row is complete"); + } + com.google.protobuf.Value value = iterator.next(); + if (decodeMode == DecodeMode.DIRECT) { + rowData.add(decodeValue(fieldType.getType(), value)); + } else { + rowData.add(value); + } } + return true; } - return true; } private static Object decodeValue(Type fieldType, com.google.protobuf.Value proto) { @@ -367,12 +370,16 @@ private static void checkType( } Struct immutableCopy() { - return new GrpcStruct( - type, - new ArrayList<>(rowData), - this.decodeMode, - this.rowDecoded, - this.colDecoded == null ? null : (BitSet) this.colDecoded.clone()); + synchronized (rowData) { + return new GrpcStruct( + type, + this.decodeMode == DecodeMode.DIRECT + ? new ArrayList<>(rowData) + : Collections.synchronizedList(new ArrayList<>(rowData)), + this.decodeMode, + this.rowDecoded.get(), + this.colDecoded == null ? null : (BitSet) this.colDecoded.clone()); + } } @Override @@ -382,9 +389,14 @@ public Type getType() { @Override public boolean isNull(int columnIndex) { - if ((decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) - || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex))) { - return ((com.google.protobuf.Value) rowData.get(columnIndex)).hasNullValue(); + if (decodeMode == DecodeMode.LAZY_PER_ROW || decodeMode == DecodeMode.LAZY_PER_COL) { + synchronized (rowData) { + if ((decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded.get()) + || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex))) { + return ((com.google.protobuf.Value) rowData.get(columnIndex)).hasNullValue(); + } + return rowData.get(columnIndex) == null; + } } return rowData.get(columnIndex) == null; } @@ -496,14 +508,18 @@ private boolean isUnrecognizedType(int columnIndex) { } boolean canGetProtoValue(int columnIndex) { - return isUnrecognizedType(columnIndex) - || (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) - || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)); + synchronized (rowData) { + return isUnrecognizedType(columnIndex) + || (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded.get()) + || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)); + } } protected com.google.protobuf.Value getProtoValueInternal(int columnIndex) { - checkProtoValueSupported(columnIndex); - return (com.google.protobuf.Value) rowData.get(columnIndex); + synchronized (rowData) { + checkProtoValueSupported(columnIndex); + return (com.google.protobuf.Value) rowData.get(columnIndex); + } } private void checkProtoValueSupported(int columnIndex) { @@ -515,7 +531,7 @@ private void checkProtoValueSupported(int columnIndex) { decodeMode != DecodeMode.DIRECT, "Getting proto value is not supported when DecodeMode#DIRECT is used."); Preconditions.checkState( - !(decodeMode == DecodeMode.LAZY_PER_ROW && rowDecoded), + !(decodeMode == DecodeMode.LAZY_PER_ROW && rowDecoded.get()), "Getting proto value after the row has been decoded is not supported."); Preconditions.checkState( !(decodeMode == DecodeMode.LAZY_PER_COL && colDecoded.get(columnIndex)), @@ -523,22 +539,48 @@ private void checkProtoValueSupported(int columnIndex) { } private void ensureDecoded(int columnIndex) { - if (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) { - for (int i = 0; i < rowData.size(); i++) { - rowData.set( - i, - decodeValue( - type.getStructFields().get(i).getType(), - (com.google.protobuf.Value) rowData.get(i))); + if (decodeMode == DecodeMode.LAZY_PER_ROW) { + synchronized (rowData) { + if (!rowDecoded.get()) { + for (int i = 0; i < rowData.size(); i++) { + rowData.set( + i, + decodeValue( + type.getStructFields().get(i).getType(), + (com.google.protobuf.Value) rowData.get(i))); + } + } + rowDecoded.set(true); + } + } else if (decodeMode == DecodeMode.LAZY_PER_COL) { + boolean decoded; + Object value; + synchronized (rowData) { + decoded = colDecoded.get(columnIndex); + value = rowData.get(columnIndex); + } + if (!decoded) { + // Use the column as a lock during decoding to ensure that we decode once (mostly), but also + // that multiple different columns can be decoded in parallel if requested. + synchronized (type.getStructFields().get(columnIndex)) { + // Note: It can be that we decode the value twice if two threads request this at the same + // time, but the synchronization on rowData above and below makes sure that we always get + // and set a consistent value (and only set it once). + if (!colDecoded.get(columnIndex)) { + value = + decodeValue( + type.getStructFields().get(columnIndex).getType(), + (com.google.protobuf.Value) value); + decoded = true; + } + } + if (decoded) { + synchronized (rowData) { + rowData.set(columnIndex, value); + colDecoded.set(columnIndex); + } + } } - rowDecoded = true; - } else if (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)) { - rowData.set( - columnIndex, - decodeValue( - type.getStructFields().get(columnIndex).getType(), - (com.google.protobuf.Value) rowData.get(columnIndex))); - colDecoded.set(columnIndex); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java index 6a6125e1dda..b64a05b2ef4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java @@ -27,6 +27,12 @@ import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; @@ -41,7 +47,7 @@ public void clearRequests() { } @Test - public void testAllDecodeModes() { + public void testAllDecodeModes() throws Exception { int numRows = 10; RandomResultSetGenerator generator = new RandomResultSetGenerator(numRows); String sql = "select * from random"; @@ -50,57 +56,85 @@ public void testAllDecodeModes() { MockSpannerServiceImpl.StatementResult.query(statement, generator.generate())); try (Connection connection = createConnection()) { - for (boolean readonly : new boolean[] {true, false}) { - for (boolean autocommit : new boolean[] {true, false}) { - connection.setReadOnly(readonly); - connection.setAutocommit(autocommit); + for (boolean multiThreaded : new boolean[] {true, false}) { + for (boolean readonly : new boolean[] {true, false}) { + for (boolean autocommit : new boolean[] {true, false}) { + connection.setReadOnly(readonly); + connection.setAutocommit(autocommit); - int receivedRows = 0; - // DecodeMode#DIRECT is not supported in read/write transactions, as the protobuf value is - // used for checksum calculation. - try (ResultSet direct = - connection.executeQuery( - statement, - !readonly && !autocommit - ? Options.decodeMode(DecodeMode.LAZY_PER_ROW) - : Options.decodeMode(DecodeMode.DIRECT)); - ResultSet lazyPerRow = - connection.executeQuery(statement, Options.decodeMode(DecodeMode.LAZY_PER_ROW)); - ResultSet lazyPerCol = - connection.executeQuery(statement, Options.decodeMode(DecodeMode.LAZY_PER_COL))) { - while (direct.next() && lazyPerRow.next() && lazyPerCol.next()) { - assertEquals(direct.getColumnCount(), lazyPerRow.getColumnCount()); - assertEquals(direct.getColumnCount(), lazyPerCol.getColumnCount()); - for (int col = 0; col < direct.getColumnCount(); col++) { - // Test getting the entire row as a struct both as the first thing we do, and as the - // last thing we do. This ensures that the method works as expected both when a row - // is lazily decoded by this method, and when it has already been decoded by another - // method. - if (col % 2 == 0) { - assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); - assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); - } - assertEquals(direct.isNull(col), lazyPerRow.isNull(col)); - assertEquals(direct.isNull(col), lazyPerCol.isNull(col)); - assertEquals(direct.getValue(col), lazyPerRow.getValue(col)); - assertEquals(direct.getValue(col), lazyPerCol.getValue(col)); - if (col % 2 == 1) { - assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); - assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); + int receivedRows = 0; + // DecodeMode#DIRECT is not supported in read/write transactions, as the protobuf value + // is + // used for checksum calculation. + try (ResultSet direct = + connection.executeQuery( + statement, + !readonly && !autocommit + ? Options.decodeMode(DecodeMode.LAZY_PER_ROW) + : Options.decodeMode(DecodeMode.DIRECT)); + ResultSet lazyPerRow = + connection.executeQuery( + statement, Options.decodeMode(DecodeMode.LAZY_PER_ROW)); + ResultSet lazyPerCol = + connection.executeQuery( + statement, Options.decodeMode(DecodeMode.LAZY_PER_COL))) { + while (direct.next() && lazyPerRow.next() && lazyPerCol.next()) { + assertEquals(direct.getColumnCount(), lazyPerRow.getColumnCount()); + assertEquals(direct.getColumnCount(), lazyPerCol.getColumnCount()); + if (multiThreaded) { + ExecutorService service = Executors.newFixedThreadPool(direct.getColumnCount()); + List> futures = new ArrayList<>(direct.getColumnCount()); + for (int col = 0; col < direct.getColumnCount(); col++) { + final int colNumber = col; + futures.add( + service.submit( + () -> checkRowValues(colNumber, direct, lazyPerRow, lazyPerCol))); + } + service.shutdown(); + for (Future future : futures) { + future.get(); + } + } else { + for (int col = 0; col < direct.getColumnCount(); col++) { + checkRowValues(col, direct, lazyPerRow, lazyPerCol); + } } + receivedRows++; } - receivedRows++; + assertEquals(numRows, receivedRows); + } + if (!autocommit) { + connection.commit(); } - assertEquals(numRows, receivedRows); - } - if (!autocommit) { - connection.commit(); } } } } } + private void checkRowValues( + int col, ResultSet direct, ResultSet lazyPerRow, ResultSet lazyPerCol) { + // Randomly decode and get a column to trigger parallel decoding of one column. + lazyPerCol.getValue(ThreadLocalRandom.current().nextInt(lazyPerCol.getColumnCount())); + + // Test getting the entire row as a struct both as the first thing we do, and as the + // last thing we do. This ensures that the method works as expected both when a row + // is lazily decoded by this method, and when it has already been decoded by another + // method. + if (col % 2 == 0) { + assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); + assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); + } + assertEquals(direct.isNull(col), lazyPerRow.isNull(col)); + assertEquals(direct.isNull(col), lazyPerCol.isNull(col)); + assertEquals(direct.getValue(col), lazyPerRow.getValue(col)); + assertEquals(direct.getValue(col), lazyPerCol.getValue(col)); + if (col % 2 == 1) { + assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); + assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); + } + } + @Test public void testDecodeModeDirect_failsInReadWriteTransaction() { int numRows = 1;