diff --git a/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java b/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java index a601bf2..6978f98 100644 --- a/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java +++ b/src/main/java/com/apptasticsoftware/rssreader/AbstractRssReader.java @@ -64,9 +64,13 @@ */ public abstract class AbstractRssReader { private static final String LOG_GROUP = "com.apptasticsoftware.rssreader"; + private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1); private final HttpClient httpClient; private DateTimeParser dateTimeParser = new DateTime(); private String userAgent = ""; + private Duration connectionTimeout = Duration.ofSeconds(25); + private Duration requestTimeout = Duration.ofSeconds(25); + private Duration readTimeout = Duration.ofSeconds(25); private final Map headers = new HashMap<>(); private final HashMap> channelTags = new HashMap<>(); private final HashMap>> channelAttributes = new HashMap<>(); @@ -240,7 +244,7 @@ protected void registerItemAttributes() { } /** - * Date and Time parser for parsing timestamps. + * Date and time parser for parsing timestamps. * @param dateTimeParser the date time parser to use. * @return updated RSSReader. */ @@ -252,8 +256,8 @@ public AbstractRssReader setDateTimeParser(DateTimeParser dateTimeParser) } /** - * Sets the user-agent of the HttpClient. - * This is completely optional and if not set then it will not send a user-agent header. + * Sets the user-agent of the http client. + * Optional parameter; if not set, the default value for {@code java.net.http.HttpClient} will be used. * @param userAgent the user-agent to use. * @return updated RSSReader. */ @@ -265,8 +269,7 @@ public AbstractRssReader setUserAgent(String userAgent) { } /** - * Adds a http header to the HttpClient. - * This is completely optional and if no headers are set then it will not add anything. + * Adds a http header to the http client. 
* @param key the key name of the header. * @param value the value of the header. * @return updated RSSReader. @@ -279,6 +282,58 @@ public AbstractRssReader addHeader(String key, String value) { return this; } + /** + * Sets the connection timeout for the http client. + * The connection timeout is the time it takes to establish a connection to the server. + * If set to zero the default value for {@link java.net.http.HttpClient.Builder#connectTimeout(Duration)} will be used. + * Default: 25 seconds. NOTE(review): httpClient is a final field; confirm the client is built after this setter runs, otherwise the value is never applied. + * + * @param connectionTimeout the timeout duration. + * @return updated RSSReader. + */ + public AbstractRssReader setConnectionTimeout(Duration connectionTimeout) { + validate(connectionTimeout, "Connection timeout"); + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Sets the request timeout for the http client. + * The request timeout is the time between sending the request and receiving the first byte of the response. + * If set to zero the default value for {@link java.net.http.HttpRequest.Builder#timeout(Duration)} will be used. + * Default: 25 seconds. + * + * @param requestTimeout the timeout duration. + * @return updated RSSReader. + */ + public AbstractRssReader setRequestTimeout(Duration requestTimeout) { + validate(requestTimeout, "Request timeout"); + this.requestTimeout = requestTimeout; + return this; + } + + /** + * Sets the read timeout. + * The read timeout is the time for reading all data in the response body. Once it elapses, a scheduled task closes the item iterator. + * The effect of setting the timeout to zero is the same as setting an infinite Duration, i.e. block forever. + * Default: 25 seconds. + * + * @param readTimeout the timeout duration. + * @return updated RSSReader. 
+ */ + public AbstractRssReader setReadTimeout(Duration readTimeout) { + validate(readTimeout, "Read timeout"); + this.readTimeout = readTimeout; + return this; + } + + private void validate(Duration duration, String name) { + Objects.requireNonNull(duration, name + " must not be null"); + if (duration.isNegative()) { + throw new IllegalArgumentException(name + " must not be negative"); + } + } + /** * Add item extension for tags * @param tag - tag name @@ -450,8 +505,10 @@ public CompletableFuture> readAsync(String url) { */ protected CompletableFuture> sendAsyncRequest(String url) { var builder = HttpRequest.newBuilder(URI.create(url)) - .timeout(Duration.ofSeconds(25)) .header("Accept-Encoding", "gzip"); + if (requestTimeout.toMillis() > 0) { + builder.timeout(requestTimeout); + } if (!userAgent.isBlank()) builder.header("User-Agent", userAgent); @@ -510,6 +567,7 @@ class RssItemIterator implements Iterator { private I nextItem; private boolean isChannelPart = false; private boolean isItemPart = false; + private ScheduledFuture parseWatchdog; public RssItemIterator(InputStream is) { this.is = is; @@ -528,6 +586,9 @@ public RssItemIterator(InputStream is) { xmlInFact.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE); reader = xmlInFact.createXMLStreamReader(is); + if (!readTimeout.isZero()) { + parseWatchdog = EXECUTOR.schedule(this::close, readTimeout.toMillis(), TimeUnit.MILLISECONDS); + } } catch (XMLStreamException e) { var logger = Logger.getLogger(LOG_GROUP); @@ -539,6 +600,9 @@ public RssItemIterator(InputStream is) { public void close() { try { + if (parseWatchdog != null) { + parseWatchdog.cancel(false); + } reader.close(); is.close(); } catch (XMLStreamException | IOException e) { @@ -783,16 +847,20 @@ private HttpClient createHttpClient() { var context = SSLContext.getInstance("TLSv1.3"); context.init(null, null, null); - client = HttpClient.newBuilder() + var builder = HttpClient.newBuilder() .sslContext(context) - 
.connectTimeout(Duration.ofSeconds(25)) - .followRedirects(HttpClient.Redirect.ALWAYS) - .build(); + .followRedirects(HttpClient.Redirect.ALWAYS); + if (connectionTimeout.toMillis() > 0) { + builder.connectTimeout(connectionTimeout); + } + client = builder.build(); } catch (NoSuchAlgorithmException | KeyManagementException e) { - client = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(25)) - .followRedirects(HttpClient.Redirect.ALWAYS) - .build(); + var builder = HttpClient.newBuilder() + .followRedirects(HttpClient.Redirect.ALWAYS); + if (connectionTimeout.toMillis() > 0) { + builder.connectTimeout(connectionTimeout); + } + client = builder.build(); } return client; diff --git a/src/test/java/com/apptasticsoftware/integrationtest/ConnectionTest.java b/src/test/java/com/apptasticsoftware/integrationtest/ConnectionTest.java new file mode 100644 index 0000000..ff21f71 --- /dev/null +++ b/src/test/java/com/apptasticsoftware/integrationtest/ConnectionTest.java @@ -0,0 +1,198 @@ +package com.apptasticsoftware.integrationtest; + +import com.apptasticsoftware.rssreader.Item; +import com.apptasticsoftware.rssreader.RssReader; +import com.apptasticsoftware.rssreader.util.RssServer; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + +class ConnectionTest { + private static final int PORT = 8008; + private static final Duration NEGATIVE_DURATION = Duration.ofSeconds(-30); + + @Test + void testConnectionTimeoutWithNullValue() { + var rssReader = new RssReader(); + var exception = assertThrows(NullPointerException.class, () -> rssReader.setConnectionTimeout(null)); + assertEquals("Connection timeout must not be null", exception.getMessage()); + } + + @Test + void testRequestTimeoutWithNullValue() { + var rssReader = new RssReader(); + var exception = 
assertThrows(NullPointerException.class, () -> rssReader.setRequestTimeout(null)); + assertEquals("Request timeout must not be null", exception.getMessage()); + } + + @Test + void testReadTimeoutWithNullValue() { + var rssReader = new RssReader(); + var exception = assertThrows(NullPointerException.class, () -> rssReader.setReadTimeout(null)); + assertEquals("Read timeout must not be null", exception.getMessage()); + } + + @Test + void testConnectionTimeoutWithNegativeValue() { + var rssReader = new RssReader(); + var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setConnectionTimeout(NEGATIVE_DURATION)); + assertEquals("Connection timeout must not be negative", exception.getMessage()); + } + + @Test + void testRequestTimeoutWithNegativeValue() { + var rssReader = new RssReader(); + var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setRequestTimeout(NEGATIVE_DURATION)); + assertEquals("Request timeout must not be negative", exception.getMessage()); + } + + @Test + void testReadTimeoutWithNegativeValue() { + var rssReader = new RssReader(); + var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setReadTimeout(NEGATIVE_DURATION)); + assertEquals("Read timeout must not be negative", exception.getMessage()); + } + + @Test + void testReadFromLocalRssServerNoTimeout() throws IOException { + var server = RssServer.with(getFile("atom-feed.xml")) + .port(PORT) + .endpointPath("/rss") + .build(); + server.start(); + + var items = new RssReader() + .setConnectionTimeout(Duration.ZERO) + .setRequestTimeout(Duration.ZERO) + .setReadTimeout(Duration.ZERO) + .read("http://localhost:8008/rss") + .collect(Collectors.toList()); + + server.stop(); + verify(3, items); + } + + @Test + void testReadFromLocalRssServer10SecondTimeout() throws IOException { + var server = RssServer.with(getFile("atom-feed.xml")) + .port(PORT) + .endpointPath("/rss") + .build(); + server.start(); + + var items = new RssReader() + 
.setConnectionTimeout(Duration.ofSeconds(10)) + .setRequestTimeout(Duration.ofSeconds(10)) + .setReadTimeout(Duration.ofSeconds(10)) + .read("http://localhost:8008/rss") + .collect(Collectors.toList()); + + server.stop(); + verify(3, items); + } + + + @Test + void testReadFromLocalRssServer() throws IOException { + var server = RssServer.with(getFile("atom-feed.xml")) + .port(PORT) + .endpointPath("/rss") + .build(); + server.start(); + + var items = new RssReader() + .setReadTimeout(Duration.ofSeconds(2)) + .read("http://localhost:8008/rss") + .collect(Collectors.toList()); + + server.stop(); + verify(3, items); + } + + @Test + void testNoReadTimeout() throws IOException { + var server = RssServer.with(getFile("atom-feed.xml")) + .port(PORT) + .endpointPath("/rss") + .build(); + server.start(); + + var items = new RssReader() + .setReadTimeout(Duration.ZERO) + .read("http://localhost:8008/rss") + .collect(Collectors.toList()); + + server.stop(); + verify(3, items); + } + + @Test + void testReadTimeout() throws IOException { + var server = RssServer.withWritePause(getFile("atom-feed.xml"), Duration.ofSeconds(4)) + .port(PORT) + .endpointPath("/slow-server") + .build(); + server.start(); + + var items = new RssReader() + .setReadTimeout(Duration.ofSeconds(2)) + .read("http://localhost:8008/slow-server") + .collect(Collectors.toList()); + + server.stop(); + verify(2, items); + } + + private static void verify(int expectedSize, List items) { + assertEquals(expectedSize, items.size()); + + if (!items.isEmpty()) { + assertEquals("dive into mark", items.get(0).getChannel().getTitle()); + assertEquals(65, items.get(0).getChannel().getDescription().length()); + assertEquals("http://example.org/feed.atom", items.get(0).getChannel().getLink()); + assertEquals("Copyright (c) 2003, Mark Pilgrim", items.get(0).getChannel().getCopyright().orElse(null)); + assertEquals("Example Toolkit", items.get(0).getChannel().getGenerator().orElse(null)); + 
assertEquals("2005-07-31T12:29:29Z", items.get(0).getChannel().getLastBuildDate().orElse(null)); + + assertEquals("Atom draft-07 snapshot", items.get(0).getTitle().orElse(null)); + assertNull(items.get(1).getAuthor().orElse(null)); + assertEquals("http://example.org/audio/ph34r_my_podcast.mp3", items.get(0).getLink().orElse(null)); + assertEquals("tag:example.org,2003:3.2397", items.get(0).getGuid().orElse(null)); + assertEquals("2003-12-13T08:29:29-04:00", items.get(0).getPubDate().orElse(null)); + assertEquals("2005-07-31T12:29:29Z", items.get(0).getUpdated().orElse(null)); + assertEquals(211, items.get(1).getDescription().orElse("").length()); + } + if (items.size() >= 2) { + assertEquals("Atom-Powered Robots Run Amok", items.get(1).getTitle().orElse(null)); + assertNull(items.get(1).getAuthor().orElse(null)); + assertEquals("http://example.org/2003/12/13/atom03", items.get(1).getLink().orElse(null)); + assertEquals("urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a", items.get(1).getGuid().orElse(null)); + assertEquals("2003-12-13T18:30:02Z", items.get(1).getPubDate().orElse(null)); + assertEquals("2003-12-13T18:30:02Z", items.get(1).getUpdated().orElse(null)); + assertEquals(211, items.get(1).getDescription().orElse("").length()); + } + if (items.size() >= 3) { + assertEquals("Atom-Powered Robots Run Amok 2", items.get(2).getTitle().orElse(null)); + assertNull(items.get(2).getAuthor().orElse(null)); + assertEquals("http://example.org/2003/12/13/atom04", items.get(2).getLink().orElse(null)); + assertEquals("urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6b", items.get(2).getGuid().orElse(null)); + assertEquals("2003-12-13T09:28:28-04:00", items.get(2).getPubDate().orElse(null)); + assertEquals(1071322108, items.get(2).getPubDateZonedDateTime().map(ZonedDateTime::toEpochSecond).orElse(null)); + assertEquals("2003-12-13T18:30:01Z", items.get(2).getUpdated().orElse(null)); + assertEquals(1071340201, 
items.get(2).getUpdatedZonedDateTime().map(ZonedDateTime::toEpochSecond).orElse(null)); + assertEquals(47, items.get(2).getDescription().orElse("").length()); + } + } + + private File getFile(String filename) { + var url = getClass().getClassLoader().getResource(filename); + return new File(url.getFile()); + } +} diff --git a/src/test/java/com/apptasticsoftware/rssreader/util/RssServer.java b/src/test/java/com/apptasticsoftware/rssreader/util/RssServer.java new file mode 100644 index 0000000..2f4f93c --- /dev/null +++ b/src/test/java/com/apptasticsoftware/rssreader/util/RssServer.java @@ -0,0 +1,177 @@ +package com.apptasticsoftware.rssreader.util; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; + +import java.io.*; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Basic RSS server for testing + */ +public class RssServer { + private static final Logger LOGGER = Logger.getLogger("RssServer"); + private final HttpServer server; + + private RssServer(int port, String endpointPath, File file, Duration writeBodyPause) throws IOException { + server = HttpServer.create(new InetSocketAddress(port), 0); + server.createContext(endpointPath, new FileRssHandler(file, writeBodyPause)); + server.setExecutor(null); + } + + /** + * RSS server that publishes the given file content as an RSS/Atom feed. + * @param file content to publish + * @return RSS server + */ + public static RssServerBuilder with(File file) { + Objects.requireNonNull(file, "File must not be null"); + if (!file.isFile()) { + throw new IllegalArgumentException("File must exist"); + } + return new RssServerBuilder(file, Duration.ZERO); + } + + /** + * RSS server that publishes the given file content as an RSS/Atom feed. 
+ * Server will publish 90% of the data and then wait the given amount of time before publishing the rest of the data. + * @param file content to publish + * @param writeBodyPause time to wait before publishing the last data + * @return RSS server + */ + public static RssServerBuilder withWritePause(File file, Duration writeBodyPause) { + Objects.requireNonNull(file, "File must not be null"); + if (!file.isFile()) { + throw new IllegalArgumentException("File must exist"); + } + Objects.requireNonNull(writeBodyPause, "Write body pause must not be null"); + if (writeBodyPause.isNegative()) { + throw new IllegalArgumentException("Write body pause must not be negative"); + } + return new RssServerBuilder(file, writeBodyPause); + } + + /** + * Start RSS server + */ + public void start() { + server.start(); + } + + /** + * Stop RSS server + */ + public void stop() { + server.stop(1); + } + + private static class FileRssHandler implements HttpHandler { + private final File file; + private final Duration writeBodyPause; + + public FileRssHandler(File file, Duration writeBodyPause) { + this.file = file; + this.writeBodyPause = writeBodyPause; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + LOGGER.info("New connection " + Instant.now()); + var responseBodyLength = Files.size(file.toPath()); + exchange.sendResponseHeaders(200, responseBodyLength); + + try (var os = exchange.getResponseBody()) { + writeResponseBody(os, responseBodyLength); + } + + LOGGER.info("Connection closed " + Instant.now()); + } + + private void writeResponseBody(OutputStream os, long responseBodyLength) throws IOException { + byte[] buffer = new byte[128]; + int readLength; + int totalReadLength = 0; + boolean hasPaused = false; + + try (var is = new FileInputStream(file)){ + while ((readLength = is.read(buffer)) != -1) { + totalReadLength += readLength; + os.write(buffer, 0, readLength); + if (isWritePause(totalReadLength, responseBodyLength) && !hasPaused) { + 
pause(writeBodyPause); + hasPaused = true; + LOGGER.info("Continue to write " + Instant.now()); + } + } + } + + os.flush(); + } + + private boolean isWritePause(int length, long totalLength) { + return writeBodyPause.toMillis() > 0 && length >= totalLength * 0.90; + } + + @SuppressWarnings("java:S2925") + private void pause(Duration duration) { + try { + TimeUnit.MILLISECONDS.sleep(duration.toMillis()); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + + } + + /** + * Builder for RSS server + */ + public static class RssServerBuilder { + private int port = 8080; + private String endpointPath = "/rss"; + private final File file; + private final Duration writeBodyPause; + + RssServerBuilder(File file, Duration writeBodyPause) { + this.file = file; + this.writeBodyPause = writeBodyPause; + } + + /** + * Port number to use. Default: 8080 + * @param port port number + * @return builder + */ + public RssServerBuilder port(int port) { + this.port = port; + return this; + } + + /** + * The endpoint path to use. Default: /rss + * @param endpointPath endpoint path + * @return builder + */ + public RssServerBuilder endpointPath(String endpointPath) { + this.endpointPath = endpointPath; + return this; + } + + /** + * Builds and configures the RSS server + * @return RSS server + * @throws IOException if an I/O error occurs + */ + public RssServer build() throws IOException { + return new RssServer(port, endpointPath, file, writeBodyPause); + } + } + +}