Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve AutoContinuingInputStream failure case #429

Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This project aims to adhere to [Semantic Versioning](http://semver.org/).
## [3.2.3-SNAPSHOT] - Coming soon!
### Fixed
- [UnsupportedOperationException when getting a 0 byte file using `MantaClient.getAsInputStream`](https://github.com/joyent/java-manta/issues/408)
- [AutoContinuingInputStream fails to handle fatal exceptions correctly and triggers a self-suppression error.](https://github.com/joyent/java-manta/pull/429)

### Added
- Client metrics can now be enabled by selecting a reporting mode with the
Expand All @@ -16,6 +17,9 @@ This project aims to adhere to [Semantic Versioning](http://semver.org/).
- [Timers](http://metrics.dropwizard.io/4.0.0/manual/core.html#timer) per HTTP request method.
- [Meters](http://metrics.dropwizard.io/4.0.0/manual/core.html#meter) per exception that occurs during requests.
- [`MantaClient#delete(String, MantaHttpHeaders)`](https://github.com/joyent/java-manta/issues/427)
- [Download auto-resume](https://github.com/joyent/java-manta/issues/411) has been added in the form of
[`manta.download_continuations`/`MANTA_DOWNLOAD_CONTINUATIONS` configuration
setting](https://github.com/joyent/java-manta/blob/master/USAGE.md#download-continuation).

### Changed
- MBeans registered in JMX no longer use an incrementing integer and instead are created under
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,26 @@ public MantaClient(final ConfigContext config,
this(config, connectionFactoryConfigurator, null, null);
}

/**
* Creates a new instance of the Manta client based on user-provided connection objects. This allows for a higher
* degree of customization at the cost of more involvement from the consumer.
*
* Users opting into advanced configuration (i.e. not passing {@code null} as the second parameter)
* should be comfortable with the internals of {@link CloseableHttpClient} and accept that we can only make a
* best effort to support all possible use-cases. For example, users may pass in a builder which is wired to a
* {@link org.apache.http.impl.conn.BasicHttpClientConnectionManager} and effectively make the client
* single-threaded by eliminating the connection pool. Bug or feature? You decide!
*
* @param config The configuration context that provides all of the configuration values
* @param connectionFactoryConfigurator pre-configured objects for use with a MantaConnectionFactory (or null)
* @param metricConfiguration the metrics registry and configuration, or null to prepare one from the general config
*/
public MantaClient(final ConfigContext config,
final MantaConnectionFactoryConfigurator connectionFactoryConfigurator,
final MantaClientMetricConfiguration metricConfiguration) {
this(config, connectionFactoryConfigurator, null, metricConfiguration);
}

/**
* Creates a new instance of the Manta client based on user-provided connection objects. This allows for a higher
* degree of customization at the cost of more involvement from the consumer.
Expand All @@ -222,6 +242,7 @@ public MantaClient(final ConfigContext config,
* @param config The configuration context that provides all of the configuration values
* @param connectionFactoryConfigurator pre-configured objects for use with a MantaConnectionFactory (or null)
* @param httpHelper helper object for executing http requests (or null to build one ourselves)
* @param metricConfiguration the metrics registry and configuration, or null to prepare one from the general config
*/
MantaClient(final ConfigContext config,
final MantaConnectionFactoryConfigurator connectionFactoryConfigurator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public class ApacheHttpGetResponseEntityContentContinuator implements InputStrea
*/
@Override
public InputStream buildContinuation(final IOException ex, final long bytesRead) throws IOException {
requireNonNull(ex);

if (!isRecoverable(ex)) {
throw ex;
}
Expand All @@ -232,10 +234,15 @@ public InputStream buildContinuation(final IOException ex, final long bytesRead)
ex);
}

LOG.debug("Attempting to build a continuation for request {} to recover at byte offset {} from exception {}",
this.request.getRequestLine(),
LOG.debug("Attempting to build a continuation for "
+ "[{}] request "
+ "to path [{}] "
+ "to recover at byte offset {} "
+ "from exception {}",
this.request.getMethod(),
this.request.getRequestLine().getUri(),
bytesRead,
ex);
ex.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility of ex being null?


// if an IOException occurs while reading EOF the user may ask us for a continuation
// starting after the last valid byte.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
package com.joyent.manta.util;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

import static java.util.Objects.requireNonNull;

Expand All @@ -26,6 +29,8 @@
*/
public class AutoContinuingInputStream extends ContinuingInputStream {

private static final Logger LOG = LoggerFactory.getLogger(AutoContinuingInputStream.class);

/**
* Produces continuations of the original stream given new byte offsets.
*/
Expand Down Expand Up @@ -54,9 +59,16 @@ public AutoContinuingInputStream(final InputStream wrapped,
*/
private void attemptRecovery(final IOException originalIOException) throws IOException {
try {
this.continueWith(this.continuator.buildContinuation(originalIOException, this.getBytesRead()));
} catch (final IOException ce) {
originalIOException.addSuppressed(ce);
super.continueWith(this.continuator.buildContinuation(originalIOException, this.getBytesRead()));
} catch (final UncheckedIOException | IOException ioe) {
LOG.debug("Failed to automatically recover: {}", ioe.getMessage());

// if a different exception was thrown while recovering, add it as a suppressed exception
if (originalIOException != ioe) {
originalIOException.addSuppressed(ioe);
}

// rethrow the original exception
throw originalIOException;
}
}
Expand Down Expand Up @@ -135,6 +147,12 @@ public boolean markSupported() {
public void close() throws IOException {
IOUtils.closeQuietly(this.continuator);

this.getWrapped().close();
final InputStream wrapped = this.getWrapped();

if (wrapped == null) {
return;
}

wrapped.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,13 @@ public void continueWith(final InputStream next) {
throw new IllegalStateException("Stream is closed, refusing to set new source");
}

// QUESTION: is the following useful? it signals that the caller has probably done something silly
// if (this.wrapped == next) {
// throw new IllegalArgumentException("Current and next stream are the same");
// }
if (this.wrapped == next) {
throw new IllegalArgumentException("Current and next stream are the same");
}

if (this.wrapped != null) {
this.closeAndDiscardWrapped();
}

this.wrapped = next;
}
Expand All @@ -118,20 +121,15 @@ public int read() throws IOException {
return EOF;
}

try {
final int b = this.wrapped.read();
final int b = this.wrapped.read();

if (b != EOF) {
this.bytesRead += b;
} else {
this.eofSeen = true;
}

return b;
} catch (final IOException ioe) {
discardWrapped();
throw ioe;
if (b != EOF) {
this.bytesRead += b;
} else {
this.eofSeen = true;
}

return b;
}

@Override
Expand All @@ -142,20 +140,15 @@ public int read(final byte[] b) throws IOException {
return EOF;
}

try {
final int n = this.wrapped.read(b);

if (n != EOF) {
this.bytesRead += n;
} else {
this.eofSeen = true;
}
final int n = this.wrapped.read(b);

return n;
} catch (final IOException ioe) {
discardWrapped();
throw ioe;
if (n != EOF) {
this.bytesRead += n;
} else {
this.eofSeen = true;
}

return n;
}

@Override
Expand All @@ -166,50 +159,35 @@ public int read(final byte[] b, final int off, final int len) throws IOException
return EOF;
}

try {
final int n = this.wrapped.read(b, off, len);

if (n != EOF) {
this.bytesRead += n;
} else {
this.eofSeen = true;
}
final int n = this.wrapped.read(b, off, len);

return n;
} catch (final IOException ioe) {
discardWrapped();
throw ioe;
if (n != EOF) {
this.bytesRead += n;
} else {
this.eofSeen = true;
}

return n;
}

@Override
public long skip(final long n) throws IOException {
ensureReady();

try {
final long s = this.wrapped.skip(n);
final long s = this.wrapped.skip(n);

if (0 < s) {
this.bytesRead += s;
}

return s;
} catch (final IOException ioe) {
discardWrapped();
throw ioe;
if (0 < s) {
this.bytesRead += s;
}

return s;
}

@Override
public int available() throws IOException {
ensureReady();

try {
return this.wrapped.available();
} catch (final IOException ioe) {
discardWrapped();
throw ioe;
}
return this.wrapped.available();
}

@Override
Expand Down Expand Up @@ -252,7 +230,7 @@ private void ensureReady() {
}

@SuppressWarnings("checkstyle:JavadocMethod")
private void discardWrapped() {
private void closeAndDiscardWrapped() {
IOUtils.closeQuietly(this.wrapped);
this.wrapped = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.joyent.manta.util;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.ClosedInputStream;
import org.apache.commons.io.input.ProxyInputStream;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.InputStream;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.expectThrows;

@Test
public class AutoContinuingInputStreamTest {

/**
* We can't use {@link org.apache.commons.io.input.BrokenInputStream} for this test because it will return the same
* exception during close and trigger the
* <a href="https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html#suppressed-exceptions">self-suppression
* issue</a>.
* <p>
* On the other hand, we can't use {@link FailingInputStream} because that generates its own exceptions, so we
* wouldn't be able to use {@link org.testng.Assert#assertSame} and check that no suppressed exceptions were added.
*/
private static final class ReadExceptionInputStream extends ProxyInputStream {

private final IOException exception;

public ReadExceptionInputStream(final IOException exception) {
super(ClosedInputStream.CLOSED_INPUT_STREAM);
this.exception = exception;
}

@Override
protected void beforeRead(final int n) throws IOException {
throw this.exception;
}
}

public void rethrowsUnrecoverableExceptionsDirectly() throws Exception {
// the exception to consider fatal
final IOException ex = new IOException("oops");

// source stream always throws that exception
final InputStream original = new ReadExceptionInputStream(ex);

// pretend that it was a fatal exception and should be rethrown
final InputStreamContinuator continuator = mock(InputStreamContinuator.class);
when(continuator.buildContinuation(same(ex), anyLong())).thenThrow(ex);

final IOException caught = expectThrows(IOException.class, () -> {
try (final AutoContinuingInputStream in = new AutoContinuingInputStream(original, continuator)) {
IOUtils.toByteArray(in);
}
});

assertSame(caught, ex);
assertEquals(caught.getSuppressed().length, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,6 @@ public void testAvailableCanAlsoThrow() throws IOException {
final ContinuingInputStream cis = new ContinuingInputStream(new BrokenInputStream());

assertThrows(IOException.class, () -> cis.available());

assertThrows(IllegalStateException.class, () -> cis.read());
}

public void testSkipCompletelySingleOperation() throws IOException {
Expand Down Expand Up @@ -433,8 +431,6 @@ public void testFailureFromSkipThenRead() throws IOException {
assertThrows(IOException.class, () -> cis.skip(1));
assertEquals(cis.getBytesRead(), 0);

assertThrows(IllegalStateException.class, () -> cis.skip(1));

cis.continueWith(new ByteArrayInputStream(STUB_OBJECT_BYTES));

assertArrayEquals(STUB_OBJECT_BYTES, IOUtils.toByteArray(cis));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.joyent.manta.client;
package com.joyent.manta.http;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.joyent.manta.client.MantaClient;
import com.joyent.manta.client.crypto.AesCtrCipherDetails;
import com.joyent.manta.client.crypto.SecretKeyUtils;
import com.joyent.manta.client.crypto.SupportedCipherDetails;
Expand All @@ -13,8 +14,6 @@
import com.joyent.manta.config.IntegrationTestConfigContext;
import com.joyent.manta.config.MantaClientMetricConfiguration;
import com.joyent.manta.config.MetricReporterMode;
import com.joyent.manta.http.HttpRange;
import com.joyent.manta.http.MantaHttpHeaders;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -392,7 +391,6 @@ private MantaClient prepareClient(final SupportedCipherDetails cipherDetails,
}

final MantaClient mantaClient = new MantaClient(config,
null,
null,
metricConfig);

Expand Down