[webcrawler] Do not crash (restart the pod) in case of network glitches
eolivelli committed Oct 27, 2023
1 parent eec9581 commit b468452
Showing 2 changed files with 148 additions and 19 deletions.
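In outline: runCycle previously let an IOException escape, which crashed the agent and forced a pod restart. The commit catches the exception, counts it as a temporary error against the URL, throttles, and carries on, and downloadUrl retries up to getMaxErrorCount() times before rethrowing. The stand-alone sketch below condenses that pattern; it is illustrative only, and names such as BoundedRetrySketch, MAX_ERRORS, and fetch() are hypothetical, not taken from the codebase.

    import java.io.IOException;

    public class BoundedRetrySketch {
        static final int MAX_ERRORS = 5;      // hypothetical cap, plays the role of getMaxErrorCount()
        static final long MIN_DELAY_MS = 100; // hypothetical politeness delay between requests

        // Hypothetical stand-in for a real HTTP fetch; always fails to show the retry path.
        static String fetch(String url) throws IOException {
            throw new IOException("simulated network glitch for " + url);
        }

        public static void main(String[] args) throws InterruptedException {
            int errors = 0;
            while (errors < MAX_ERRORS) {
                try {
                    System.out.println(fetch("https://example.com/"));
                    return; // success: stop retrying
                } catch (IOException e) {
                    errors++; // temporary error: count it instead of crashing
                    System.out.println("error " + errors + ": " + e.getMessage());
                }
                Thread.sleep(MIN_DELAY_MS); // throttle before the next attempt
            }
            System.out.println("giving up after " + MAX_ERRORS + " errors");
        }
    }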
WebCrawler.java
@@ -192,20 +192,10 @@ public boolean runCycle() throws Exception {
                 // 1xx, 2xx, 3xx...this is not expected as it is not an "ERROR"
                 // 5xx errors are server side errors, we can retry

-                int currentCount = status.temporaryErrorOnUrl(current);
-                if (currentCount >= configuration.getMaxErrorCount()) {
-                    log.info("Too many errors ({}) on url {}, skipping it", currentCount, current);
-                    discardUrl(current, reference);
-                } else {
-                    log.info("Putting back the url {} into the backlog", current);
-                    forceAddUrl(current, reference.type(), reference.depth());
-                }
+                handleTemporaryError(current, reference);
             }

-            // prevent from being banned for flooding
-            if (configuration.getMinTimeBetweenRequests() > 0) {
-                Thread.sleep(configuration.getMinTimeBetweenRequests());
-            }
+            handleThrottling(current);

             // we did something
             return true;
@@ -217,9 +207,17 @@ public boolean runCycle() throws Exception {
             discardUrl(current, reference);

             // prevent from being banned for flooding
-            if (configuration.getMinTimeBetweenRequests() > 0) {
-                Thread.sleep(configuration.getMinTimeBetweenRequests());
-            }
+            handleThrottling(current);

             // we did something
             return true;
+        } catch (IOException e) {
+            log.info("Error while crawling url: {}, IO Error: {}", current, e + "");
+
+            handleTemporaryError(current, reference);
+
+            // prevent from being banned for flooding
+            handleThrottling(current);
+
+            // we did something
+            return true;
@@ -245,6 +243,13 @@ public boolean runCycle() throws Exception {
                             new ai.langstream.agents.webcrawler.crawler.Document(current, document.html()));
         }

+        // prevent from being banned for flooding
+        handleThrottling(current);
+
+        return true;
+    }
+
+    private void handleThrottling(String current) throws InterruptedException {
         int delayMs = getCrawlerDelayFromRobots(current);
         if (configuration.getMinTimeBetweenRequests() > 0) {
             if (delayMs > 0) {
@@ -257,8 +262,17 @@ public boolean runCycle() throws Exception {
             if (delayMs > 0) {
                 Thread.sleep(delayMs);
             }
+    }

-        return true;
+    private void handleTemporaryError(String current, URLReference reference) {
+        int currentCount = status.temporaryErrorOnUrl(current);
+        if (currentCount >= configuration.getMaxErrorCount()) {
+            log.info("Too many errors ({}) on url {}, skipping it", currentCount, current);
+            discardUrl(current, reference);
+        } else {
+            log.info("Putting back the url {} into the backlog", current);
+            forceAddUrl(current, reference.type(), reference.depth());
+        }
     }

     private int getCrawlerDelayFromRobots(String current) {
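The two helpers above centralize logic that was previously duplicated inline: handleTemporaryError decides between discarding a URL and putting it back into the backlog, and handleThrottling sleeps between requests. Part of handleThrottling's body is collapsed in this view, so exactly how the configured minimum delay and the robots.txt crawl delay combine is not visible here; the sketch below assumes the larger of the two wins, which is a guess, not the commit's confirmed rule.

    public class ThrottleSketch {
        // Hedged sketch only: the collapsed diff lines hide the commit's exact rule,
        // so combining the two delays with Math.max is an assumption.
        static void throttle(long minTimeBetweenRequestsMs, long robotsCrawlDelayMs)
                throws InterruptedException {
            long delayMs = Math.max(minTimeBetweenRequestsMs, robotsCrawlDelayMs);
            if (delayMs > 0) {
                Thread.sleep(delayMs); // single politeness pause per request
            }
        }

        public static void main(String[] args) throws InterruptedException {
            throttle(100, 250); // sleeps 250 ms: the robots.txt delay wins under this assumption
        }
    }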
@@ -376,14 +390,26 @@ private void handleSitemapsFile(String url) throws Exception {

     private HttpResponse<byte[]> downloadUrl(String url) throws IOException, InterruptedException {
         HttpClient client = HttpClient.newHttpClient();
-        HttpResponse<byte[]> response =
-                client.send(
+        IOException lastError = null;
+        for (int i = 0; i < configuration.getMaxErrorCount(); i++) {
+            try {
+                return client.send(
                         HttpRequest.newBuilder()
                                 .uri(URI.create(url))
                                 .header("User-Agent", configuration.getUserAgent())
                                 .build(),
                         HttpResponse.BodyHandlers.ofByteArray());
-        return response;
+            } catch (IOException err) {
+                lastError = err;
+                log.warn("Error while downloading url: {}", url, err);
+                handleThrottling(url);
+            }
+        }
+        if (lastError != null) {
+            throw lastError;
+        } else {
+            throw new IOException("Unknown error while downloading url: " + url);
+        }
     }

     public void restartIndexing(Set<String> seedUrls) {
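downloadUrl now wraps java.net.http.HttpClient.send in a bounded retry loop instead of failing on the first IOException. Below is a self-contained approximation of that loop; the constants ATTEMPTS and USER_AGENT are hypothetical stand-ins for the crawler's configuration, and a plain sleep stands in for handleThrottling.

    import java.io.IOException;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class DownloadWithRetry {
        static final int ATTEMPTS = 5;                 // assumption: mirrors getMaxErrorCount()
        static final String USER_AGENT = "test-agent"; // assumption: mirrors getUserAgent()

        static byte[] download(String url) throws IOException, InterruptedException {
            HttpClient client = HttpClient.newHttpClient();
            IOException lastError = null;
            for (int i = 0; i < ATTEMPTS; i++) {
                try {
                    return client.send(
                                    HttpRequest.newBuilder()
                                            .uri(URI.create(url))
                                            .header("User-Agent", USER_AGENT)
                                            .build(),
                                    HttpResponse.BodyHandlers.ofByteArray())
                            .body();
                } catch (IOException err) {
                    lastError = err;   // remember the failure and try again
                    Thread.sleep(100); // simple stand-in for handleThrottling(url)
                }
            }
            throw lastError != null ? lastError : new IOException("no attempts made for " + url);
        }

        public static void main(String[] args) throws Exception {
            System.out.println(download("https://example.com/").length + " bytes");
        }
    }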
WebCrawlerTest.java
@@ -15,6 +15,7 @@
  */
 package ai.langstream.agents.webcrawler.crawler;

+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
 import static com.github.tomakehurst.wiremock.client.WireMock.notFound;
 import static com.github.tomakehurst.wiremock.client.WireMock.okForContentType;
@@ -23,6 +24,7 @@
 import static com.github.tomakehurst.wiremock.client.WireMock.temporaryRedirect;
 import static org.junit.jupiter.api.Assertions.*;

+import com.github.tomakehurst.wiremock.http.Fault;
 import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
 import com.github.tomakehurst.wiremock.junit5.WireMockTest;
 import java.util.ArrayList;
@@ -219,4 +221,105 @@ void testRedirects(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
         // nothing to do
         assertFalse(crawler.runCycle());
     }
+
+    @Test
+    void testNetworkErrors(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
+
+        stubFor(
+                get("/index.html")
+                        .willReturn(
+                                okForContentType(
+                                        "text/html",
+                                        """
+                                <a href="internalErrorPage.html">link</a>
+                                """)));
+        stubFor(
+                get("/internalErrorPage.html")
+                        .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));
+
+        WebCrawlerConfiguration configuration =
+                WebCrawlerConfiguration.builder()
+                        .allowedDomains(Set.of(vmRuntimeInfo.getHttpBaseUrl()))
+                        .handleRobotsFile(false)
+                        .maxErrorCount(5)
+                        .build();
+        WebCrawlerStatus status = new WebCrawlerStatus();
+        List<Document> documents = new ArrayList<>();
+        WebCrawler crawler = new WebCrawler(configuration, status, documents::add);
+        crawler.crawl(vmRuntimeInfo.getHttpBaseUrl() + "/index.html");
+        crawler.runCycle();
+
+        assertEquals(1, documents.size());
+        assertEquals(vmRuntimeInfo.getHttpBaseUrl() + "/index.html", documents.get(0).url());
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // process the internalErrorPage
+        crawler.runCycle();
+
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // process the internalErrorPage
+        crawler.runCycle();
+
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // now the error page starts to work again
+        stubFor(
+                get("/internalErrorPage.html")
+                        .willReturn(
+                                okForContentType(
+                                        "text/html",
+                                        """
+                                ok !
+                                """)));
+
+        // process the internalErrorPage
+        crawler.runCycle();
+
+        assertEquals(0, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+    }
+
+    @Test
+    void testNetworkErrorsEventuallyFail(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
+
+        stubFor(
+                get("/index.html")
+                        .willReturn(
+                                okForContentType(
+                                        "text/html",
+                                        """
+                                <a href="internalErrorPage.html">link</a>
+                                """)));
+        stubFor(
+                get("/internalErrorPage.html")
+                        .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));
+
+        WebCrawlerConfiguration configuration =
+                WebCrawlerConfiguration.builder()
+                        .allowedDomains(Set.of(vmRuntimeInfo.getHttpBaseUrl()))
+                        .handleRobotsFile(false)
+                        .maxErrorCount(1)
+                        .build();
+        WebCrawlerStatus status = new WebCrawlerStatus();
+        List<Document> documents = new ArrayList<>();
+        WebCrawler crawler = new WebCrawler(configuration, status, documents::add);
+        crawler.crawl(vmRuntimeInfo.getHttpBaseUrl() + "/index.html");
+        crawler.runCycle();
+
+        assertEquals(1, documents.size());
+        assertEquals(vmRuntimeInfo.getHttpBaseUrl() + "/index.html", documents.get(0).url());
+        assertEquals(1, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+
+        // process the internalErrorPage
+        crawler.runCycle();
+
+        // we gave up, too many errors
+        assertEquals(0, status.getPendingUrls().size());
+        assertEquals(2, status.getUrls().size());
+    }
 }
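Both tests inject Fault.CONNECTION_RESET_BY_PEER. WireMock's Fault enum also covers other transport-level failures, such as EMPTY_RESPONSE; a variant stub like the following (illustrative only, reusing the same static imports as the tests above) would drive the same retry path with a different kind of glitch.

    // Illustrative variant, not part of the commit: same retry path, different fault.
    stubFor(
            get("/internalErrorPage.html")
                    .willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE)));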
