diff --git a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java
index 95706b77d..43ae24158 100644
--- a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java
+++ b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java
@@ -192,20 +192,10 @@ public boolean runCycle() throws Exception {
                     // 1xx, 2xx, 3xx...this is not expected as it is not an "ERROR"
                     // 5xx errors are server side errors, we can retry
-                    int currentCount = status.temporaryErrorOnUrl(current);
-                    if (currentCount >= configuration.getMaxErrorCount()) {
-                        log.info("Too many errors ({}) on url {}, skipping it", currentCount, current);
-                        discardUrl(current, reference);
-                    } else {
-                        log.info("Putting back the url {} into the backlog", current);
-                        forceAddUrl(current, reference.type(), reference.depth());
-                    }
+                    handleTemporaryError(current, reference);
                 }
 
-                // prevent from being banned for flooding
-                if (configuration.getMinTimeBetweenRequests() > 0) {
-                    Thread.sleep(configuration.getMinTimeBetweenRequests());
-                }
+                handleThrottling(current);
 
                 // we did something
                 return true;
@@ -217,9 +207,17 @@ public boolean runCycle() throws Exception {
             discardUrl(current, reference);
 
             // prevent from being banned for flooding
-            if (configuration.getMinTimeBetweenRequests() > 0) {
-                Thread.sleep(configuration.getMinTimeBetweenRequests());
-            }
+            handleThrottling(current);
+
+            // we did something
+            return true;
+        } catch (IOException e) {
+            log.info("Error while crawling url: {}, IO Error: {}", current, e + "");
+
+            handleTemporaryError(current, reference);
+
+            // prevent from being banned for flooding
+            handleThrottling(current);
 
             // we did something
             return true;
@@ -245,6 +243,13 @@ public boolean runCycle() throws Exception {
                             new ai.langstream.agents.webcrawler.crawler.Document(current, document.html()));
         }
 
