diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java index 4f22c9aeb89e..45d6b6608502 100644 --- a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.AccessMode; import java.nio.file.AtomicMoveNotSupportedException; @@ -300,6 +301,48 @@ public SeekableByteChannel newByteChannel( } } + /** + * Open a file for reading OR writing. The {@link FileChannel} that is returned will only allow + * reads or writes depending on the {@link OpenOption}s that are specified. If any of the + * following have been specified, the {@link FileChannel} will be write-only: {@link + * StandardOpenOption#CREATE} + * + * + * + * In all other cases the {@link FileChannel} will be read-only. + * + * @param path The path to the file to open or create + * @param options The options specifying how the file should be opened, and whether the {@link + * FileChannel} should be read-only or write-only. + * @param attrs (not supported, the values will be ignored) + * @throws IOException + */ + @Override + public FileChannel newFileChannel( + Path path, Set options, FileAttribute... attrs) throws IOException { + checkNotNull(path); + initStorage(); + CloudStorageUtil.checkNotNullArray(attrs); + if (options.contains(StandardOpenOption.CREATE_NEW)) { + Files.createFile(path, attrs); + } else if (options.contains(StandardOpenOption.CREATE) && !Files.exists(path)) { + Files.createFile(path, attrs); + } + if (options.contains(StandardOpenOption.WRITE) + || options.contains(StandardOpenOption.CREATE) + || options.contains(StandardOpenOption.CREATE_NEW) + || options.contains(StandardOpenOption.TRUNCATE_EXISTING)) { + return new CloudStorageWriteFileChannel(newWriteChannel(path, options)); + } else { + return new CloudStorageReadFileChannel(newReadChannel(path, options)); + } + } + private SeekableByteChannel newReadChannel(Path path, Set options) throws IOException { initStorage(); diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageReadFileChannel.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageReadFileChannel.java new file mode 100644 index 000000000000..db89365b939d --- /dev/null +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageReadFileChannel.java @@ -0,0 +1,154 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage.contrib.nio; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; + +class CloudStorageReadFileChannel extends FileChannel { + private static final String READ_ONLY = "This FileChannel is read-only"; + private final SeekableByteChannel readChannel; + + CloudStorageReadFileChannel(SeekableByteChannel readChannel) { + Preconditions.checkNotNull(readChannel); + this.readChannel = readChannel; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return readChannel.read(dst); + } + + @Override + public synchronized long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + long res = 0L; + for (int i = offset; i < offset + length; i++) { + res += readChannel.read(dsts[i]); + } + return res; + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException(READ_ONLY); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + throw new UnsupportedOperationException(READ_ONLY); + } + + @Override + public long position() throws IOException { + return readChannel.position(); + } + + @Override + public FileChannel position(long newPosition) throws IOException { + readChannel.position(newPosition); + return this; + } + + @Override + public long size() throws IOException { + return readChannel.size(); + } + + @Override + public FileChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException(READ_ONLY); + } + + @Override + public void force(boolean metaData) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized long transferTo( + long transferFromPosition, long count, WritableByteChannel target) throws IOException { + long res = 0L; + long originalPosition = position(); + try { + position(transferFromPosition); + int blockSize = (int) Math.min(count, 0xfffffL); + int bytesRead = 0; + ByteBuffer buffer = ByteBuffer.allocate(blockSize); + while (res < count && bytesRead >= 0) { + buffer.position(0); + bytesRead = read(buffer); + if (bytesRead > 0) { + buffer.position(0); + buffer.limit(bytesRead); + target.write(buffer); + res += bytesRead; + } + } + return res; + } finally { + position(originalPosition); + } + } + + @Override + public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException { + throw new UnsupportedOperationException(READ_ONLY); + } + + @Override + public synchronized int read(ByteBuffer dst, long readFromPosition) throws IOException { + long originalPosition = position(); + try { + position(readFromPosition); + int res = readChannel.read(dst); + return res; + } finally { + position(originalPosition); + } + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + throw new UnsupportedOperationException(READ_ONLY); + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileLock lock(long position, long size, boolean shared) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void implCloseChannel() throws IOException { + readChannel.close(); + } +} diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageWriteFileChannel.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageWriteFileChannel.java new file mode 100644 index 000000000000..3e930f7cd2a8 --- /dev/null +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageWriteFileChannel.java @@ -0,0 +1,180 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage.contrib.nio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; + +class CloudStorageWriteFileChannel extends FileChannel { + private static final String WRITE_ONLY = "This FileChannel is write-only"; + private SeekableByteChannel writeChannel; + private boolean valid = true; + + CloudStorageWriteFileChannel(SeekableByteChannel writeChannel) { + this.writeChannel = writeChannel; + } + + private void checkValid() throws IOException { + if (!valid) { + // These methods are only supported to be called once, because the underlying channel does not + // support changing the position. + throw new IOException( + "This FileChannel is no longer valid. " + + "A Cloud Storage FileChannel is invalidated after calling one of " + + "the methods FileChannel#write(ByteBuffer, long) or " + + "FileChannel#transferFrom(ReadableByteChannel, long, long)"); + } + if (!writeChannel.isOpen()) { + throw new IOException("This FileChannel is closed"); + } + } + + @Override + public int read(ByteBuffer dst) throws IOException { + throw new UnsupportedOperationException(WRITE_ONLY); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + throw new UnsupportedOperationException(WRITE_ONLY); + } + + @Override + public synchronized int write(ByteBuffer src) throws IOException { + checkValid(); + return writeChannel.write(src); + } + + @Override + public synchronized long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + checkValid(); + long res = 0L; + for (int i = offset; i < offset + length; i++) { + res += writeChannel.write(srcs[i]); + } + return res; + } + + @Override + public synchronized long position() throws IOException { + checkValid(); + return writeChannel.position(); + } + + @Override + public synchronized FileChannel position(long newPosition) throws IOException { + if (newPosition != position()) { + writeChannel.position(newPosition); + } + return this; + } + + @Override + public synchronized long size() throws IOException { + checkValid(); + return writeChannel.size(); + } + + @Override + public synchronized FileChannel truncate(long size) throws IOException { + checkValid(); + writeChannel.truncate(size); + return this; + } + + @Override + public void force(boolean metaData) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long transferTo(long position, long count, WritableByteChannel target) throws IOException { + throw new UnsupportedOperationException(WRITE_ONLY); + } + + @Override + public synchronized long transferFrom(ReadableByteChannel src, long position, long count) + throws IOException { + if (position != position()) { + throw new UnsupportedOperationException( + "This FileChannel only supports transferFrom at the current position"); + } + int blockSize = (int) Math.min(count, 0xfffffL); + long res = 0L; + int bytesRead = 0; + ByteBuffer buffer = ByteBuffer.allocate(blockSize); + while (res < count && bytesRead >= 0) { + buffer.position(0); + bytesRead = src.read(buffer); + if (bytesRead > 0) { + buffer.position(0); + buffer.limit(bytesRead); + write(buffer); + res += bytesRead; + } + } + // The channel is no longer valid as the position has been updated, and there is no way of + // resetting it, but this way we at least support the write-at-position and transferFrom + // methods being called once. + this.valid = false; + return res; + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + throw new UnsupportedOperationException(WRITE_ONLY); + } + + @Override + public synchronized int write(ByteBuffer src, long position) throws IOException { + if (position != position()) { + throw new UnsupportedOperationException( + "This FileChannel only supports write at the current position"); + } + int res = writeChannel.write(src); + // The channel is no longer valid as the position has been updated, and there is no way of + // resetting it, but this way we at least support the write-at-position and transferFrom + // methods being called once. + this.valid = false; + return res; + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileLock lock(long position, long size, boolean shared) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void implCloseChannel() throws IOException { + writeChannel.close(); + } +} diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageReadFileChannelTest.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageReadFileChannelTest.java new file mode 100644 index 000000000000..c3474a3a1c06 --- /dev/null +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageReadFileChannelTest.java @@ -0,0 +1,188 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage.contrib.nio; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CloudStorageReadFileChannelTest { + private static final class SeekableByteChannelImpl implements SeekableByteChannel { + private boolean open = true; + private ByteBuffer data; + + private SeekableByteChannelImpl(ByteBuffer data) { + this.data = data; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + open = false; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + byte[] tmp = new byte[Math.min(dst.remaining(), data.remaining())]; + if (tmp.length == 0) { + return -1; + } else { + data.get(tmp); + dst.put(tmp); + return tmp.length; + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + int res = src.remaining(); + if (data.position() + res > data.limit()) { + data.limit(data.limit() + res); + } + data.put(src); + return res; + } + + @Override + public long position() throws IOException { + return data.position(); + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + if (newPosition >= data.limit()) { + data.limit((int) newPosition); + } + data.position((int) newPosition); + return this; + } + + @Override + public long size() throws IOException { + return data.limit(); + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + if (size < data.limit()) { + if (data.position() >= size) { + data.position((int) size - 1); + } + data.limit((int) size); + } + return this; + } + } + + @Rule public final ExpectedException thrown = ExpectedException.none(); + + private CloudStorageReadFileChannel fileChannel; + private SeekableByteChannel readChannel; + private ByteBuffer data; + + @Before + public void before() throws IOException { + data = ByteBuffer.allocate(5000); + data.limit(3); + data.put(new byte[] {1, 2, 3}); + data.position(0); + readChannel = new SeekableByteChannelImpl(data); + fileChannel = new CloudStorageReadFileChannel(readChannel); + } + + @Test + public void testRead() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.read(buffer), is(equalTo(1))); + assertThat(fileChannel.position(), is(equalTo(1L))); + assertThat(buffer.get(0), is(equalTo((byte) 1))); + } + + @Test + public void testReadArray() throws IOException { + ByteBuffer[] buffers = + new ByteBuffer[] {ByteBuffer.allocate(1), ByteBuffer.allocate(1), ByteBuffer.allocate(1)}; + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.read(buffers), is(equalTo(3L))); + assertThat(fileChannel.position(), is(equalTo(3L))); + assertThat(buffers[0].get(0), is(equalTo((byte) 1))); + assertThat(buffers[1].get(0), is(equalTo((byte) 2))); + assertThat(buffers[2].get(0), is(equalTo((byte) 3))); + } + + @Test + public void testPosition() throws IOException { + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.position(1L), is(equalTo((FileChannel) fileChannel))); + assertThat(fileChannel.position(), is(equalTo(1L))); + assertThat(fileChannel.position(0L), is(equalTo((FileChannel) fileChannel))); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.position(100L), is(equalTo((FileChannel) fileChannel))); + assertThat(fileChannel.position(), is(equalTo(100L))); + } + + @Test + public void testSize() throws IOException { + assertThat(fileChannel.size(), is(equalTo(3L))); + } + + @Test + public void testTransferTo() throws IOException { + SeekableByteChannelImpl target = new SeekableByteChannelImpl(ByteBuffer.allocate(100)); + assertThat(fileChannel.transferTo(0L, 3L, target), is(equalTo(3L))); + assertThat(target.position(), is(equalTo(3L))); + ByteBuffer dst = ByteBuffer.allocate(3); + target.position(0L); + target.read(dst); + assertThat(dst.get(0), is(equalTo((byte) 1))); + assertThat(dst.get(1), is(equalTo((byte) 2))); + assertThat(dst.get(2), is(equalTo((byte) 3))); + } + + @Test + public void testReadOnPosition() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.read(buffer, 1L), is(equalTo(1))); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(buffer.get(0), is(equalTo((byte) 2))); + } + + @Test + public void testReadBeyondEnd() throws IOException { + fileChannel.position(3L); + assertThat(fileChannel.read(ByteBuffer.allocate(1)), is(equalTo(-1))); + fileChannel.position(2L); + assertThat(fileChannel.read(ByteBuffer.allocate(2)), is(equalTo(1))); + } +} diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageWriteFileChannelTest.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageWriteFileChannelTest.java new file mode 100644 index 000000000000..136ec54e0e99 --- /dev/null +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageWriteFileChannelTest.java @@ -0,0 +1,203 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage.contrib.nio; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CloudStorageWriteFileChannelTest { + private static final class SeekableByteChannelImpl implements SeekableByteChannel { + private boolean open = true; + private ByteBuffer data; + + private SeekableByteChannelImpl(ByteBuffer data) { + this.data = data; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + open = false; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + byte[] tmp = new byte[Math.min(dst.remaining(), data.remaining())]; + if (tmp.length == 0) { + return -1; + } else { + data.get(tmp); + dst.put(tmp); + return tmp.length; + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + int res = src.remaining(); + if (data.position() + res > data.limit()) { + data.limit(data.limit() + res); + } + data.put(src); + return res; + } + + @Override + public long position() throws IOException { + return data.position(); + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + if (newPosition >= data.limit()) { + data.limit((int) newPosition); + } + data.position((int) newPosition); + return this; + } + + @Override + public long size() throws IOException { + return data.limit(); + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + if (size < data.limit()) { + if (data.position() >= size) { + data.position((int) size - 1); + } + data.limit((int) size); + } + return this; + } + } + + @Rule public final ExpectedException thrown = ExpectedException.none(); + + private CloudStorageWriteFileChannel fileChannel; + private SeekableByteChannel writeChannel; + private ByteBuffer data; + + @Before + public void before() throws IOException { + data = ByteBuffer.allocate(5000); + data.limit(3); + data.put(new byte[] {1, 2, 3}); + data.position(0); + writeChannel = new SeekableByteChannelImpl(data); + fileChannel = new CloudStorageWriteFileChannel(writeChannel); + } + + @Test + public void testWrite() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1); + buffer.put((byte) 100).position(0); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.write(buffer), is(equalTo(1))); + assertThat(fileChannel.position(), is(equalTo(1L))); + assertThat(data.get(0), is(equalTo((byte) 100))); + } + + @Test + public void testWriteArray() throws IOException { + ByteBuffer[] buffers = + new ByteBuffer[] {ByteBuffer.allocate(1), ByteBuffer.allocate(1), ByteBuffer.allocate(1)}; + buffers[0].put((byte) 10).position(0); + buffers[1].put((byte) 20).position(0); + buffers[2].put((byte) 30).position(0); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.write(buffers), is(equalTo(3L))); + assertThat(fileChannel.position(), is(equalTo(3L))); + + assertThat(data.get(0), is(equalTo((byte) 10))); + assertThat(data.get(1), is(equalTo((byte) 20))); + assertThat(data.get(2), is(equalTo((byte) 30))); + } + + @Test + public void testPosition() throws IOException { + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.position(1L), is(equalTo((FileChannel) fileChannel))); + assertThat(fileChannel.position(), is(equalTo(1L))); + assertThat(fileChannel.position(0L), is(equalTo((FileChannel) fileChannel))); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.position(100L), is(equalTo((FileChannel) fileChannel))); + assertThat(fileChannel.position(), is(equalTo(100L))); + } + + @Test + public void testSizeAndTruncate() throws IOException { + assertThat(fileChannel.size(), is(equalTo(3L))); + fileChannel.truncate(1L); + assertThat(fileChannel.size(), is(equalTo(1L))); + fileChannel.truncate(10L); + assertThat(fileChannel.size(), is(equalTo(1L))); + assertThat(fileChannel.position(), is(equalTo(0L))); + } + + @Test + public void testTransferFrom() throws IOException { + SeekableByteChannelImpl src = new SeekableByteChannelImpl(ByteBuffer.allocate(100)); + src.write(ByteBuffer.wrap(new byte[] {10, 20, 30})); + src.position(0L); + fileChannel.position(0L); + assertThat(fileChannel.transferFrom(src, 0L, 3L), is(equalTo(3L))); + assertThat(src.position(), is(equalTo(3L))); + + assertThat(data.get(0), is(equalTo((byte) 10))); + assertThat(data.get(1), is(equalTo((byte) 20))); + assertThat(data.get(2), is(equalTo((byte) 30))); + } + + @Test + public void testWriteOnPosition() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1); + buffer.put((byte) 100).position(0); + assertThat(fileChannel.position(), is(equalTo(0L))); + assertThat(fileChannel.write(buffer, 0), is(equalTo(1))); + assertThat(data.get(0), is(equalTo((byte) 100))); + } + + @Test + public void testWriteBeyondEnd() throws IOException { + fileChannel.position(3L); + ByteBuffer src = ByteBuffer.wrap(new byte[] {10, 20, 30}); + assertThat(fileChannel.write(src), is(equalTo(3))); + assertThat(fileChannel.position(), is(equalTo(6L))); + fileChannel.position(3L); + assertThat(data.get(3), is(equalTo((byte) 10))); + assertThat(data.get(4), is(equalTo((byte) 20))); + assertThat(data.get(5), is(equalTo((byte) 30))); + } +} diff --git a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java index e85febfa31c8..6f3293eb1cbd 100644 --- a/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java +++ b/google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java @@ -31,15 +31,18 @@ import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration; import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem; +import com.google.cloud.storage.contrib.nio.CloudStorageFileSystemProvider; import com.google.cloud.storage.contrib.nio.CloudStoragePath; import com.google.cloud.storage.testing.RemoteStorageHelper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.FileSystem; @@ -733,6 +736,240 @@ public void testFakeDirectories() throws IOException { } } + @Test + public void testFileChannelRead() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(SML_FILE); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + FileChannel chan = provider.newFileChannel(path, Sets.newHashSet(StandardOpenOption.READ)); + long size = Files.size(path); + assertThat(chan.size()).isEqualTo(size); + ByteBuffer buf = ByteBuffer.allocate(SML_SIZE); + int read = 0; + while (chan.isOpen()) { + int rc = chan.read(buf); + assertThat(chan.size()).isEqualTo(size); + if (rc < 0) { + // EOF + break; + } + assertThat(rc).isGreaterThan(0); + read += rc; + assertThat(chan.position()).isEqualTo(read); + } + chan.close(); + assertThat(read).isEqualTo(size); + byte[] expected = new byte[SML_SIZE]; + new Random(SML_SIZE).nextBytes(expected); + assertThat(Arrays.equals(buf.array(), expected)).isTrue(); + } + + @Test + public void testFileChannelCreate() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(PREFIX + randomSuffix()); + assertThat(Files.exists(path)).isFalse(); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + try { + FileChannel channel = + provider.newFileChannel(path, Sets.newHashSet(StandardOpenOption.CREATE)); + channel.close(); + assertThat(Files.exists(path)).isTrue(); + long size = Files.size(path); + assertThat(size).isEqualTo(0); + } finally { + Files.deleteIfExists(path); + } + } + + @Test + public void testFileChannelWrite() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(PREFIX + randomSuffix()); + assertThat(Files.exists(path)).isFalse(); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + try { + FileChannel channel = + provider.newFileChannel( + path, Sets.newHashSet(StandardOpenOption.CREATE, StandardOpenOption.WRITE)); + ByteBuffer src = ByteBuffer.allocate(5000); + int written = 0; + for (String s : FILE_CONTENTS) { + byte[] bytes = (s + "\n").getBytes(UTF_8); + written += bytes.length; + src.put(bytes); + } + src.limit(written); + src.position(0); + channel.write(src); + channel.close(); + assertThat(Files.exists(path)).isTrue(); + + ByteArrayOutputStream wantBytes = new ByteArrayOutputStream(); + PrintWriter writer = new PrintWriter(new OutputStreamWriter(wantBytes, UTF_8)); + for (String content : FILE_CONTENTS) { + writer.println(content); + } + writer.close(); + SeekableByteChannel chan = Files.newByteChannel(path, StandardOpenOption.READ); + byte[] gotBytes = new byte[(int) chan.size()]; + readFully(chan, gotBytes); + assertThat(gotBytes).isEqualTo(wantBytes.toByteArray()); + } finally { + Files.deleteIfExists(path); + } + } + + @Test + public void testFileChannelWriteOnClose() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(PREFIX + randomSuffix()); + assertThat(Files.exists(path)).isFalse(); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + try { + long expectedSize = 0; + try (FileChannel chan = + provider.newFileChannel(path, Sets.newHashSet(StandardOpenOption.WRITE))) { + for (String s : FILE_CONTENTS) { + byte[] sBytes = s.getBytes(UTF_8); + expectedSize += sBytes.length * 9999; + for (int i = 0; i < 9999; i++) { + chan.write(ByteBuffer.wrap(sBytes)); + } + } + try { + Files.size(path); + // we shouldn't make it to this line. Not using thrown.expect because + // I still want to run a few lines after the exception. + Assert.fail("Files.size should have thrown an exception"); + } catch (NoSuchFileException nsf) { + // that's what we wanted, we're good. + } + } + // channel now closed, the file should be there and with the new contents. + assertThat(Files.exists(path)).isTrue(); + assertThat(Files.size(path)).isEqualTo(expectedSize); + } finally { + Files.deleteIfExists(path); + } + } + + @Test + public void testFileChannelSeek() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(BIG_FILE); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + int size = BIG_SIZE; + byte[] contents = randomContents(size); + byte[] sample = new byte[100]; + byte[] wanted; + byte[] wanted2; + FileChannel chan = provider.newFileChannel(path, Sets.newHashSet(StandardOpenOption.READ)); + assertThat(chan.size()).isEqualTo(size); + + // check seek + int dest = size / 2; + chan.position(dest); + readFully(chan, sample); + wanted = Arrays.copyOfRange(contents, dest, dest + 100); + assertThat(wanted).isEqualTo(sample); + // now go back and check the beginning + // (we do 2 locations because 0 is sometimes a special case). + chan.position(0); + readFully(chan, sample); + wanted2 = Arrays.copyOf(contents, 100); + assertThat(wanted2).isEqualTo(sample); + // if the two spots in the file have the same contents, then this isn't a good file for this + // test. + assertThat(wanted).isNotEqualTo(wanted2); + } + + @Test + public void testFileChannelTransferFrom() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(SML_FILE); + Path destPath = testBucket.getPath(PREFIX + randomSuffix()); + assertThat(Files.exists(destPath)).isFalse(); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + try { + try (FileChannel source = + provider.newFileChannel(path, Sets.newHashSet(StandardOpenOption.READ)); + FileChannel dest = + provider.newFileChannel( + destPath, Sets.newHashSet(StandardOpenOption.CREATE, StandardOpenOption.WRITE))) { + dest.transferFrom(source, 0L, SML_SIZE); + } + + FileChannel source = + provider.newFileChannel(destPath, Sets.newHashSet(StandardOpenOption.READ)); + long size = Files.size(destPath); + assertThat(source.size()).isEqualTo(size); + ByteBuffer buf = ByteBuffer.allocate(SML_SIZE); + int read = 0; + while (source.isOpen()) { + int rc = source.read(buf); + assertThat(source.size()).isEqualTo(size); + if (rc < 0) { + // EOF + break; + } + assertThat(rc).isGreaterThan(0); + read += rc; + assertThat(source.position()).isEqualTo(read); + } + source.close(); + assertThat(read).isEqualTo(size); + byte[] expected = new byte[SML_SIZE]; + new Random(SML_SIZE).nextBytes(expected); + assertThat(Arrays.equals(buf.array(), expected)).isTrue(); + } finally { + Files.deleteIfExists(destPath); + } + } + + @Test + public void testFileChannelTransferTo() throws IOException { + CloudStorageFileSystem testBucket = getTestBucket(); + Path path = testBucket.getPath(SML_FILE); + Path destPath = testBucket.getPath(PREFIX + randomSuffix()); + assertThat(Files.exists(destPath)).isFalse(); + CloudStorageFileSystemProvider provider = new CloudStorageFileSystemProvider(); + try { + try (FileChannel source = + provider.newFileChannel(path, Sets.newHashSet(StandardOpenOption.READ)); + FileChannel dest = + provider.newFileChannel( + destPath, Sets.newHashSet(StandardOpenOption.CREATE, StandardOpenOption.WRITE))) { + source.transferTo(0L, SML_SIZE, dest); + } + + FileChannel source = + provider.newFileChannel(destPath, Sets.newHashSet(StandardOpenOption.READ)); + long size = Files.size(destPath); + assertThat(source.size()).isEqualTo(size); + ByteBuffer buf = ByteBuffer.allocate(SML_SIZE); + int read = 0; + while (source.isOpen()) { + int rc = source.read(buf); + assertThat(source.size()).isEqualTo(size); + if (rc < 0) { + // EOF + break; + } + assertThat(rc).isGreaterThan(0); + read += rc; + assertThat(source.position()).isEqualTo(read); + } + source.close(); + assertThat(read).isEqualTo(size); + byte[] expected = new byte[SML_SIZE]; + new Random(SML_SIZE).nextBytes(expected); + assertThat(Arrays.equals(buf.array(), expected)).isTrue(); + } finally { + Files.deleteIfExists(destPath); + } + } + /** * Delete the given directory and all of its contents if non-empty. *