From 810369421dbfd5f356aac4b8d5dccefc784c5065 Mon Sep 17 00:00:00 2001 From: Jeff Mendez Date: Tue, 21 Nov 2023 18:27:15 -0500 Subject: [PATCH] chore(website): add crawl_concurrent_raw method (#152) --- Cargo.lock | 8 +- examples/Cargo.toml | 4 +- spider/Cargo.toml | 2 +- spider/README.md | 21 ++- spider/src/page.rs | 15 +- spider/src/website.rs | 310 +++++++++++++++++++++++++++------------ spider_cli/Cargo.toml | 4 +- spider_worker/Cargo.toml | 4 +- 8 files changed, 248 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3f52f355..3c83e2d22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3569,7 +3569,7 @@ dependencies = [ [[package]] name = "spider" -version = "1.49.6" +version = "1.49.7" dependencies = [ "ahash", "bytes", @@ -3604,7 +3604,7 @@ dependencies = [ [[package]] name = "spider_cli" -version = "1.49.6" +version = "1.49.7" dependencies = [ "clap 4.4.4", "env_logger 0.9.3", @@ -3616,7 +3616,7 @@ dependencies = [ [[package]] name = "spider_examples" -version = "1.49.6" +version = "1.49.7" dependencies = [ "convert_case", "env_logger 0.9.3", @@ -3637,7 +3637,7 @@ dependencies = [ [[package]] name = "spider_worker" -version = "1.49.6" +version = "1.49.7" dependencies = [ "env_logger 0.10.0", "lazy_static", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0df9765f4..bed34b399 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_examples" -version = "1.49.6" +version = "1.49.7" authors = ["madeindjs ", "j-mendez "] description = "Multithreaded web crawler written in Rust." repository = "https://github.com/spider-rs/spider" @@ -22,7 +22,7 @@ htr = "0.5.27" flexbuffers = "2.0.0" [dependencies.spider] -version = "1.49.6" +version = "1.49.7" path = "../spider" features = ["serde"] diff --git a/spider/Cargo.toml b/spider/Cargo.toml index 89ed2807c..0f335632b 100644 --- a/spider/Cargo.toml +++ b/spider/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider" -version = "1.49.6" +version = "1.49.7" authors = ["madeindjs ", "j-mendez "] description = "The fastest web crawler written in Rust." repository = "https://github.com/spider-rs/spider" diff --git a/spider/README.md b/spider/README.md index ec4e981d2..dabfc7d2b 100644 --- a/spider/README.md +++ b/spider/README.md @@ -16,7 +16,7 @@ This is a basic async example crawling a web page, add spider to your `Cargo.tom ```toml [dependencies] -spider = "1.49.6" +spider = "1.49.7" ``` And then the code: @@ -87,7 +87,7 @@ We have a couple optional feature flags. Regex blacklisting, jemaloc backend, gl ```toml [dependencies] -spider = { version = "1.49.6", features = ["regex", "ua_generator"] } +spider = { version = "1.49.7", features = ["regex", "ua_generator"] } ``` 1. `ua_generator`: Enables auto generating a random real User-Agent. @@ -116,7 +116,7 @@ Move processing to a worker, drastically increases performance even if worker is ```toml [dependencies] -spider = { version = "1.49.6", features = ["decentralized"] } +spider = { version = "1.49.7", features = ["decentralized"] } ``` ```sh @@ -136,7 +136,7 @@ Use the subscribe method to get a broadcast channel. 
```toml [dependencies] -spider = { version = "1.49.6", features = ["sync"] } +spider = { version = "1.49.7", features = ["sync"] } ``` ```rust,no_run @@ -166,7 +166,7 @@ Allow regex for blacklisting routes ```toml [dependencies] -spider = { version = "1.49.6", features = ["regex"] } +spider = { version = "1.49.7", features = ["regex"] } ``` ```rust,no_run @@ -193,7 +193,7 @@ If you are performing large workloads you may need to control the crawler by ena ```toml [dependencies] -spider = { version = "1.49.6", features = ["control"] } +spider = { version = "1.49.7", features = ["control"] } ``` ```rust @@ -257,6 +257,15 @@ async fn main() { } ``` +### Chrome + +```toml +[dependencies] +spider = { version = "1.49.7", features = ["chrome"] } +``` + +You can use `website.crawl_concurrent_raw` to perform a crawl without chromium when needed. Use the feature flag `chrome_headed` to enable headful browser usage if needed to debug. + ### Blocking If you need a blocking sync implementation use a version prior to `v1.12.0`. diff --git a/spider/src/page.rs b/spider/src/page.rs index 82e5de00f..0e0f6d28d 100644 --- a/spider/src/page.rs +++ b/spider/src/page.rs @@ -197,14 +197,6 @@ pub fn build(_: &str, res: PageResponse) -> Page { } impl Page { - #[cfg(all(not(feature = "decentralized"), feature = "chrome"))] - /// Instantiate a new page and gather the html. - pub async fn new(url: &str, client: &Client, page: &chromiumoxide::Page) -> Self { - let page_resource = crate::utils::fetch_page_html(&url, &client, &page).await; - build(url, page_resource) - } - - #[cfg(not(feature = "decentralized"))] /// Instantiate a new page and gather the html repro of standard fetch_page_html. pub async fn new_page(url: &str, client: &Client) -> Self { let page_resource = crate::utils::fetch_page_html_raw(&url, &client).await; @@ -218,6 +210,13 @@ impl Page { build(url, page_resource) } + #[cfg(all(not(feature = "decentralized"), feature = "chrome"))] + /// Instantiate a new page and gather the html. + pub async fn new(url: &str, client: &Client, page: &chromiumoxide::Page) -> Self { + let page_resource = crate::utils::fetch_page_html(&url, &client, &page).await; + build(url, page_resource) + } + /// Instantiate a new page and gather the links. 
#[cfg(feature = "decentralized")] pub async fn new(url: &str, client: &Client) -> Self { diff --git a/spider/src/website.rs b/spider/src/website.rs index c9ef70d11..476b25df2 100644 --- a/spider/src/website.rs +++ b/spider/src/website.rs @@ -669,7 +669,6 @@ impl Website { } /// setup selectors for handling link targets - #[cfg(not(feature = "decentralized"))] fn setup_selectors(&self) -> Option<(CompactString, smallvec::SmallVec<[CompactString; 2]>)> { get_page_selectors( &self.domain.inner(), @@ -679,40 +678,32 @@ impl Website { } /// setup shared concurrent configs - #[cfg(not(feature = "decentralized"))] fn setup_crawl( &mut self, ) -> ( - Box>, std::pin::Pin>, std::pin::Pin>, ) { self.status = CrawlStatus::Active; - let blacklist_url = self.configuration.get_blacklist(); let interval = Box::pin(tokio::time::interval(Duration::from_millis(10))); let throttle = Box::pin(self.get_delay()); - (blacklist_url, interval, throttle) + (interval, throttle) } /// get base link for crawl establishing - #[cfg(all(not(feature = "glob"), feature = "regex"))] + #[cfg(feature = "regex")] fn get_base_link(&self) -> &CaseInsensitiveString { &self.domain } /// get base link for crawl establishing - #[cfg(all(not(feature = "glob"), not(feature = "regex")))] + #[cfg(not(feature = "regex"))] fn get_base_link(&self) -> &CompactString { self.domain.inner() } /// expand links for crawl - #[cfg(all( - not(feature = "glob"), - not(feature = "decentralized"), - not(feature = "chrome") - ))] async fn _crawl_establish( &mut self, client: &Client, @@ -722,7 +713,7 @@ impl Website { let links: HashSet = if self .is_allowed_default(&self.get_base_link(), &self.configuration.get_blacklist()) { - let page = Page::new(&self.domain.inner(), &client).await; + let page = Page::new_page(&self.domain.inner(), &client).await; if !self.external_domains.is_empty() { self.external_domains_caseless = self @@ -1034,6 +1025,13 @@ impl Website { self.crawl_concurrent(&client, &handle).await; } + #[cfg(not(feature = "sitemap"))] + /// Start to crawl website with async concurrency using the base raw functionality. Useful when using the "chrome" feature and defaulting to the basic implementation. + pub async fn crawl_raw(&mut self) { + let (client, handle) = self.setup().await; + self.crawl_concurrent_raw(&client, &handle).await; + } + #[cfg(not(feature = "sitemap"))] /// Start to scrape/download website with async concurrency pub async fn scrape(&mut self) { @@ -1049,6 +1047,14 @@ impl Website { self.sitemap_crawl(&client, &handle, false).await; } + #[cfg(feature = "sitemap")] + /// Start to crawl website and include sitemap links with async concurrency using the base raw functionality. Useful when using the "chrome" feature and defaulting to the basic implementation. 
+ pub async fn crawl_raw(&mut self) { + let (client, handle) = self.setup().await; + self.crawl_concurrent_raw(&client, &handle).await; + self.sitemap_crawl(&client, &handle, false).await; + } + #[cfg(feature = "sitemap")] /// Start to scrape/download website with async concurrency pub async fn scrape(&mut self) { @@ -1057,6 +1063,115 @@ impl Website { self.sitemap_crawl(&client, &handle, true).await; } + /// Start to crawl website concurrently + async fn crawl_concurrent_raw(&mut self, client: &Client, handle: &Option>) { + match self.setup_selectors() { + Some(selector) => { + let (mut interval, throttle) = self.setup_crawl(); + let blacklist_url = self.configuration.get_blacklist(); + + let on_link_find_callback = self.on_link_find_callback; + let shared = Arc::new(( + client.to_owned(), + selector, + self.channel.clone(), + self.external_domains_caseless.clone(), + )); + + let mut links: HashSet = + self._crawl_establish(&shared.0, &shared.1, false).await; + + if !links.is_empty() { + let mut set: JoinSet> = JoinSet::new(); + let chandle = Handle::current(); + + // crawl while links exists + loop { + let stream = tokio_stream::iter::>( + links.drain().collect(), + ) + .throttle(*throttle); + tokio::pin!(stream); + + loop { + match stream.next().await { + Some(link) => { + match handle.as_ref() { + Some(handle) => { + while handle.load(Ordering::Relaxed) == 1 { + interval.tick().await; + } + if handle.load(Ordering::Relaxed) == 2 { + set.shutdown().await; + break; + } + } + None => (), + } + + if !self.is_allowed(&link, &blacklist_url) { + continue; + } + + log("fetch", &link); + self.links_visited.insert(link.clone()); + let permit = SEM.acquire().await.unwrap(); + let shared = shared.clone(); + task::yield_now().await; + + set.spawn_on( + async move { + let link_result = match on_link_find_callback { + Some(cb) => cb(link, None), + _ => (link, None), + }; + let mut page = + Page::new(&link_result.0.as_ref(), &shared.0).await; + page.set_external(shared.3.to_owned()); + + let page_links = page.links(&shared.1).await; + + match &shared.2 { + Some(c) => { + match c.0.send(page) { + _ => (), + }; + } + _ => (), + }; + + drop(permit); + + page_links + }, + &chandle, + ); + } + _ => break, + } + } + + while let Some(res) = set.join_next().await { + match res { + Ok(msg) => { + links.extend(&msg - &self.links_visited); + } + _ => (), + }; + } + + if links.is_empty() { + break; + } + } + } + + self.status = CrawlStatus::Idle; + } + _ => log("", "The domain should be a valid URL, refer to ."), + } + } + /// Start to crawl website concurrently #[cfg(all(not(feature = "decentralized"), feature = "chrome"))] async fn crawl_concurrent(&mut self, client: &Client, handle: &Option>) { @@ -1064,7 +1179,9 @@ impl Website { // crawl if valid selector if selectors.is_some() { - let (blacklist_url, mut interval, throttle) = self.setup_crawl(); + let (mut interval, throttle) = self.setup_crawl(); + let blacklist_url = self.configuration.get_blacklist(); + let on_link_find_callback = self.on_link_find_callback; match launch_browser(&self.configuration.proxies).await { @@ -1206,109 +1323,112 @@ impl Website { /// Start to crawl website concurrently #[cfg(all(not(feature = "decentralized"), not(feature = "chrome")))] async fn crawl_concurrent(&mut self, client: &Client, handle: &Option>) { - let selectors = self.setup_selectors(); - // crawl if valid selector - if selectors.is_some() { - let (blacklist_url, mut interval, throttle) = self.setup_crawl(); - let on_link_find_callback = self.on_link_find_callback; 
+ match self.setup_selectors() { + Some(selector) => { + let (mut interval, throttle) = self.setup_crawl(); + let blacklist_url = self.configuration.get_blacklist(); - let shared = Arc::new(( - client.to_owned(), - unsafe { selectors.unwrap_unchecked() }, - self.channel.clone(), - self.external_domains_caseless.clone(), - )); + let on_link_find_callback = self.on_link_find_callback; - let mut links: HashSet = - self.crawl_establish(&shared.0, &shared.1, false).await; + let shared = Arc::new(( + client.to_owned(), + selector, + self.channel.clone(), + self.external_domains_caseless.clone(), + )); - if !links.is_empty() { - let mut set: JoinSet> = JoinSet::new(); - let chandle = Handle::current(); + let mut links: HashSet = + self.crawl_establish(&shared.0, &shared.1, false).await; - // crawl while links exists - loop { - let stream = tokio_stream::iter::>( - links.drain().collect(), - ) - .throttle(*throttle); - tokio::pin!(stream); + if !links.is_empty() { + let mut set: JoinSet> = JoinSet::new(); + let chandle = Handle::current(); + // crawl while links exists loop { - match stream.next().await { - Some(link) => { - match handle.as_ref() { - Some(handle) => { - while handle.load(Ordering::Relaxed) == 1 { - interval.tick().await; - } - if handle.load(Ordering::Relaxed) == 2 { - set.shutdown().await; - break; + let stream = tokio_stream::iter::>( + links.drain().collect(), + ) + .throttle(*throttle); + tokio::pin!(stream); + + loop { + match stream.next().await { + Some(link) => { + match handle.as_ref() { + Some(handle) => { + while handle.load(Ordering::Relaxed) == 1 { + interval.tick().await; + } + if handle.load(Ordering::Relaxed) == 2 { + set.shutdown().await; + break; + } } + None => (), } - None => (), - } - - if !self.is_allowed(&link, &blacklist_url) { - continue; - } - log("fetch", &link); - self.links_visited.insert(link.clone()); - let permit = SEM.acquire().await.unwrap(); - let shared = shared.clone(); - task::yield_now().await; + if !self.is_allowed(&link, &blacklist_url) { + continue; + } - set.spawn_on( - async move { - let link_result = match on_link_find_callback { - Some(cb) => cb(link, None), - _ => (link, None), - }; - let mut page = - Page::new(&link_result.0.as_ref(), &shared.0).await; - page.set_external(shared.3.to_owned()); + log("fetch", &link); + self.links_visited.insert(link.clone()); + let permit = SEM.acquire().await.unwrap(); + let shared = shared.clone(); + task::yield_now().await; + + set.spawn_on( + async move { + let link_result = match on_link_find_callback { + Some(cb) => cb(link, None), + _ => (link, None), + }; + let mut page = + Page::new(&link_result.0.as_ref(), &shared.0).await; + page.set_external(shared.3.to_owned()); - let page_links = page.links(&shared.1).await; + let page_links = page.links(&shared.1).await; - match &shared.2 { - Some(c) => { - match c.0.send(page) { - _ => (), - }; - } - _ => (), - }; + match &shared.2 { + Some(c) => { + match c.0.send(page) { + _ => (), + }; + } + _ => (), + }; - drop(permit); + drop(permit); - page_links - }, - &chandle, - ); + page_links + }, + &chandle, + ); + } + _ => break, } - _ => break, } - } - while let Some(res) = set.join_next().await { - match res { - Ok(msg) => { - links.extend(&msg - &self.links_visited); - } - _ => (), - }; - } + while let Some(res) = set.join_next().await { + match res { + Ok(msg) => { + links.extend(&msg - &self.links_visited); + } + _ => (), + }; + } - if links.is_empty() { - break; + if links.is_empty() { + break; + } } } - } - self.status = CrawlStatus::Idle; + 
self.status = CrawlStatus::Idle; + } + _ => log("", "The domain should be a valid URL, refer to ."), } } diff --git a/spider_cli/Cargo.toml b/spider_cli/Cargo.toml index 4026f1ed2..2a80f12d9 100644 --- a/spider_cli/Cargo.toml +++ b/spider_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_cli" -version = "1.49.6" +version = "1.49.7" authors = ["madeindjs ", "j-mendez "] description = "The fastest web crawler CLI written in Rust." repository = "https://github.com/spider-rs/spider" @@ -26,7 +26,7 @@ quote = "1.0.18" failure_derive = "0.1.8" [dependencies.spider] -version = "1.49.6" +version = "1.49.7" path = "../spider" [[bin]] diff --git a/spider_worker/Cargo.toml b/spider_worker/Cargo.toml index 5c946bc89..1d7b96fa9 100644 --- a/spider_worker/Cargo.toml +++ b/spider_worker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_worker" -version = "1.49.6" +version = "1.49.7" authors = ["madeindjs ", "j-mendez "] description = "The fastest web crawler as a worker or proxy." repository = "https://github.com/spider-rs/spider" @@ -22,7 +22,7 @@ lazy_static = "1.4.0" env_logger = "0.10.0" [dependencies.spider] -version = "1.49.6" +version = "1.49.7" path = "../spider" features = ["serde", "flexbuffers"]
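
---

Usage note for reviewers: a minimal sketch of how the `crawl_raw` method added in this patch might be called. The target URL is an arbitrary placeholder, and the snippet assumes the `chrome` feature is enabled in `Cargo.toml` (the case the README addition describes); `crawl_raw` routes through `crawl_concurrent_raw` and fetches pages with the plain HTTP client rather than chromium. This mirrors the existing README crawl example and is not part of the patch itself.

```rust
extern crate spider;

use spider::tokio;
use spider::website::Website;

#[tokio::main]
async fn main() {
    // Placeholder target; any crawlable site works here.
    let mut website: Website = Website::new("https://rsseau.fr");

    // With the `chrome` feature enabled, `crawl()` drives a chromium page,
    // while `crawl_raw()` (added in this patch) falls back to the plain
    // HTTP client via the new `crawl_concurrent_raw` internals.
    website.crawl_raw().await;

    // Print every link gathered during the crawl.
    for link in website.get_links() {
        println!("- {:?}", link.as_ref());
    }
}
```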