diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java index 7b2953efe49a..68fb8eb8a4b7 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java @@ -168,6 +168,7 @@ private static class LocalExchangeStorageReader private static final int INSTANCE_SIZE = ClassLayout.parseClass(LocalExchangeStorageReader.class).instanceSize(); private final Queue sourceFiles; + @GuardedBy("this") private InputStreamSliceInput sliceInput; @GuardedBy("this") @@ -185,18 +186,19 @@ public synchronized Slice read() if (closed) { return null; } + if (sliceInput != null && sliceInput.isReadable()) { return sliceInput.readSlice(sliceInput.readInt()); } + ExchangeSourceFile sourceFile = sourceFiles.poll(); - if (sourceFile != null) { - sliceInput = getSliceInput(sourceFile); - return sliceInput.readSlice(sliceInput.readInt()); - } - else { + if (sourceFile == null) { close(); + return null; } - return null; + + sliceInput = getSliceInput(sourceFile); + return sliceInput.readSlice(sliceInput.readInt()); } @Override