+        // prevent from being banned for flooding
+        handleThrottling(current);
+
+        return true;
+    }
+
+    private void handleThrottling(String current) throws InterruptedException {
         int delayMs = getCrawlerDelayFromRobots(current);
         if (configuration.getMinTimeBetweenRequests() > 0) {
             if (delayMs > 0) {
@@ -257,8 +262,17 @@ public boolean runCycle() throws Exception {
             if (delayMs > 0) {
                 Thread.sleep(delayMs);
             }
+        }
 
-        return true;
+    private void handleTemporaryError(String current, URLReference reference) {
+        int currentCount = status.temporaryErrorOnUrl(current);
+        if (currentCount >= configuration.getMaxErrorCount()) {
+            log.info("Too many errors ({}) on url {}, skipping it", currentCount, current);
+            discardUrl(current, reference);
+        } else {
+            log.info("Putting back the url {} into the backlog", current);
+            forceAddUrl(current, reference.type(), reference.depth());
+        }
     }
 
     private int getCrawlerDelayFromRobots(String current) {
@@ -376,14 +390,26 @@ private void handleSitemapsFile(String url) throws Exception {
 
     private HttpResponse<byte[]> downloadUrl(String url) throws IOException, InterruptedException {
         HttpClient client = HttpClient.newHttpClient();
-        HttpResponse<byte[]> response =
-                client.send(
+        IOException lastError = null;
+        for (int i = 0; i < configuration.getMaxErrorCount(); i++) {
+            try {
+                return client.send(
                         HttpRequest.newBuilder()
                                 .uri(URI.create(url))
                                 .header("User-Agent", configuration.getUserAgent())
                                 .build(),
                         HttpResponse.BodyHandlers.ofByteArray());
-        return response;
+            } catch (IOException err) {
+                lastError = err;
+                log.warn("Error while downloading url: {}", url, err);
+                handleThrottling(url);
+            }
+        }
+        if (lastError != null) {
+            throw lastError;
+        } else {
+            throw new IOException("Unknown error while downloading url: " + url);
+        }
     }
 
     public void restartIndexing(Set<String> seedUrls) {
diff --git a/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java b/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java
index 2230ef92e..06620638b 100644
--- a/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java
+++ b/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java
@@ -15,6 +15,7 @@
  */
 package ai.langstream.agents.webcrawler.crawler;
 
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
 import static com.github.tomakehurst.wiremock.client.WireMock.notFound;
 import static com.github.tomakehurst.wiremock.client.WireMock.okForContentType;
@@ -23,6 +24,7 @@
 import static com.github.tomakehurst.wiremock.client.WireMock.temporaryRedirect;
 import static org.junit.jupiter.api.Assertions.*;
 
+import com.github.tomakehurst.wiremock.http.Fault;
 import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
 import com.github.tomakehurst.wiremock.junit5.WireMockTest;
 import java.util.ArrayList;
@@ -219,4 +221,105 @@ void testRedirects(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
         // nothing to do
         assertFalse(crawler.runCycle());
     }
+
+    @Test
+    void testNetworkErrors(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
+
+        stubFor(
+                get("/index.html")
+                        .willReturn(
+                                okForContentType(
+                                        "text/html",
+                                        """
+                        <a href="internalErrorPage.html">link</a>
+                        """)));
+        stubFor(
+                get("/internalErrorPage.html")
+                        .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));
+
+        WebCrawlerConfiguration configuration =
+                WebCrawlerConfiguration.builder()
+                        .allowedDomains(Set.of(vmRuntimeInfo.getHttpBaseUrl()))
+                        .handleRobotsFile(false)
+                        .maxErrorCount(5)
+                        .build();
+        WebCrawlerStatus status = new WebCrawlerStatus();
+        List<Document> documents = new ArrayList<>();
+        WebCrawler crawler = new WebCrawler(configuration, status, documents::add);
+        crawler.crawl(vmRuntimeInfo.getHttpBaseUrl() + "/index.html");
+        crawler.runCycle();
+
+        assertEquals(1, documents.size());
+        assertEquals(vmRuntimeInfo.getHttpBaseUrl() + "/index.html", documents.get(0).url());
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // process the internalErrorPage
+        crawler.runCycle();
+
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // process the internalErrorPage
+        crawler.runCycle();
+
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // now the error page starts to work again
+        stubFor(
+                get("/internalErrorPage.html")
+                        .willReturn(
+                                okForContentType(
+                                        "text/html",
+                                        """
+                        ok !
+ """))); + + // process the internalErrorPage + crawler.runCycle(); + + assertEquals(0, status.getPendingUrls().size()); + assertEquals(2, status.getUrls().size()); + } + + @Test + void testNetworkErrorsEventuallyFail(WireMockRuntimeInfo vmRuntimeInfo) throws Exception { + + stubFor( + get("/index.html") + .willReturn( + okForContentType( + "text/html", + """ + link + """))); + stubFor( + get("/internalErrorPage.html") + .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))); + + WebCrawlerConfiguration configuration = + WebCrawlerConfiguration.builder() + .allowedDomains(Set.of(vmRuntimeInfo.getHttpBaseUrl())) + .handleRobotsFile(false) + .maxErrorCount(1) + .build(); + WebCrawlerStatus status = new WebCrawlerStatus(); + List documents = new ArrayList<>(); + WebCrawler crawler = new WebCrawler(configuration, status, documents::add); + crawler.crawl(vmRuntimeInfo.getHttpBaseUrl() + "/index.html"); + crawler.runCycle(); + + assertEquals(1, documents.size()); + assertEquals(vmRuntimeInfo.getHttpBaseUrl() + "/index.html", documents.get(0).url()); + assertEquals(1, status.getPendingUrls().size()); + assertEquals(2, status.getUrls().size()); + + // process the internalErrorPage + crawler.runCycle(); + + // we gave up, too many errors + assertEquals(0, status.getPendingUrls().size()); + assertEquals(2, status.getUrls().size()); + } }