Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read response body timeout #169

Merged
merged 2 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@
*/
public abstract class AbstractRssReader<C extends Channel, I extends Item> {
private static final String LOG_GROUP = "com.apptasticsoftware.rssreader";
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1);
private final HttpClient httpClient;
private DateTimeParser dateTimeParser = new DateTime();
private String userAgent = "";
private Duration connectionTimeout = Duration.ofSeconds(25);
private Duration requestTimeout = Duration.ofSeconds(25);
private Duration readTimeout = Duration.ofSeconds(25);
private final Map<String, String> headers = new HashMap<>();
private final HashMap<String, BiConsumer<C, String>> channelTags = new HashMap<>();
private final HashMap<String, Map<String, BiConsumer<C, String>>> channelAttributes = new HashMap<>();
Expand Down Expand Up @@ -240,7 +244,7 @@ protected void registerItemAttributes() {
}

/**
* Date and Time parser for parsing timestamps.
* Date and time parser for parsing timestamps.
* @param dateTimeParser the date time parser to use.
* @return updated RSSReader.
*/
Expand All @@ -252,8 +256,8 @@ public AbstractRssReader<C, I> setDateTimeParser(DateTimeParser dateTimeParser)
}

/**
* Sets the user-agent of the HttpClient.
* This is completely optional and if not set then it will not send a user-agent header.
* Sets the user-agent of the http client.
* Optional parameter if not set the default value for {@code java.net.http.HttpClient} will be used.
* @param userAgent the user-agent to use.
* @return updated RSSReader.
*/
Expand All @@ -265,8 +269,7 @@ public AbstractRssReader<C, I> setUserAgent(String userAgent) {
}

/**
* Adds a http header to the HttpClient.
* This is completely optional and if no headers are set then it will not add anything.
* Adds a http header to the http client.
* @param key the key name of the header.
* @param value the value of the header.
* @return updated RSSReader.
Expand All @@ -279,6 +282,58 @@ public AbstractRssReader<C, I> addHeader(String key, String value) {
return this;
}

/**
* Sets the connection timeout for the http client.
* The connection timeout is the time it takes to establish a connection to the server.
* If set to zero the default value for {@link java.net.http.HttpClient.Builder#connectTimeout(Duration)} will be used.
* Default: 25 seconds.
*
* @param connectionTimeout the timeout duration.
* @return updated RSSReader.
*/
public AbstractRssReader<C, I> setConnectionTimeout(Duration connectionTimeout) {
validate(connectionTimeout, "Connection timeout");
this.connectionTimeout = connectionTimeout;
return this;
}

/**
* Sets the request timeout for the http client.
* The request timeout is the time between the request is sent and the first byte of the response is received.
* If set to zero the default value for {@link java.net.http.HttpRequest.Builder#timeout(Duration)} will be used.
* Default: 25 seconds.
*
* @param requestTimeout the timeout duration.
* @return updated RSSReader.
*/
public AbstractRssReader<C, I> setRequestTimeout(Duration requestTimeout) {
validate(requestTimeout, "Request timeout");
this.requestTimeout = requestTimeout;
return this;
}

/**
* Sets the read timeout.
* The read timeout it the time for reading all data in the response body.
* The effect of setting the timeout to zero is the same as setting an infinite Duration, ie. block forever.
* Default: 25 seconds.
*
* @param readTimeout the timeout duration.
* @return updated RSSReader.
*/
public AbstractRssReader<C, I> setReadTimeout(Duration readTimeout) {
validate(readTimeout, "Read timeout");
this.readTimeout = readTimeout;
return this;
}

private void validate(Duration duration, String name) {
Objects.requireNonNull(duration, name + " must not be null");
if (duration.isNegative()) {
throw new IllegalArgumentException(name + " must not be negative");
}
}

