From f8723a6c499d74b3cae2ff788ddfd2d33480bfe6 Mon Sep 17 00:00:00 2001 From: zymap Date: Thu, 16 May 2024 10:48:05 +0800 Subject: [PATCH 1/2] [fix][offload] Break the fillbuffer loop when met EOF --- ### Motivation When hit an EOF, the write bytes will return -1 then it would make the bytesToCoppy bigger than the buffer to fill. Then we will hit the java.lang.IndexOutOfBoundsException. Break the look when hit the EOF, fill the buffer next time. --- .../impl/BlobStoreBackedInputStreamImpl.java | 18 +++++-- .../impl/BlobStoreBackedInputStreamTest.java | 51 +++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java index 6cb60e14984f9..6ebbe5bce582a 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java @@ -107,9 +107,7 @@ private boolean refillBufferIfNeeded() throws IOException { bufferOffsetEnd = endRange; long bytesRead = endRange - startRange + 1; int bytesToCopy = (int) bytesRead; - while (bytesToCopy > 0) { - bytesToCopy -= buffer.writeBytes(stream, bytesToCopy); - } + fillBuffer(stream, bytesToCopy); cursor += buffer.readableBytes(); } @@ -135,6 +133,20 @@ private boolean refillBufferIfNeeded() throws IOException { return true; } + void fillBuffer(InputStream is, int bytesToCopy) throws IOException { + while (bytesToCopy > 0) { + int writeBytes = buffer.writeBytes(is, bytesToCopy); + if (writeBytes < 0) { + break; + } + bytesToCopy -= writeBytes; + } + } + + ByteBuf getBuffer() { + return buffer; + } + @Override public int read() throws IOException { if (refillBufferIfNeeded()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java new file mode 100644 index 0000000000000..0e0d7584a34a6 --- /dev/null +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.jcloud.impl; + +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase; +import org.testng.annotations.Test; + +public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { + + @Test + public void testFillBuffer() throws Exception { + BlobStoreBackedInputStreamImpl bis = new BlobStoreBackedInputStreamImpl( + blobStore, BUCKET, "testFillBuffer", (k, md) -> { + }, 2048, 512); + + InputStream is = new InputStream() { + int count = 10; + + @Override + public int read() throws IOException { + if (count-- > 0) { + return 1; + } else { + return -1; + } + } + }; + bis.fillBuffer(is, 20); + assertEquals(bis.getBuffer().readableBytes(), 10); + } +} From 8f63259cfd9607f600640617d2fb1d54d6766bd2 Mon Sep 17 00:00:00 2001 From: zymap Date: Thu, 16 May 2024 11:25:01 +0800 Subject: [PATCH 2/2] Fix the license header --- .../jcloud/impl/BlobStoreBackedInputStreamTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java index 0e0d7584a34a6..951180e4e18c8 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY