diff --git a/CHANGELOG.md b/CHANGELOG.md
index 621503d65..c4332cc66 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+## v1.50.1
+
+1. feat(cron): add cron feature flag [#153]
+
 ## v1.36.0
 
 1. feat(sync): subscribe to page updates to perform async handling of data
@@ -12,7 +16,7 @@
 
 ## v1.30.5
 
-1. "feat(worker): add tls support"
+1. feat(worker): add tls support
 
 ## v1.30.3
 
diff --git a/Cargo.lock b/Cargo.lock
index 35e78c1e4..ccfe19fb7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1102,6 +1102,17 @@ dependencies = [
  "itertools",
 ]
 
+[[package]]
+name = "cron"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ff76b51e4c068c52bfd2866e1567bee7c567ae8f24ada09fd4307019e25eab7"
+dependencies = [
+ "chrono",
+ "nom",
+ "once_cell",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.3"
@@ -3740,13 +3751,16 @@ dependencies = [
 
 [[package]]
 name = "spider"
-version = "1.49.13"
+version = "1.50.1"
 dependencies = [
  "ahash",
+ "async-trait",
  "bytes",
  "case_insensitive_string",
  "chromiumoxide",
+ "chrono",
  "compact_str",
+ "cron",
  "cssparser",
  "ego-tree",
  "fast_html5ever",
@@ -3775,7 +3789,7 @@ dependencies = [
 
 [[package]]
 name = "spider_cli"
-version = "1.49.13"
+version = "1.50.1"
 dependencies = [
  "clap 4.4.8",
  "env_logger 0.9.3",
@@ -3787,7 +3801,7 @@ dependencies = [
 
 [[package]]
 name = "spider_examples"
-version = "1.49.13"
+version = "1.50.1"
 dependencies = [
  "convert_case",
  "env_logger 0.9.3",
@@ -3808,7 +3822,7 @@ dependencies = [
 
 [[package]]
 name = "spider_worker"
-version = "1.49.13"
+version = "1.50.1"
 dependencies = [
  "env_logger 0.10.1",
  "lazy_static",
diff --git a/README.md b/README.md
index 790384d86..235ffd0a1 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,16 @@ The fastest web crawler and indexer.
 
 Foundational building blocks for data curation workloads.
 
+- Concurrent
+- Streaming
+- Decentralization
+- Headless Chrome Rendering
+- HTTP Proxies
+- Cron Jobs
+- Subscriptions
+- Blacklisting and Budgeting Depth
+- [Changelog](CHANGELOG.md)
+
 ## Getting Started
 
 The simplest way to get started is to use the [Spider Cloud](https://spiderwebai.xyz) for a pain free hosted service. View the [spider](./spider/README.md) or [spider_cli](./spider_cli/README.md) directory for local installations.
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 61dd71f3e..a23b05950 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider_examples"
-version = "1.49.13"
+version = "1.50.1"
 authors = ["madeindjs ", "j-mendez "]
 description = "Multithreaded web crawler written in Rust."
 repository = "https://github.com/spider-rs/spider"
@@ -22,7 +22,7 @@ htr = "0.5.27"
 flexbuffers = "2.0.0"
 
 [dependencies.spider]
-version = "1.49.13"
+version = "1.50.1"
 path = "../spider"
 features = ["serde"]
 
@@ -70,4 +70,9 @@ path = "configuration.rs"
 [[example]]
 name = "budget"
 path = "budget.rs"
-required-features = ["spider/budget", "spider/sync"]
\ No newline at end of file
+required-features = ["spider/budget", "spider/sync"]
+
+[[example]]
+name = "cron"
+path = "cron.rs"
+required-features = ["spider/sync", "spider/cron"]
\ No newline at end of file
diff --git a/examples/README.md b/examples/README.md
index 9f7111f97..dd87aa6b9 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -39,3 +39,7 @@ Crawl the page and output the links via [Serde](./serde.rs).
 
 Crawl links with a budget of amount of pages allowed [Budget](./budget.rs).
 
 - `cargo run --example budget`
+
+Crawl links at a given cron time [Cron](./cron.rs).
+
+- `cargo run --example cron`
\ No newline at end of file
diff --git a/examples/cron.rs b/examples/cron.rs
new file mode 100644
index 000000000..dbcf98394
--- /dev/null
+++ b/examples/cron.rs
@@ -0,0 +1,25 @@
+//! `cargo run --example cron`
+extern crate spider;
+
+use spider::tokio;
+use spider::website::{Website, run_cron};
+
+#[tokio::main]
+async fn main() {
+    let mut website: Website = Website::new("https://rsseau.fr");
+    website.cron_str = "1/5 * * * * *".into();
+
+    let mut rx2 = website.subscribe(16).unwrap();
+
+    let join_handle = tokio::spawn(async move {
+        while let Ok(res) = rx2.recv().await {
+            println!("{:?}", res.get_url());
+        }
+    });
+
+    let runner = run_cron(website).await;
+
+    println!("Starting the Runner for 20 seconds");
+    tokio::time::sleep(tokio::time::Duration::from_secs(20)).await;
+    let _ = tokio::join!(runner.stop(), join_handle);
+}
diff --git a/spider/Cargo.toml b/spider/Cargo.toml
index 7ccf764fe..16ceccf75 100644
--- a/spider/Cargo.toml
+++ b/spider/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider"
-version = "1.49.13"
+version = "1.50.1"
 authors = ["madeindjs ", "j-mendez "]
 description = "The fastest web crawler written in Rust."
 repository = "https://github.com/spider-rs/spider"
@@ -43,12 +43,15 @@ case_insensitive_string = { version = "0.1.7", features = [ "compact", "serde" ]
 jsdom = { version = "0.0.11-alpha.1", optional = true, features = [ "hashbrown", "tokio" ] }
 chromiumoxide = { version = "0.5.6", optional = true, features = ["tokio-runtime", "bytes"], default-features = false }
 sitemap = { version = "0.4.1", optional = true }
+chrono = { version = "0.4.31", optional = true }
+cron = { version = "0.12.0", optional = true }
+async-trait = { version = "0.1.74", optional = true }
 
 [target.'cfg(all(not(windows), not(target_os = "android"), not(target_env = "musl")))'.dependencies]
 tikv-jemallocator = { version = "0.5.0", optional = true }
 
 [features]
-default = ["sync"]
+default = ["sync", "cron"]
 regex = ["dep:regex"]
 glob = ["dep:regex", "dep:itertools"]
 ua_generator = ["dep:ua_generator"]
@@ -70,4 +73,5 @@ chrome = ["dep:chromiumoxide"]
 chrome_headed = ["chrome"]
 chrome_cpu = ["chrome"]
 chrome_stealth = ["chrome"]
-cookies = ["reqwest/cookies"]
\ No newline at end of file
+cookies = ["reqwest/cookies"]
+cron = ["dep:chrono", "dep:cron", "dep:async-trait"]
diff --git a/spider/README.md b/spider/README.md
index 231b8ab89..04da80b04 100644
--- a/spider/README.md
+++ b/spider/README.md
@@ -16,7 +16,7 @@ This is a basic async example crawling a web page, add spider to your `Cargo.tom
 
 ```toml
 [dependencies]
-spider = "1.49.13"
+spider = "1.50.1"
 ```
 
 And then the code:
@@ -30,7 +30,7 @@ use spider::tokio;
 #[tokio::main]
 async fn main() {
     let url = "https://choosealicense.com";
-    let mut website: Website = Website::new(&url);
+    let mut website = Website::new(&url);
     website.crawl().await;
 
     for link in website.get_links() {
@@ -43,7 +43,7 @@ You can use `Configuration` object to configure your crawler:
 
 ```rust
 // ..
-let mut website: Website = Website::new("https://choosealicense.com");
+let mut website = Website::new("https://choosealicense.com");
 
 website.configuration.respect_robots_txt = true;
 website.configuration.subdomains = true;
@@ -52,10 +52,12 @@ website.configuration.delay = 0; // Defaults to 0 ms due to concurrency handling
 website.configuration.request_timeout = None; // Defaults to 15000 ms
 website.configuration.http2_prior_knowledge = false; // Enable if you know the webserver supports http2
 website.configuration.user_agent = Some("myapp/version".into()); // Defaults to using a random agent
-website.on_link_find_callback = Some(|s, html| { println!("link target: {}", s); (s, html)}); // Callback to run on each link find
+website.on_link_find_callback = Some(|s, html| { println!("link target: {}", s); (s, html)}); // Callback to run on each link find - useful for mutating the url, ex: convert the top level domain from `.fr` to `.es`.
 website.configuration.blacklist_url.get_or_insert(Default::default()).push("https://choosealicense.com/licenses/".into());
 website.configuration.proxies.get_or_insert(Default::default()).push("socks5://10.1.1.1:12345".into()); // Defaults to None - proxy list.
 website.budget = Some(spider::hashbrown::HashMap::from([(spider::CaseInsensitiveString::new("*"), 300), (spider::CaseInsensitiveString::new("/licenses"), 10)])); // Defaults to None - Requires the `budget` feature flag
+website.cron_str = "1/5 * * * * *".into(); // Defaults to empty string - Requires the `cron` feature flag
+website.cron_type = spider::website::CronType::Crawl; // Defaults to CronType::Crawl - Requires the `cron` feature flag
 
 website.crawl().await;
 ```
@@ -78,6 +80,8 @@ website
     .with_external_domains(Some(Vec::from(["https://creativecommons.org/licenses/by/3.0/"].map(|d| d.to_string())).into_iter()))
     .with_headers(None)
     .with_blacklist_url(Some(Vec::from(["https://choosealicense.com/licenses/".into()])))
+    // requires the `cron` feature flag
+    .with_cron("1/5 * * * * *", Default::default())
     .with_proxies(None);
 ```
 
@@ -87,7 +91,7 @@ We have a couple optional feature flags. Regex blacklisting, jemaloc backend, gl
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["regex", "ua_generator"] }
+spider = { version = "1.50.1", features = ["regex", "ua_generator"] }
 ```
 
 1. `ua_generator`: Enables auto generating a random real User-Agent.
 1. `sync`: Subscribe to changes for Page data processing async. [Enabled by default]
 1. `budget`: Allows setting a crawl budget per path with depth.
 1. `control`: Enables the ability to pause, start, and shutdown crawls on demand.
-1. `full_resources`: Enables gathering all content that relates to the domain like css,jss, and etc.
+1. `full_resources`: Enables gathering all content that relates to the domain like CSS, JS, etc.
 1. `serde`: Enables serde serialization support.
 1. `socks`: Enables socks5 proxy support.
 1. `glob`: Enables [url glob](https://everything.curl.dev/cmdline/globbing) support.
@@ -105,11 +109,12 @@
 1. `js`: Enables javascript parsing links created with the alpha [jsdom](https://github.com/a11ywatch/jsdom) crate.
 1. `sitemap`: Include sitemap pages in results.
 1. `time`: Enables duration tracking per page.
-1. `chrome`: Enables chrome headless rendering, use the env var `CHROME_URL` to connect remotely [experimental].
+1. `chrome`: Enables chrome headless rendering, use the env var `CHROME_URL` to connect remotely.
 1. `chrome_headed`: Enables chrome rendering headful rendering [experimental].
 1. `chrome_cpu`: Disable gpu usage for chrome browser.
 1. `chrome_stealth`: Enables stealth mode to make it harder to be detected as a bot.
 1. `cookies`: Enables cookies storing and setting to use for request.
+1. `cron`: Enables the ability to start cron jobs for the website.
 
 ### Decentralization
 
@@ -117,7 +122,7 @@ Move processing to a worker, drastically increases performance even if worker is
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["decentralized"] }
+spider = { version = "1.50.1", features = ["decentralized"] }
 ```
 
 ```sh
@@ -137,7 +142,7 @@ Use the subscribe method to get a broadcast channel.
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["sync"] }
+spider = { version = "1.50.1", features = ["sync"] }
 ```
 
 ```rust,no_run
@@ -148,7 +153,7 @@ use spider::tokio;
 
 #[tokio::main]
 async fn main() {
-    let mut website: Website = Website::new("https://choosealicense.com");
+    let mut website = Website::new("https://choosealicense.com");
     let mut rx2 = website.subscribe(16).unwrap();
 
     let join_handle = tokio::spawn(async move {
@@ -167,7 +172,7 @@ Allow regex for blacklisting routes
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["regex"] }
+spider = { version = "1.50.1", features = ["regex"] }
 ```
 
 ```rust,no_run
@@ -178,7 +183,7 @@ use spider::tokio;
 
 #[tokio::main]
 async fn main() {
-    let mut website: Website = Website::new("https://choosealicense.com");
+    let mut website = Website::new("https://choosealicense.com");
     website.configuration.blacklist_url.push("/licenses/".into());
 
     website.crawl().await;
@@ -194,7 +199,7 @@ If you are performing large workloads you may need to control the crawler by ena
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["control"] }
+spider = { version = "1.50.1", features = ["control"] }
 ```
 
 ```rust
@@ -211,10 +216,10 @@ async fn main() {
 
     tokio::spawn(async move {
         pause(url).await;
-        sleep(Duration::from_millis(5000)).await;
+        sleep(tokio::time::Duration::from_millis(5000)).await;
         resume(url).await;
         // perform shutdown if crawl takes longer than 15s
-        sleep(Duration::from_millis(15000)).await;
+        sleep(tokio::time::Duration::from_millis(15000)).await;
         // you could also abort the task to shutdown crawls if using website.crawl in another thread.
         shutdown(url).await;
     });
@@ -236,7 +241,7 @@ async fn main() {
     use std::io::{Write, stdout};
 
     let url = "https://choosealicense.com/";
-    let mut website: Website = Website::new(&url);
+    let mut website = Website::new(&url);
 
     website.scrape().await;
 
@@ -258,11 +263,49 @@ async fn main() {
     }
 }
 ```
 
+### Cron Jobs
+
+Use cron jobs to run crawls continuously at any time.
+
+```toml
+[dependencies]
+spider = { version = "1.50.1", features = ["sync", "cron"] }
+```
+
+```rust,no_run
+extern crate spider;
+
+use spider::website::{Website, run_cron};
+use spider::tokio;
+
+#[tokio::main]
+async fn main() {
+    let mut website = Website::new("https://choosealicense.com");
+    // set the cron to run or use the builder pattern `website.with_cron`.
+    website.cron_str = "1/5 * * * * *".into();
+
+    let mut rx2 = website.subscribe(16).unwrap();
+
+    let join_handle = tokio::spawn(async move {
+        while let Ok(res) = rx2.recv().await {
+            println!("{:?}", res.get_url());
+        }
+    });
+
+    // take ownership of the website. You can also use website.run_cron, but then you need to abort the handles you created manually.
+    let runner = run_cron(website).await;
+
+    println!("Starting the Runner for 10 seconds");
+    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
+    let _ = tokio::join!(runner.stop(), join_handle);
+}
+```
+
 ### Chrome
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["chrome"] }
+spider = { version = "1.50.1", features = ["chrome"] }
 ```
 
 You can use `website.crawl_concurrent_raw` to perform a crawl without chromium when needed. Use the feature flag `chrome_headed` to enable headful browser usage if needed to debug.
diff --git a/spider/src/features/cron.rs b/spider/src/features/cron.rs
new file mode 100644
index 000000000..3dc9791b3
--- /dev/null
+++ b/spider/src/features/cron.rs
@@ -0,0 +1,346 @@
+//! # Cron: a simple cron runner - a fork of crony with changes that allow async.
+//!
+//! Use the `Job` trait to create your cron job struct, pass it to the `Runner`, and then start it via the `run()` method.
+//! The runner will spawn a new task where it loops through the jobs and runs their handle
+//! method once the scheduled time is reached.
+//!
+//!
+extern crate chrono;
+extern crate cron;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Duration, Utc};
+pub use cron::Schedule;
+use lazy_static::lazy_static;
+use log::{debug, error, info};
+use std::panic;
+use std::sync::mpsc::{Receiver, Sender};
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    mpsc, Arc, Mutex,
+};
+use tokio::task::JoinHandle;
+
+lazy_static! {
+    /// Singleton instance of a tracker that won't allow
+    /// the same job to run again while it's already running,
+    /// unless you specifically allow the job to run in
+    /// parallel with itself
+    pub static ref TRACKER: Mutex<Tracker> = Mutex::new(Tracker::new());
+}
+
+#[async_trait]
+pub trait Job: Send + Sync {
+    /// The default implementation of the is_active method will
+    /// make this job always active
+    fn is_active(&self) -> bool {
+        true
+    }
+
+    /// In case your job takes longer to finish and it's scheduled
+    /// to start again (while it's still running), the default behaviour
+    /// will skip the next run while one instance is already running.
+    /// (if your OS has enough threads, and is spawning a thread for the next job)
+    ///
+    /// To override this behaviour and enable it to run in parallel
+    /// with other instances of self, return `true` on this instance.
+    fn allow_parallel_runs(&self) -> bool {
+        false
+    }
+
+    /// Define the run schedule for your job
+    fn schedule(&self) -> Option<Schedule>;
+
+    /// This is where your job's magic happens, define the action that
+    /// will happen once the cron starts running your job
+    ///
+    /// If this method panics, your entire job will panic and that may
+    /// or may not make the whole runner panic. Handle your errors
+    /// properly and don't let it panic.
+    async fn handle(&mut self);
+
+    /// Decide whether or not to start running your job
+    fn should_run(&self) -> bool {
+        if self.is_active() {
+            match self.schedule() {
+                Some(schedule) => {
+                    for item in schedule.upcoming(Utc).take(1) {
+                        let now = Utc::now();
+                        let difference = item - now;
+                        if difference <= Duration::milliseconds(100) {
+                            return true;
+                        }
+                    }
+                }
+                _ => (),
+            }
+        }
+
+        false
+    }
+
+    /// Simple output that will return the current time so you don't have to do so
+    /// in your job if you wish to display the time of the run.
+    fn now(&self) -> DateTime<Utc> {
+        Utc::now()
+    }
+}
+
+/// Struct for marking jobs running
+pub struct Tracker(Vec<usize>);
+
+impl Default for Tracker {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Tracker {
+    /// Return a new instance of the tracker
+    pub fn new() -> Self {
+        Tracker(vec![])
+    }
+
+    /// Check if the id of the job is marked as running
+    pub fn running(&self, id: &usize) -> bool {
+        self.0.contains(id)
+    }
+
+    /// Set the job id as running
+    pub fn start(&mut self, id: &usize) -> usize {
+        if !self.running(id) {
+            self.0.push(*id);
+        }
+        self.0.len()
+    }
+
+    /// Unmark the job from running
+    pub fn stop(&mut self, id: &usize) -> usize {
+        if self.running(id) {
+            match self.0.iter().position(|&r| r == *id) {
+                Some(i) => self.0.remove(i),
+                None => 0,
+            };
+        }
+        self.0.len()
+    }
+}
+
+/// Runner that will hold all the jobs and will start up the execution
+/// and eventually will stop it.
+pub struct Runner {
+    /// the current jobs
+    pub jobs: Vec<Box<dyn Job>>,
+    /// the task that is running the handle
+    pub thread: Option<JoinHandle<()>>,
+    /// is the task running or not
+    pub running: bool,
+    /// channel for sending the stop message
+    pub tx: Option<Sender<Result<(), ()>>>,
+    /// tracker to determine if a cron job is working
+    pub working: Arc<AtomicBool>,
+}
+
+impl Default for Runner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Runner {
+    /// Create a new runner
+    pub fn new() -> Self {
+        Runner {
+            jobs: vec![],
+            thread: None,
+            running: false,
+            tx: None,
+            working: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    /// Add jobs into the runner
+    ///
+    /// **panics** if you try to push a job onto an already started runner
+    #[allow(clippy::should_implement_trait)]
+    pub fn add(mut self, job: Box<dyn Job>) -> Self {
+        if self.running {
+            panic!("Cannot push job onto runner once the runner is started!");
+        }
+        self.jobs.push(job);
+        self
+    }
+
+    /// Number of jobs ready to start running
+    pub fn jobs_to_run(&self) -> usize {
+        self.jobs.len()
+    }
+
+    /// Start the loop and job execution
+    pub async fn run(self) -> Self {
+        if self.jobs.is_empty() {
+            return self;
+        }
+
+        let working = Arc::new(AtomicBool::new(false));
+        let (thread, tx) = spawn(self, working.clone()).await;
+
+        Self {
+            thread,
+            jobs: vec![],
+            running: true,
+            tx,
+            working,
+        }
+    }
+
+    /// Stop the spawned runner
+    pub async fn stop(mut self) {
+        if !self.running {
+            return;
+        }
+        if let Some(thread) = self.thread.take() {
+            if let Some(tx) = self.tx {
+                match tx.send(Ok(())) {
+                    Ok(_) => (),
+                    Err(e) => error!("Could not send stop signal to cron runner thread: {}", e),
+                };
+            }
+            thread.abort()
+        }
+    }
+
+    /// Lets us know if the cron worker is running
+    pub fn is_running(&self) -> bool {
+        self.running
+    }
+
+    /// Lets us know if the worker is in the process of executing a job currently
+    pub fn is_working(&self) -> bool {
+        self.working.load(Ordering::Relaxed)
+    }
+}
+
+/// Spawn the task for the runner and return its sender to stop it
+async fn spawn(
+    runner: Runner,
+    working: Arc<AtomicBool>,
+) -> (Option<JoinHandle<()>>, Option<Sender<Result<(), ()>>>) {
+    let (tx, rx): (Sender<Result<(), ()>>, Receiver<Result<(), ()>>) = mpsc::channel();
+
+    let handler = tokio::spawn(async move {
+        let mut jobs = runner.jobs;
+
+        loop {
+            if rx.try_recv().is_ok() {
+                info!("Stopping the cron runner thread");
+                break;
+            }
+
+            for (id, job) in jobs.iter_mut().enumerate() {
+                let no = (id + 1).to_string();
+
+                if job.should_run()
+                    && (job.allow_parallel_runs() || !TRACKER.lock().unwrap().running(&id))
+                {
+                    TRACKER.lock().unwrap().start(&id);
+
+                    let now = Utc::now();
+                    debug!(
+                        "START: {} --- {}",
+                        format!("cron-job-thread-{}", no),
+                        now.format("%H:%M:%S%.f")
+                    );
+                    working.store(true, Ordering::Relaxed);
+
+                    // keep the work on the same task for now.
+                    job.handle().await;
+
+                    working.store(TRACKER.lock().unwrap().stop(&id) != 0, Ordering::Relaxed);
+
+                    debug!(
+                        "FINISH: {} --- {}",
+                        format!("cron-job-thread-{}", no),
+                        now.format("%H:%M:%S%.f")
+                    );
+                }
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+    });
+
+    (Some(handler), Some(tx))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{Job, Runner};
+    use async_trait::async_trait;
+    use cron::Schedule;
+    use std::str::FromStr;
+
+    struct SomeJob;
+
+    #[async_trait]
+    impl Job for SomeJob {
+        fn schedule(&self) -> Option<Schedule> {
+            Some(Schedule::from_str("0 * * * * *").unwrap())
+        }
+
+        async fn handle(&mut self) {}
+    }
+
+    struct AnotherJob;
+
+    #[async_trait]
+    impl Job for AnotherJob {
+        fn schedule(&self) -> Option<Schedule> {
+            Some(Schedule::from_str("0 * * * * *").unwrap())
+        }
+
+        async fn handle(&mut self) {}
+    }
+
+    #[tokio::test]
+    async fn create_job() {
+        let mut some_job = SomeJob;
+
+        assert_eq!(some_job.handle().await, ());
+    }
+
+    #[tokio::test]
+    async fn test_adding_jobs_to_runner() {
+        let some_job = SomeJob;
+        let another_job = AnotherJob;
+
+        let runner = Runner::new()
+            .add(Box::new(some_job))
+            .add(Box::new(another_job));
+
+        assert_eq!(runner.jobs_to_run(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_jobs_are_empty_after_runner_starts() {
+        let some_job = SomeJob;
+        let another_job = AnotherJob;
+
+        let runner = Runner::new()
+            .add(Box::new(some_job))
+            .add(Box::new(another_job))
+            .run()
+            .await;
+
+        assert_eq!(runner.jobs_to_run(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_stopping_the_runner() {
+        let some_job = SomeJob;
+        let another_job = AnotherJob;
+
+        let runner = Runner::new()
+            .add(Box::new(some_job))
+            .add(Box::new(another_job))
+            .run()
+            .await;
+
+        assert_eq!(runner.stop().await, ());
+    }
+}
diff --git a/spider/src/features/mod.rs b/spider/src/features/mod.rs
index b743cf410..5724c5ba1 100644
--- a/spider/src/features/mod.rs
+++ b/spider/src/features/mod.rs
@@ -1,6 +1,9 @@
 /// Chrome utils
 #[cfg(feature = "chrome")]
 pub mod chrome;
+/// Cron jobs
+#[cfg(feature = "cron")]
+pub mod cron;
 /// URL globbing
 #[cfg(feature = "glob")]
 pub mod glob;
diff --git a/spider/src/lib.rs b/spider/src/lib.rs
index 83deeb80a..b6b395500 100644
--- a/spider/src/lib.rs
+++ b/spider/src/lib.rs
@@ -64,6 +64,7 @@
 //! - `chrome_cpu`: Disable gpu usage for chrome browser.
 //! - `chrome_stealth`: Enables stealth mode to make it harder to be detected as a bot.
 //! - `cookies`: Enables cookies storing and setting to use for request.
+//! - `cron`: Enables the ability to start cron jobs for the website.
 
 pub extern crate bytes;
 pub extern crate compact_str;
diff --git a/spider/src/website.rs b/spider/src/website.rs
index db412f6a7..c4715296b 100644
--- a/spider/src/website.rs
+++ b/spider/src/website.rs
@@ -1,16 +1,21 @@
 use crate::black_list::contains;
 use crate::configuration::{get_ua, Configuration};
+use crate::features::cron::Job;
 use crate::packages::robotparser::parser::RobotFileParser;
 use crate::page::{build, get_page_selectors, Page};
 use crate::utils::log;
 use crate::CaseInsensitiveString;
+
+#[cfg(feature = "cron")]
+use async_trait::async_trait;
+
 use compact_str::CompactString;
 #[cfg(feature = "budget")]
 use hashbrown::HashMap;
 use hashbrown::HashSet;
-use reqwest::{Client, ClientBuilder};
+use reqwest::Client;
 use std::io::{Error, ErrorKind};
 use std::sync::atomic::{AtomicI8, Ordering};
 use std::sync::Arc;
@@ -115,6 +120,17 @@ pub enum CrawlStatus {
     Paused,
 }
 
+#[cfg(feature = "cron")]
+/// The type of cron job to run
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub enum CronType {
+    #[default]
+    /// Crawl, collecting links, page data, etc.
+    Crawl,
+    /// Scrape, collecting links and page data as bytes to store, etc.
+    Scrape,
+}
+
 /// Represents a website to crawl and gather all links.
 /// ```rust
 /// use spider::website::Website;
@@ -157,6 +173,12 @@ pub struct Website {
     #[cfg(feature = "cookies")]
     /// Cookie string to use for network requests ex: "foo=bar; Domain=blog.spider"
     pub cookie_str: String,
+    #[cfg(feature = "cron")]
+    /// Cron string to perform crawls - use a cron expression generator to help build a valid cron for your needs.
+    pub cron_str: String,
+    #[cfg(feature = "cron")]
+    /// The type of cron to run, either crawl or scrape
+    pub cron_type: CronType,
 }
 
 impl Website {
@@ -463,7 +485,8 @@ impl Website {
     }
 
     /// build the http client
-    fn configure_http_client_builder(&mut self) -> ClientBuilder {
+    #[cfg(not(feature = "decentralized"))]
+    fn configure_http_client_builder(&mut self) -> reqwest::ClientBuilder {
         let host_str = self.domain_parsed.as_deref().cloned();
         let default_policy = reqwest::redirect::Policy::default();
         let policy = match host_str {
@@ -2122,6 +2145,14 @@ impl Website {
         self
     }
 
+    #[cfg(feature = "cron")]
+    /// Setup cron jobs to run
+    pub fn with_cron(&mut self, cron_str: &str, cron_type: CronType) -> &mut Self {
+        self.cron_str = cron_str.into();
+        self.cron_type = cron_type;
+        self
+    }
+
     /// Build the website configuration when using with_builder
     pub fn build(&self) -> Result<Website, Error> {
         if self.domain_parsed.is_none() {
@@ -2152,6 +2183,50 @@
 
         Some(rx2)
     }
+
+    #[cfg(feature = "cron")]
+    /// Start a cron job - if you use subscribe on another thread you need to abort the handle in conjunction with runner.stop.
+    pub async fn run_cron(&self) -> crate::features::cron::Runner {
+        crate::features::cron::Runner::new()
+            .add(Box::new(self.clone()))
+            .run()
+            .await
+    }
+}
+
+#[cfg(feature = "cron")]
+/// Start a cron job taking ownership of the website
+pub async fn run_cron(website: Website) -> crate::features::cron::Runner {
+    crate::features::cron::Runner::new()
+        .add(Box::new(website))
+        .run()
+        .await
+}
+
+#[cfg(feature = "cron")]
+#[async_trait]
+impl Job for Website {
+    fn schedule(&self) -> Option<cron::Schedule> {
+        match self.cron_str.parse() {
+            Ok(schedule) => Some(schedule),
+            Err(e) => {
+                log::error!("{:?}", e);
+                None
+            }
+        }
+    }
+    async fn handle(&mut self) {
+        log::info!(
+            "CRON: {} - cron job running {}",
+            self.get_domain().as_ref(),
+            self.now()
+        );
+        if self.cron_type == CronType::Crawl {
+            self.crawl().await;
+        } else {
+            self.scrape().await;
+        }
+    }
 }
 
 #[cfg(not(feature = "decentralized"))]
@@ -2169,6 +2244,70 @@ async fn crawl() {
     );
 }
 
+#[cfg(feature = "cron")]
+#[tokio::test]
+async fn crawl_cron() {
+    let url = "https://choosealicense.com";
+    let mut website: Website = Website::new(&url)
+        .with_cron("1/5 * * * * *", Default::default())
+        .build()
+        .unwrap();
+    let mut rx2 = website.subscribe(16).unwrap();
+
+    // handle an event on every cron
+    let join_handle = tokio::spawn(async move {
+        let mut links_visited = HashSet::new();
+        while let Ok(res) = rx2.recv().await {
+            let url = res.get_url();
+            links_visited.insert(CaseInsensitiveString::new(url));
+        }
+        assert!(
+            links_visited
+                .contains::<CaseInsensitiveString>(&"https://choosealicense.com/licenses/".into()),
+            "{:?}",
+            links_visited
+        );
+    });
+
+    let runner = website.run_cron().await;
+    log::debug!("Starting the Runner for 10 seconds");
+    tokio::time::sleep(Duration::from_secs(10)).await;
+    runner.stop().await;
+    join_handle.abort();
+    let _ = join_handle.await;
+}
+
+#[cfg(feature = "cron")]
+#[tokio::test]
+async fn crawl_cron_own() {
+    let url = "https://choosealicense.com";
+    let mut website: Website = Website::new(&url)
+        .with_cron("1/5 * * * * *", Default::default())
+        .build()
+        .unwrap();
+    let mut rx2 = website.subscribe(16).unwrap();
+
+    // handle an event on every cron
+    let join_handle = tokio::spawn(async move {
+        let mut links_visited = HashSet::new();
+        while let Ok(res) = rx2.recv().await {
+            let url = res.get_url();
+            links_visited.insert(CaseInsensitiveString::new(url));
+        }
+        assert!(
+            links_visited
+                .contains::<CaseInsensitiveString>(&"https://choosealicense.com/licenses/".into()),
+            "{:?}",
+            links_visited
+        );
+    });
+
+    let runner = run_cron(website).await;
+    log::debug!("Starting the Runner for 10 seconds");
+    tokio::time::sleep(Duration::from_secs(10)).await;
+    let _ = tokio::join!(runner.stop(), join_handle);
+}
+
 #[cfg(not(feature = "decentralized"))]
 #[tokio::test]
 async fn scrape() {
diff --git a/spider_cli/Cargo.toml b/spider_cli/Cargo.toml
index 198cf70f7..4269d2016 100644
--- a/spider_cli/Cargo.toml
+++ b/spider_cli/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider_cli"
-version = "1.49.13"
+version = "1.50.1"
 authors = ["madeindjs ", "j-mendez "]
 description = "The fastest web crawler CLI written in Rust."
 repository = "https://github.com/spider-rs/spider"
@@ -26,7 +26,7 @@ quote = "1.0.18"
 failure_derive = "0.1.8"
 
 [dependencies.spider]
-version = "1.49.13"
+version = "1.50.1"
 path = "../spider"
 
 [[bin]]
diff --git a/spider_worker/Cargo.toml b/spider_worker/Cargo.toml
index 08031145c..64b07b9b6 100644
--- a/spider_worker/Cargo.toml
+++ b/spider_worker/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider_worker"
-version = "1.49.13"
+version = "1.50.1"
 authors = ["madeindjs ", "j-mendez "]
 description = "The fastest web crawler as a worker or proxy."
 repository = "https://github.com/spider-rs/spider"
@@ -22,7 +22,7 @@ lazy_static = "1.4.0"
 env_logger = "0.10.0"
 
 [dependencies.spider]
-version = "1.49.13"
+version = "1.50.1"
 path = "../spider"
 features = ["serde", "flexbuffers"]
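
Beyond running a `Website` on a schedule, the `Job` trait and `Runner` added in `spider/src/features/cron.rs` can drive any custom task. The sketch below mirrors the module's own tests; it is not part of the patch. The struct name `StatusJob`, the schedule string, the direct dependency on the `async-trait` crate, and the public visibility of `spider::features::cron` are assumptions for illustration, and the `cron` feature flag must be enabled.

```rust
// Sketch only: a custom job driven by the `Job` trait and `Runner` from the
// new cron module. `StatusJob` and its schedule are illustrative assumptions.
extern crate spider;

use async_trait::async_trait;
use spider::features::cron::{Job, Runner, Schedule};
use spider::tokio;
use std::str::FromStr;

struct StatusJob;

#[async_trait]
impl Job for StatusJob {
    // Fire at second 0 of every minute.
    fn schedule(&self) -> Option<Schedule> {
        Some(Schedule::from_str("0 * * * * *").unwrap())
    }

    // The async work performed on each tick; `now()` is provided by the trait.
    async fn handle(&mut self) {
        println!("status check at {}", self.now());
    }
}

#[tokio::main]
async fn main() {
    // Start the runner, let it tick for two minutes, then stop it.
    let runner = Runner::new().add(Box::new(StatusJob)).run().await;
    tokio::time::sleep(tokio::time::Duration::from_secs(120)).await;
    runner.stop().await;
}
```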