/**
* Add item extension for tags
* @param tag - tag name
Expand Down Expand Up @@ -450,8 +505,10 @@ public CompletableFuture<Stream<I>> readAsync(String url) {
*/
protected CompletableFuture<HttpResponse<InputStream>> sendAsyncRequest(String url) {
var builder = HttpRequest.newBuilder(URI.create(url))
.timeout(Duration.ofSeconds(25))
.header("Accept-Encoding", "gzip");
if (requestTimeout.toMillis() > 0) {
builder.timeout(requestTimeout);
}

if (!userAgent.isBlank())
builder.header("User-Agent", userAgent);
Expand Down Expand Up @@ -510,6 +567,7 @@ class RssItemIterator implements Iterator<I> {
private I nextItem;
private boolean isChannelPart = false;
private boolean isItemPart = false;
private ScheduledFuture<?> parseWatchdog;

public RssItemIterator(InputStream is) {
this.is = is;
Expand All @@ -528,6 +586,9 @@ public RssItemIterator(InputStream is) {
xmlInFact.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);

reader = xmlInFact.createXMLStreamReader(is);
if (!readTimeout.isZero()) {
parseWatchdog = EXECUTOR.schedule(this::close, readTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
}
catch (XMLStreamException e) {
var logger = Logger.getLogger(LOG_GROUP);
Expand All @@ -539,6 +600,9 @@ public RssItemIterator(InputStream is) {

public void close() {
try {
if (parseWatchdog != null) {
parseWatchdog.cancel(false);
}
reader.close();
is.close();
} catch (XMLStreamException | IOException e) {
Expand Down Expand Up @@ -783,16 +847,20 @@ private HttpClient createHttpClient() {
var context = SSLContext.getInstance("TLSv1.3");
context.init(null, null, null);

client = HttpClient.newBuilder()
var builder = HttpClient.newBuilder()
.sslContext(context)
.connectTimeout(Duration.ofSeconds(25))
.followRedirects(HttpClient.Redirect.ALWAYS)
.build();
.followRedirects(HttpClient.Redirect.ALWAYS);
if (connectionTimeout.toMillis() > 0) {
builder.connectTimeout(connectionTimeout);
}
client = builder.build();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(25))
.followRedirects(HttpClient.Redirect.ALWAYS)
.build();
var builder = HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.ALWAYS);
if (connectionTimeout.toMillis() > 0) {
builder.connectTimeout(connectionTimeout);
}
client = builder.build();
}

return client;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package com.apptasticsoftware.integrationtest;

import com.apptasticsoftware.rssreader.Item;
import com.apptasticsoftware.rssreader.RssReader;
import com.apptasticsoftware.rssreader.util.RssServer;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.*;

class ConnectionTest {
private static final int PORT = 8008;
private static final Duration NEGATIVE_DURATION = Duration.ofSeconds(-30);

@Test
void testConnectionTimeoutWithNullValue() {
var rssReader = new RssReader();
var exception = assertThrows(NullPointerException.class, () -> rssReader.setConnectionTimeout(null));
assertEquals("Connection timeout must not be null", exception.getMessage());
}

@Test
void testRequestTimeoutWithNullValue() {
var rssReader = new RssReader();
var exception = assertThrows(NullPointerException.class, () -> rssReader.setRequestTimeout(null));
assertEquals("Request timeout must not be null", exception.getMessage());
}

@Test
void testReadTimeoutWithNullValue() {
var rssReader = new RssReader();
var exception = assertThrows(NullPointerException.class, () -> rssReader.setReadTimeout(null));
assertEquals("Read timeout must not be null", exception.getMessage());
}

@Test
void testConnectionTimeoutWithNegativeValue() {
var rssReader = new RssReader();
var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setConnectionTimeout(NEGATIVE_DURATION));
assertEquals("Connection timeout must not be negative", exception.getMessage());
}

@Test
void testRequestTimeoutWithNegativeValue() {
var rssReader = new RssReader();
var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setRequestTimeout(NEGATIVE_DURATION));
assertEquals("Request timeout must not be negative", exception.getMessage());
}

@Test
void testReadTimeoutWithNegativeValue() {
var rssReader = new RssReader();
var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setReadTimeout(NEGATIVE_DURATION));
assertEquals("Read timeout must not be negative", exception.getMessage());
}

@Test
void testReadFromLocalRssServerNoTimeout() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setConnectionTimeout(Duration.ZERO)
.setRequestTimeout(Duration.ZERO)
.setReadTimeout(Duration.ZERO)
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}

@Test
void testReadFromLocalRssServer10SecondTimeout() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setConnectionTimeout(Duration.ofSeconds(10))
.setRequestTimeout(Duration.ofSeconds(10))
.setReadTimeout(Duration.ofSeconds(10))
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}


@Test
void testReadFromLocalRssServer() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setReadTimeout(Duration.ofSeconds(2))
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}

@Test
void testNoReadTimeout() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setReadTimeout(Duration.ZERO)
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}

@Test
void testReadTimeout() throws IOException {
var server = RssServer.withWritePause(getFile("atom-feed.xml"), Duration.ofSeconds(4))
.port(PORT)
.endpointPath("/slow-server")
.build();
server.start();

var items = new RssReader()
.setReadTimeout(Duration.ofSeconds(2))
.read("http://localhost:8008/slow-server")
.collect(Collectors.toList());

server.stop();
verify(2, items);
}

private static void verify(int expectedSize, List<Item> items) {
assertEquals(expectedSize, items.size());

if (!items.isEmpty()) {
assertEquals("dive into mark", items.get(0).getChannel().getTitle());
assertEquals(65, items.get(0).getChannel().getDescription().length());
assertEquals("http://example.org/feed.atom", items.get(0).getChannel().getLink());
assertEquals("Copyright (c) 2003, Mark Pilgrim", items.get(0).getChannel().getCopyright().orElse(null));
assertEquals("Example Toolkit", items.get(0).getChannel().getGenerator().orElse(null));
assertEquals("2005-07-31T12:29:29Z", items.get(0).getChannel().getLastBuildDate().orElse(null));

assertEquals("Atom draft-07 snapshot", items.get(0).getTitle().orElse(null));
assertNull(items.get(1).getAuthor().orElse(null));
assertEquals("http://example.org/audio/ph34r_my_podcast.mp3", items.get(0).getLink().orElse(null));
assertEquals("tag:example.org,2003:3.2397", items.get(0).getGuid().orElse(null));
assertEquals("2003-12-13T08:29:29-04:00", items.get(0).getPubDate().orElse(null));
assertEquals("2005-07-31T12:29:29Z", items.get(0).getUpdated().orElse(null));
assertEquals(211, items.get(1).getDescription().orElse("").length());
}
if (items.size() >= 2) {
assertEquals("Atom-Powered Robots Run Amok", items.get(1).getTitle().orElse(null));
assertNull(items.get(1).getAuthor().orElse(null));
assertEquals("http://example.org/2003/12/13/atom03", items.get(1).getLink().orElse(null));
assertEquals("urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a", items.get(1).getGuid().orElse(null));
assertEquals("2003-12-13T18:30:02Z", items.get(1).getPubDate().orElse(null));
assertEquals("2003-12-13T18:30:02Z", items.get(1).getUpdated().orElse(null));
assertEquals(211, items.get(1).getDescription().orElse("").length());
}
if (items.size() >= 3) {
assertEquals("Atom-Powered Robots Run Amok 2", items.get(2).getTitle().orElse(null));
assertNull(items.get(2).getAuthor().orElse(null));
assertEquals("http://example.org/2003/12/13/atom04", items.get(2).getLink().orElse(null));
assertEquals("urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6b", items.get(2).getGuid().orElse(null));
assertEquals("2003-12-13T09:28:28-04:00", items.get(2).getPubDate().orElse(null));
assertEquals(1071322108, items.get(2).getPubDateZonedDateTime().map(ZonedDateTime::toEpochSecond).orElse(null));
assertEquals("2003-12-13T18:30:01Z", items.get(2).getUpdated().orElse(null));
assertEquals(1071340201, items.get(2).getUpdatedZonedDateTime().map(ZonedDateTime::toEpochSecond).orElse(null));
assertEquals(47, items.get(2).getDescription().orElse("").length());
}
}

private File getFile(String filename) {
var url = getClass().getClassLoader().getResource(filename);
return new File(url.getFile());
}
}
Loading